作者 王勇

超级用户的密码,改成AES加密

... ... @@ -50,7 +50,7 @@ public class RabbitController {
@GetMapping("/test/consumer")
public void consumer() throws IOException, TimeoutException
{
directUtils.directConsumer("Q_zicheng", "E_zicheng", "R_zicheng");
}
@PostMapping("/product/old")
... ...
... ... @@ -5,7 +5,7 @@ import com.rabbitmq.http.client.domain.UserPermissions;
import com.sunyo.wlpt.message.bus.service.domain.BusServer;
import com.sunyo.wlpt.message.bus.service.domain.UserInfo;
import com.sunyo.wlpt.message.bus.service.domain.VirtualHost;
import com.sunyo.wlpt.message.bus.service.utils.EncryptionUtils;
import com.sunyo.wlpt.message.bus.service.utils.AESUtils;
import java.io.IOException;
import java.net.URISyntaxException;
... ... @@ -25,9 +25,7 @@ public class ClientUtils {
// 该服务器超级用户的用户名称
String superUsername = busServer.getSuperUsername();
// 该服务器超级用户的用户密码
String base = EncryptionUtils.decryptBase64(busServer.getSuperPassword());
String[] split = base.split("\\.");
String superPassword = split[split.length - 1];
String superPassword = AESUtils.decrypt(busServer.getSuperPassword());
// 服务器的客户端端口号
String clientPort = busServer.getClientPort().toString();
... ...
... ... @@ -4,15 +4,13 @@ import com.rabbitmq.client.*;
import com.sunyo.wlpt.message.bus.service.domain.XmlData;
import com.sunyo.wlpt.message.bus.service.exception.CustomExceptionType;
import com.sunyo.wlpt.message.bus.service.response.ResultJson;
import com.sunyo.wlpt.message.bus.service.utils.EncryptionUtils;
import com.sunyo.wlpt.message.bus.service.utils.AESUtils;
import com.sunyo.wlpt.message.bus.service.utils.IdUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
... ... @@ -29,60 +27,6 @@ public class DirectUtils {
@Autowired
private StringRedisTemplate redisTemplate;
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String vHost;
/**
* @return 链接 rabbitmq
* @throws IOException IO异常
* @throws TimeoutException 超时异常
*/
@PostConstruct
public Connection getConnection() throws IOException, TimeoutException
{
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost(host);
//端口,amqp协议 端口 类似与mysql的3306
factory.setPort(port);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost(vHost);
factory.setUsername(username);
factory.setPassword(password);
// 通过工程获取连接
Connection connection = factory.newConnection();
return connection;
}
public Connection getConnection(String hostIp, int hostPort, String vHostName) throws Exception
{
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost(hostIp);
//端口
factory.setPort(hostPort);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost(vHostName);
factory.setUsername(username);
factory.setPassword(password);
// 通过工程获取连接
return factory.newConnection();
}
/**
* 链接 RabbitMQ
*
... ... @@ -96,8 +40,6 @@ public class DirectUtils {
*/
public static Connection getConnection(String hostIp, int hostPort, String vHostName, String userName, String password) throws IOException, TimeoutException
{
String base = EncryptionUtils.decryptBase64(password);
String[] split = base.split("\\.");
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
... ... @@ -107,7 +49,7 @@ public class DirectUtils {
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost(vHostName);
factory.setUsername(userName);
factory.setPassword(split[split.length - 1]);
factory.setPassword(AESUtils.decrypt(password));
// 通过工程获取连接
return factory.newConnection();
}
... ... @@ -132,79 +74,21 @@ public class DirectUtils {
}
}
/**
* DirectExchange的 消息生产者
*
* @param exchangeName 交换机名称
* @param routingKeyName 路由键名称
* @param msg 发送的消息
* @throws IOException
* @throws TimeoutException
*/
public void directProducer(String exchangeName, String routingKeyName, String msg) throws IOException, TimeoutException
{
// 1、创建ConnectionFactory
Connection connection = getConnection();
// 2、 通过Connection创建一个新的Channel
Channel channel = connection.createChannel();
// 3、开启消息的确认机制(confirm:保证消息能够发送到 exchange)
channel.confirmSelect();
// 4、避免消息被重复消费
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
// 指定消息是否需要持久化,1:需要持久化;2:不需要持久化
.deliveryMode(1)
// 设置全局唯一消息机制id(雪花id)
.messageId(IdUtils.generateId())
.build();
// 5、开启 return 机制(保证消息,从 Exchange 分发到 Queue )
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException
{
// 当消息没有从 Exchange 分发到 Queue 时,才会执行
log.error(new String(body, "UTF8") + "->没有从 Exchange 分发到Queue中");
}
});
// 6、发送消息,并指定 mandatory 参数为true
channel.basicPublish(exchangeName, routingKeyName, true, properties, msg.getBytes());
log.info("消息生产者,目标交换机:{};路由键:{};发送信息:{}", exchangeName, routingKeyName, msg);
// 7、添加一个异步 confirm 确认监听,用于发送消息到Broker端之后,回送消息的监听
channel.addConfirmListener(new ConfirmListener() {
// 发送成功
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException
{
log.info("消息发送成功,标识:{};是否是批量:{}", deliveryTag, multiple);
}
// 发送失败
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException
{
log.error("消息发送失败,标识:{};是否是批量:{}", deliveryTag, multiple);
}
});
// finally,关闭连接
closeConnectionAndChanel(channel, connection);
}
/**
* DirectExchange的 消息消费者
*
* @throws IOException IO异常
* @throws TimeoutException 超时异常
*/
public void directConsumer(String queueName, String exchangeName, String routingKeyName) throws IOException, TimeoutException
public void directConsumer(XmlData xmlData) throws IOException, TimeoutException
{
// 1、创建ConnectionFactory
Connection connection = getConnection();
Connection connection = getConnection(xmlData.getServerIp(), xmlData.getServerPort(),
xmlData.getVirtualHostName(), xmlData.getSuperUsername(), xmlData.getSuperPassword());
// 2、 通过Connection创建一个新的Channel
Channel channel = connection.createChannel();
// 3、设置绑定关系(队列、交换机名称、路由键名称)
channel.queueBind(queueName, exchangeName, routingKeyName);
// channel.queueBind(queueName, exchangeName, routingKeyName);
// 一次只接受一条未确认的消息
channel.basicQos(1);
// 4、开启监听Queue
... ... @@ -239,7 +123,7 @@ public class DirectUtils {
}
};
// 消费消息
channel.basicConsume(queueName, false, consumer);
channel.basicConsume(xmlData.getQueueName(), false, consumer);
}
... ...
... ... @@ -8,9 +8,8 @@ import com.sunyo.wlpt.message.bus.service.domain.*;
import com.sunyo.wlpt.message.bus.service.response.ResultJson;
import com.sunyo.wlpt.message.bus.service.service.BusServerService;
import com.sunyo.wlpt.message.bus.service.service.VirtualHostService;
import com.sunyo.wlpt.message.bus.service.utils.EncryptionUtils;
import com.sunyo.wlpt.message.bus.service.utils.AESUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
... ... @@ -32,21 +31,6 @@ public class RabbitUtils {
@Resource
private BusServerService busServerService;
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String vHost;
/**
* 关闭连接与通道
*
... ... @@ -70,37 +54,15 @@ public class RabbitUtils {
/**
* 获取 rabbitMq 的连接,重载
*/
public Connection getConnection() throws IOException, TimeoutException
{
// 定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置服务地址
factory.setHost(host);
// 端口,amqp协议 端口 类似与mysql的3306
factory.setPort(port);
// 设置账号信息,用户名、密码、vhost
factory.setVirtualHost(vHost);
factory.setUsername(username);
factory.setPassword(password);
// 通过工程获取连接
Connection connection = factory.newConnection();
return connection;
}
/**
* 获取 rabbitMq 的连接,重载
*/
public Connection getConnection(String serverIp, Integer serverPort, String virtualHostName, String superUsername, String superPassword)
throws IOException, TimeoutException
{
String base = EncryptionUtils.decryptBase64(superPassword);
String[] split = base.split("\\.");
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(serverIp);
factory.setPort(serverPort);
factory.setVirtualHost(virtualHostName);
factory.setUsername(superUsername);
factory.setPassword(split[split.length - 1]);
factory.setPassword(AESUtils.decrypt(superPassword));
Connection connection = factory.newConnection();
return connection;
}
... ...
... ... @@ -7,7 +7,7 @@ import com.sunyo.wlpt.message.bus.service.mapper.BusServerMapper;
import com.sunyo.wlpt.message.bus.service.mapper.UserInfoMapper;
import com.sunyo.wlpt.message.bus.service.rabbit.utils.RabbitUtils;
import com.sunyo.wlpt.message.bus.service.service.*;
import com.sunyo.wlpt.message.bus.service.utils.EncryptionUtils;
import com.sunyo.wlpt.message.bus.service.utils.AESUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
... ... @@ -107,9 +107,9 @@ public class BusServerServiceImpl implements BusServerService {
@Override
public int insertSelective(BusServer server)
{
// 超级用户的密码,设计规则(超级用户名+.+超级用户密码)
String key = server.getSuperUsername() + "." + server.getSuperPassword();
server.setSuperPassword(EncryptionUtils.encryptBase64(key));
// 超级用户的密码,使用AES加密
String superPassword = server.getSuperPassword();
server.setSuperPassword(AESUtils.encrypt(superPassword));
return busServerMapper.insertSelective(server);
}
... ...
package com.sunyo.wlpt.message.bus.service.utils;
import lombok.extern.slf4j.Slf4j;
import javax.crypto.Cipher;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.util.Base64;
/**
* @author 子诚
* Description:对称加密->AES
* 时间:2020/8/24 17:47
*/
@Slf4j
public class AESUtils {
// public static void main(String[] args)
// {
// String encrypt = encrypt("vmvnv1v2");
// System.out.println(encrypt);
// }
/**
* 自定义的默认的秘钥(128比特位,即长度为16)
*/
private static final String KEY = "1234shangyou4321";
/**
* 定义AES加密逻辑,加密
*/
public static String encrypt(String content)
{
String encrypt = encrypt(content, KEY);
return encrypt;
}
/**
* 定义AES解密逻辑,解密
* 与加密逻辑对应
*/
public static String decrypt(String content)
{
String decrypt = decrypt(content, KEY);
return decrypt;
}
/**
* AES加密
*
* @param content
* @return
*/
public static String encrypt(String content, String securityKey)
{
try {
SecretKey key = new SecretKeySpec(securityKey.getBytes("UTF-8"), "AES");
Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
cipher.init(Cipher.ENCRYPT_MODE, key);
return new String(Base64.getEncoder().encode(cipher.doFinal(content.getBytes("utf-8"))));
} catch (Exception e) {
if (log.isDebugEnabled()) {
e.printStackTrace();
}
}
return content;
}
/**
* AES解密
*
* @param content
* @return
*/
public static String decrypt(String content, String securityKey)
{
try {
SecretKey key = new SecretKeySpec(securityKey.getBytes("UTF-8"), "AES");
Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
cipher.init(Cipher.DECRYPT_MODE, key);
return new String(cipher.doFinal(Base64.getDecoder().decode(content)), "utf-8");
} catch (Exception e) {
if (log.isDebugEnabled()) {
e.printStackTrace();
}
}
return content;
}
}
... ...