作者 王勇

重新修改关于rabbitMQ的封装

package com.sunyo.wlpt.message.bus.service.rabbit.utils;
import com.rabbitmq.http.client.Client;
import com.rabbitmq.http.client.domain.TopicPermissions;
import com.rabbitmq.http.client.domain.UserPermissions;
import com.sunyo.wlpt.message.bus.service.domain.BusServer;
import com.sunyo.wlpt.message.bus.service.domain.UserInfo;
import com.sunyo.wlpt.message.bus.service.domain.VirtualHost;
import com.sunyo.wlpt.message.bus.service.utils.EncryptionUtils;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
/**
* @author 子诚
* Description:com.rabbitmq.http.client的封装类
* 时间:2020/8/13 17:57
*/
public class ClientUtils {
public static Client connectClient(BusServer busServer) throws IOException, URISyntaxException
{
// 服务器的IP地址
String host = busServer.getServerIp();
// 该服务器超级用户的用户名称
String superUsername = busServer.getSuperUsername();
// 该服务器超级用户的用户密码
String base = EncryptionUtils.decryptBase64(busServer.getSuperPassword());
String[] split = base.split("\\.");
String superPassword = split[split.length - 1];
// 服务器的客户端端口号
String clientPort = busServer.getClientPort().toString();
// 访问客户端的url
String url = "http://" + host + ":" + clientPort + "/api";
Client client = new Client(url, superUsername, superPassword);
return client;
}
/**
* 创建MQ用户
*
* @param userInfo 用户信息
* @param busServer {@link BusServer}
* @param password 新增用户的密码
*/
public static void addRabbitUser(UserInfo userInfo, BusServer busServer, String password) throws IOException, URISyntaxException
{
// 新增用户的用户名称
String username = userInfo.getUsername();
// 虚拟主机名称
String virtualHostName = userInfo.getVirtualHostName();
// 与客户端建立连接
Client client = connectClient(busServer);
ArrayList<String> list = new ArrayList<>();
// 创建用户,权限为none
client.createUser(username, password.toCharArray(), list);
// 用户与虚拟主机建立联系
UserPermissions p = new UserPermissions();
p.setConfigure(username + ".*");
p.setRead(username + ".*");
p.setWrite(username + ".*");
client.updatePermissions(virtualHostName, username, p);
TopicPermissions topicPermissions = new TopicPermissions();
topicPermissions.setVhost(virtualHostName);
topicPermissions.setExchange("");
topicPermissions.setRead(".*");
topicPermissions.setWrite(".*");
client.updateTopicPermissions(virtualHostName, username, topicPermissions);
}
/**
* 删除用户
*/
public static void deleteMQUser(UserInfo userInfo, BusServer busServer) throws IOException, URISyntaxException
{
String username = userInfo.getUsername();
Client client = connectClient(busServer);
client.deleteUser(username);
}
/**
* 修改MQ的用户密码
*/
public static void updatePassword(BusServer busServer, String username, String password) throws IOException, URISyntaxException
{
char[] newPassword = password.toCharArray();
Client client = connectClient(busServer);
ArrayList<String> tags = new ArrayList<>();
client.updateUser(username, newPassword, tags);
}
/**
* 创建虚拟主机
*/
public static void createVirtualHost(BusServer busServer, String vHost) throws IOException, URISyntaxException
{
Client client = connectClient(busServer);
client.createVhost(vHost);
}
/**
* 创建虚拟主机
*/
public static void createVirtualHost(BusServer busServer, VirtualHost vHost) throws IOException, URISyntaxException
{
Client client = connectClient(busServer);
client.createVhost(vHost.getVirtualHostName(),vHost.getDescription());
}
/**
* 删除虚拟主机
*/
public static void deleteVirtualHost(BusServer busServer, String vHost) throws IOException, URISyntaxException
{
Client client = connectClient(busServer);
client.deleteVhost(vHost);
}
/**
* 清楚用户与虚拟主机之间的关系
*/
public static void clearPermissions(BusServer busServer, String vHost, String username) throws IOException, URISyntaxException
{
Client client = connectClient(busServer);
client.clearPermissions(vHost, username);
client.clearTopicPermissions(vHost, username);
}
}
... ...
... ... @@ -4,6 +4,7 @@ import com.rabbitmq.client.*;
import com.sunyo.wlpt.message.bus.service.domain.XmlData;
import com.sunyo.wlpt.message.bus.service.exception.CustomExceptionType;
import com.sunyo.wlpt.message.bus.service.response.ResultJson;
import com.sunyo.wlpt.message.bus.service.utils.EncryptionUtils;
import com.sunyo.wlpt.message.bus.service.utils.IdUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
... ... @@ -95,6 +96,8 @@ public class DirectUtils {
*/
public static Connection getConnection(String hostIp, int hostPort, String vHostName, String userName, String password) throws Exception
{
String base = EncryptionUtils.decryptBase64(password);
String[] split = base.split("\\.");
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
... ... @@ -104,7 +107,7 @@ public class DirectUtils {
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost(vHostName);
factory.setUsername(userName);
factory.setPassword(password);
factory.setPassword(split[split.length - 1]);
// 通过工程获取连接
return factory.newConnection();
}
... ... @@ -258,7 +261,7 @@ public class DirectUtils {
{
// 1、创建Connection
Connection connection = getConnection(xmlData.getServerIp(), xmlData.getServerPort(),
xmlData.getVirtualHostName());
xmlData.getVirtualHostName(),xmlData.getSuperUsername(), xmlData.getSuperPassword());
// 2、 通过Connection创建一个新的Channel
Channel channel = connection.createChannel();
// 3、开启消息的确认机制(confirm:保证消息能够发送到 exchange)
... ...
... ... @@ -6,6 +6,7 @@ import com.rabbitmq.client.ConnectionFactory;
import com.sunyo.wlpt.message.bus.service.domain.*;
import com.sunyo.wlpt.message.bus.service.service.BusServerService;
import com.sunyo.wlpt.message.bus.service.service.VirtualHostService;
import com.sunyo.wlpt.message.bus.service.utils.EncryptionUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
... ... @@ -87,64 +88,28 @@ public class RabbitUtils {
/**
* 获取 rabbitMq 的连接,重载
*/
public Connection getConnection(String virtualHostName) throws IOException, TimeoutException
{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setVirtualHost(virtualHostName);
factory.setUsername(username);
factory.setPassword(password);
Connection connection = factory.newConnection();
return connection;
}
/**
* 获取 rabbitMq 的连接,重载
*/
public Connection getConnection(String serverIp, Integer serverPort, String virtualHostName)
public Connection getConnection(String serverIp, Integer serverPort, String virtualHostName, String superUsername, String superPassword)
throws IOException, TimeoutException
{
String base = EncryptionUtils.decryptBase64(superPassword);
String[] split = base.split("\\.");
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(serverIp);
factory.setPort(serverPort);
factory.setVirtualHost(virtualHostName);
factory.setUsername(username);
factory.setPassword(password);
factory.setUsername(superUsername);
factory.setPassword(split[split.length - 1]);
Connection connection = factory.newConnection();
return connection;
}
/**
* 获取 rabbitMq 的连接,重载
*
* @param hostIp 服务器ip
* @param hostPort 服务器端口号
* @param vHostName 虚拟主机名称
* @param userName 用户名
* @param password 密码
* @return
* @throws Exception
*/
public static Connection getConnection(String hostIp, int hostPort, String vHostName, String userName, String password)
throws Exception
{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(hostIp);
factory.setPort(hostPort);
factory.setVirtualHost(vHostName);
factory.setUsername(userName);
factory.setPassword(password);
return factory.newConnection();
}
/**
* 添加交换机
*/
public void createExchange(String serverIp, Integer serverPort, String virtualHostName, BusExchange busExchange)
public void createExchange(BusServer server, String virtualHostName, BusExchange busExchange)
throws IOException, TimeoutException
{
Connection connection = getConnection(serverIp, serverPort, virtualHostName);
Connection connection = getConnection(server.getServerIp(), server.getServerPort(), virtualHostName, server.getSuperUsername(), server.getSuperPassword());
Channel channel = connection.createChannel();
channel.exchangeDeclare(busExchange.getExchangeName(),
busExchange.getExchangeType(),
... ... @@ -158,10 +123,10 @@ public class RabbitUtils {
/**
* 删除交换机 channel.exchangeDelete(exchangeName);
*/
public void removeExchange(String serverIp, Integer serverPort, String virtualHostName, BusExchange busExchange)
public void removeExchange(BusServer server, String virtualHostName, BusExchange busExchange)
throws IOException, TimeoutException
{
Connection connection = getConnection(serverIp, serverPort, virtualHostName);
Connection connection = getConnection(server.getServerIp(), server.getServerPort(), virtualHostName, server.getSuperUsername(), server.getSuperPassword());
Channel channel = connection.createChannel();
channel.exchangeDelete(busExchange.getExchangeName());
closeConnectionAndChanel(channel, connection);
... ... @@ -170,10 +135,10 @@ public class RabbitUtils {
/**
* 添加队列(默认设置参数为 null)
*/
public void createQueue(String serverIp, Integer serverPort, String virtualHostName, BusQueue busQueue)
public void createQueue(BusServer server, String virtualHostName, BusQueue busQueue)
throws IOException, TimeoutException
{
Connection connection = getConnection(serverIp, serverPort, virtualHostName);
Connection connection = getConnection(server.getServerIp(), server.getServerPort(), virtualHostName, server.getSuperUsername(), server.getSuperPassword());
Channel channel = connection.createChannel();
channel.queueDeclare(busQueue.getQueueName(),
busQueue.getDurability(),
... ... @@ -190,10 +155,10 @@ public class RabbitUtils {
/**
* 删除队列 channel.queueDelete(queueName);
*/
public void removeQueue(String serverIp, Integer serverPort, String virtualHostName, BusQueue busQueue)
public void removeQueue(BusServer server, String virtualHostName, BusQueue busQueue)
throws IOException, TimeoutException
{
Connection connection = getConnection(serverIp, serverPort, virtualHostName);
Connection connection = getConnection(server.getServerIp(), server.getServerPort(), virtualHostName, server.getSuperUsername(), server.getSuperPassword());
Channel channel = connection.createChannel();
channel.queueDelete(busQueue.getQueueName());
closeConnectionAndChanel(channel, connection);
... ... @@ -202,10 +167,10 @@ public class RabbitUtils {
/**
* 创建绑定
*/
public void createBinding(String serverIp, Integer serverPort, String virtualHostName, UserMessageBinding userMessageBinding)
public void createBinding(BusServer server, String virtualHostName, UserMessageBinding userMessageBinding)
throws IOException, TimeoutException
{
Connection connection = getConnection(serverIp, serverPort, virtualHostName);
Connection connection = getConnection(server.getServerIp(), server.getServerPort(), virtualHostName, server.getSuperUsername(), server.getSuperPassword());
Channel channel = connection.createChannel();
channel.queueBind(userMessageBinding.getQueueName(),
userMessageBinding.getExchangeName(),
... ... @@ -216,11 +181,10 @@ public class RabbitUtils {
/**
* 解除绑定 channel.queueUnbind("queueName", "exchangeName","routingKey");
*/
public void removeBinding(String serverIp, Integer serverPort, String virtualHostName,
UserMessageBinding userMessageBinding)
public void removeBinding(BusServer server, String virtualHostName, UserMessageBinding userMessageBinding)
throws IOException, TimeoutException
{
Connection connection = getConnection(serverIp, serverPort, virtualHostName);
Connection connection = getConnection(server.getServerIp(), server.getServerPort(), virtualHostName, server.getSuperUsername(), server.getSuperPassword());
Channel channel = connection.createChannel();
channel.queueUnbind(userMessageBinding.getQueueName(),
userMessageBinding.getExchangeName(),
... ... @@ -231,41 +195,45 @@ public class RabbitUtils {
/**
* 前往创建交换机的路上
*/
public void toCreateExchange(BusExchange busExchange) throws IOException, TimeoutException
public void toCreateExchange(BusExchange busExchange)
throws IOException, TimeoutException
{
VirtualHost virtualHost = getVirtualHost(busExchange.getVirtualHostId());
BusServer busServer = getBusServer(virtualHost.getServerId());
createExchange(busServer.getServerIp(), busServer.getServerPort(), virtualHost.getVirtualHostName(), busExchange);
createExchange(busServer, virtualHost.getVirtualHostName(), busExchange);
}
/**
* 前往删除交换机的路上
*/
public void toRemoveExchange(BusExchange busExchange) throws IOException, TimeoutException
public void toRemoveExchange(BusExchange busExchange)
throws IOException, TimeoutException
{
VirtualHost virtualHost = getVirtualHost(busExchange.getVirtualHostId());
BusServer busServer = getBusServer(virtualHost.getServerId());
removeExchange(busServer.getServerIp(), busServer.getServerPort(), virtualHost.getVirtualHostName(), busExchange);
removeExchange(busServer, virtualHost.getVirtualHostName(), busExchange);
}
/**
* 前往创建队列的路上
*/
public void toCreateQueue(BusQueue BusQueue) throws IOException, TimeoutException
public void toCreateQueue(BusQueue BusQueue)
throws IOException, TimeoutException
{
VirtualHost virtualHost = getVirtualHost(BusQueue.getVirtualHostId());
BusServer busServer = getBusServer(virtualHost.getServerId());
createQueue(busServer.getServerIp(), busServer.getServerPort(), virtualHost.getVirtualHostName(), BusQueue);
createQueue(busServer, virtualHost.getVirtualHostName(), BusQueue);
}
/**
* 前往删除队列的路上
*/
public void toRemoveQueue(BusQueue BusQueue) throws IOException, TimeoutException
public void toRemoveQueue(BusQueue BusQueue)
throws IOException, TimeoutException
{
VirtualHost virtualHost = getVirtualHost(BusQueue.getVirtualHostId());
BusServer busServer = getBusServer(virtualHost.getServerId());
removeQueue(busServer.getServerIp(), busServer.getServerPort(), virtualHost.getVirtualHostName(), BusQueue);
removeQueue(busServer, virtualHost.getVirtualHostName(), BusQueue);
}
/**
... ... @@ -276,7 +244,7 @@ public class RabbitUtils {
{
VirtualHost virtualHost = getVirtualHost(userMessageBinding.getVirtualHostId());
BusServer busServer = getBusServer(virtualHost.getServerId());
createBinding(busServer.getServerIp(), busServer.getServerPort(), virtualHost.getVirtualHostName(), userMessageBinding);
createBinding(busServer, virtualHost.getVirtualHostName(), userMessageBinding);
}
/**
... ... @@ -287,7 +255,7 @@ public class RabbitUtils {
{
VirtualHost virtualHost = getVirtualHost(userMessageBinding.getVirtualHostId());
BusServer busServer = getBusServer(virtualHost.getServerId());
removeBinding(busServer.getServerIp(), busServer.getServerPort(), virtualHost.getVirtualHostName(), userMessageBinding);
removeBinding(busServer, virtualHost.getVirtualHostName(), userMessageBinding);
}
/**
... ...