作者 王勇

操作MQ动态创建删除交换机、队列

... ... @@ -3,6 +3,11 @@ server:
# spring 配置
spring:
security:
user:
name: admin
password: 123456
application:
name: message-bus-service
... ... @@ -58,11 +63,6 @@ spring:
default-property-inclusion: ALWAYS
time-zone: GMT+8
date-format: yyyy-MM-dd HH:mm:ss
# Spring Security配置
security:
user:
name: admin
password: 123456
# zipkin 链路追踪配置
zipkin:
... ... @@ -96,6 +96,7 @@ eureka:
user:
name: "admin"
password: "123456"
client:
healthcheck:
enabled: true
... ...
... ... @@ -16,9 +16,5 @@ public class SecurityConfig extends WebSecurityConfigurerAdapter {
@Override
protected void configure(HttpSecurity http) throws Exception {
http.csrf().disable();
//http.authorizeRequests().antMatchers("/","/bus/**").permitAll().and().csrf().disable();
}
}
... ...
... ... @@ -8,7 +8,9 @@ import com.sunyo.wlpt.message.bus.service.utils.IdUtils;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeoutException;
import static com.sunyo.wlpt.message.bus.service.common.Constant.EXIST_EXCHANGE_INFO;
... ... @@ -39,7 +41,8 @@ public class BusExchangeController {
@RequestParam(value = "exchangeName", required = false) String exchangeName,
@RequestParam(value = "virtualHostId", required = false) String virtualHostId,
@RequestParam(value = "pageNum", defaultValue = "1") Integer pageNum,
@RequestParam(value = "pageSize", defaultValue = "10") Integer pageSize) {
@RequestParam(value = "pageSize", defaultValue = "10") Integer pageSize)
{
// 获取查询参数
BusExchange busExchange = BusExchange.builder().exchangeName(exchangeName).virtualHostId(virtualHostId).build();
// 分页查询
... ... @@ -56,8 +59,8 @@ public class BusExchangeController {
* @return {@link ResultJson}
*/
@DeleteMapping("/delete")
public ResultJson deleteBusExchange(@RequestBody BusExchange busExchange) {
public ResultJson deleteBusExchange(@RequestBody BusExchange busExchange) throws IOException, TimeoutException
{
return busExchangeService.deleteByPrimaryKey(busExchange.getId()) > 0
? new ResultJson<>("200", "删除MQ交换机,成功")
: new ResultJson<>("500", "删除MQ交换机,失败");
... ... @@ -70,8 +73,8 @@ public class BusExchangeController {
* @return {@link ResultJson}
*/
@GetMapping("/batchRemove")
public ResultJson batchRemoveBusExchange(String ids) {
public ResultJson batchRemoveBusExchange(String ids) throws IOException, TimeoutException
{
return busExchangeService.deleteByPrimaryKey(ids) > 0
? new ResultJson<>("200", "批量删除MQ交换机,成功")
: new ResultJson<>("500", "批量删除MQ交换机,失败");
... ... @@ -84,7 +87,8 @@ public class BusExchangeController {
* @return {@link ResultJson}
*/
@PutMapping("/update")
public ResultJson updateBusExchange(@RequestBody BusExchange busExchange) {
public ResultJson updateBusExchange(@RequestBody BusExchange busExchange)
{
// 先验证,修改好的核心信息(交换机名称)是否已存在
String message = validateBusExchange(busExchange);
... ... @@ -102,7 +106,8 @@ public class BusExchangeController {
* @return {@link ResultJson}
*/
@PostMapping("/insert")
public ResultJson insertBusExchange(@RequestBody BusExchange busExchange) {
public ResultJson insertBusExchange(@RequestBody BusExchange busExchange) throws IOException, TimeoutException
{
//先验证,增加的虚拟主机的核心信息(交换机名称)是否已存在
String message = validateBusExchange(busExchange);
// 设置id
... ... @@ -121,7 +126,8 @@ public class BusExchangeController {
* @param busExchange {@link BusExchange}
* @return 通过,无返回消息
*/
private String validateBusExchange(BusExchange busExchange) {
private String validateBusExchange(BusExchange busExchange)
{
// 判断交换机名称,是否为空
if ("".equals(busExchange.getExchangeName()) || busExchange.getExchangeName() == null) {
return "该交换机信息中,没有交换机名称";
... ...
... ... @@ -11,7 +11,9 @@ import io.netty.util.internal.StringUtil;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeoutException;
import static com.sunyo.wlpt.message.bus.service.common.Constant.EXIST_QUEUE_INFO;
... ... @@ -67,9 +69,8 @@ public class BusQueueController {
* @return {@link ResultJson}
*/
@DeleteMapping("/delete")
public ResultJson deleteBusQueue(@RequestBody BusQueue busQueue)
public ResultJson deleteBusQueue(@RequestBody BusQueue busQueue) throws IOException, TimeoutException
{
return busQueueService.deleteByPrimaryKey(busQueue.getId()) > 0
? new ResultJson<>("200", "删除消息队列,成功")
: new ResultJson<>("500", "删除消息队列,失败");
... ... @@ -82,9 +83,8 @@ public class BusQueueController {
* @return {@link ResultJson}
*/
@GetMapping("/batchRemove")
public ResultJson batchRemoveBusQueue(String ids)
public ResultJson batchRemoveBusQueue(String ids) throws IOException, TimeoutException
{
return busQueueService.deleteByPrimaryKey(ids) > 0
? new ResultJson<>("200", "批量删除消息队列,成功")
: new ResultJson<>("500", "批量删除消息队列,失败");
... ... @@ -115,7 +115,7 @@ public class BusQueueController {
* @return {@link ResultJson}
*/
@PostMapping("/insert")
public ResultJson insertBusQueue(@RequestBody BusQueue busQueue)
public ResultJson insertBusQueue(@RequestBody BusQueue busQueue) throws IOException, TimeoutException
{
//先验证,增加的虚拟主机的核心信息(交换机名称)是否已存在
String message = validateBusQueue(busQueue);
... ...
... ... @@ -3,10 +3,8 @@ package com.sunyo.wlpt.message.bus.service.controller;
import com.sunyo.wlpt.message.bus.service.domain.XmlData;
import com.sunyo.wlpt.message.bus.service.exception.CustomExceptionType;
import com.sunyo.wlpt.message.bus.service.rabbit.utils.DirectUtils;
import com.sunyo.wlpt.message.bus.service.rabbit.utils.RabbitUtils;
import com.sunyo.wlpt.message.bus.service.response.ResultJson;
import com.sunyo.wlpt.message.bus.service.service.AsyncTaskService;
import com.sunyo.wlpt.message.bus.service.service.MessageNoteService;
import com.sunyo.wlpt.message.bus.service.service.UserMessageBindingService;
import com.sunyo.wlpt.message.bus.service.utils.XmlUtils;
import lombok.extern.slf4j.Slf4j;
... ... @@ -36,12 +34,6 @@ public class RabbitController {
private XmlUtils xmlUtils;
@Resource
private MessageNoteService messageNoteService;
@Resource
private RabbitUtils rabbitUtils;
@Resource
private DirectUtils directUtils;
@Resource
... ...
package com.sunyo.wlpt.message.bus.service.rabbit.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import javax.annotation.Resource;
/**
* @author 子诚
* Description:开启消息可靠性(confirm和return机制)
* 时间:2020/7/20 9:30
*/
@Slf4j
//@Component
public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 初始化的方法,设置当前上下文(下面的两个方法),为confirm和return机制的回滚事件
*/
// @PostConstruct
public void initMethod() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
// 根据手动ack来确定
if (ack) {
log.info("消息发送至exchange,成功");
} else {
log.error("消息发送至exchange,失败");
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error("消息从exchange分发到queue,失败");
}
}
package com.sunyo.wlpt.message.bus.service.rabbit.test;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* @author 子诚
* Description:测试-消息-消费者
* 时间:2020/7/20 11:09
*/
@Slf4j
@Component
public class TestConsumer {
@Autowired
private StringRedisTemplate redisTemplate;
// @RabbitListener(queues = "Q_test")
public void getMessage(String msg, Channel channel, Message message) throws IOException {
try {
// 0、获取messageId
String messageId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
// 1、设置key到redis,设置缓存时间为10秒
if (redisTemplate.opsForValue().setIfAbsent(messageId, "0", 10, TimeUnit.SECONDS)) {
// 2、消费消息
log.info("成功消费了消息:" + msg);
// 3、设置key的value为1
redisTemplate.opsForValue().set(messageId, "1", 10, TimeUnit.SECONDS);
// 4、手动ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
} else {
// 获取redis中的value,如果是1,就手动ack。如果是0,就什么也不做(是0代表着,正在被消费中)
if ("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))) {
// 手动ack,第一个参数是所确认消息的标识,第二参数是是否批量确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}
}
/**
* 设置Qos机制
* 第一个参数:单条消息的大小(0表示即无限制)
* 第二个参数:每次处理消息的数量
* 第三个参数:是否为consumer级别(false表示仅当前channel有效)
*/
// channel.basicQos(0, 1, false);
} catch (Exception e) {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
log.error("消息ID:" + message.getMessageProperties().getHeaders().get("spring_returned_message_correlation"));
log.error("接收消息发送错误:" + e.getMessage());
}
}
}
package com.sunyo.wlpt.message.bus.service.rabbit.test;
import com.sunyo.wlpt.message.bus.service.utils.IdUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
/**
* @author 子诚
* Description:测试-消息-生产者
* 时间:2020/7/20 11:08
*/
@Slf4j
@Component
public class TestProduct {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* @param exchangeName 交换机名称
* @param routingKeyName 路由键名称
* @param msg 发送的消息
*/
public void sentMessage(String exchangeName, String routingKeyName, String msg) throws IOException {
CorrelationData messageId = new CorrelationData(IdUtils.generateId());
rabbitTemplate.convertAndSend(exchangeName, routingKeyName, msg, messageId);
log.info("成功发送消息-> {};到交换机->{};路由键为 ->{};", msg, exchangeName, routingKeyName);
}
}
package com.sunyo.wlpt.message.bus.service.rabbit.utils;
import com.sunyo.wlpt.message.bus.service.domain.BusExchange;
import com.sunyo.wlpt.message.bus.service.domain.BusQueue;
import com.sunyo.wlpt.message.bus.service.domain.UserMessageBinding;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @author 子诚
* Description:
* 时间:2020/7/16 16:32
*/
@Slf4j
@RefreshScope
@Component
public class BootRabbitUtils {
@Resource
private AmqpAdmin amqpAdmin;
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 创建交换机(交换机名称,是否持久化,是否删除)
*
* @param busExchange {@link BusExchange}
*/
public void createExchange(BusExchange busExchange) {
// 类型-直连路由
String type_direct = "direct";
// 类型-动态路由
String type_topic = "topic";
// 类型-广播
String type_fanout = "fanout";
// 类型-头部
String type_headers = "headers";
// 创建交换机,直连接类型
if (type_direct.equals(busExchange.getExchangeType())) {
amqpAdmin.declareExchange(
new DirectExchange(busExchange.getExchangeName(), busExchange.getDurability(), busExchange.getAutoDelete())
);
log.info("创建了交换机:{};类型:{};", busExchange.getExchangeType(), type_direct);
}
// 创建交换机,扇形交换机
if (type_topic.equals(busExchange.getExchangeType())) {
amqpAdmin.declareExchange(
new TopicExchange(busExchange.getExchangeName(), busExchange.getDurability(), busExchange.getAutoDelete())
);
log.info("创建了交换机:{};类型:{};", busExchange.getExchangeType(), type_topic);
}
// 创建交换机,广播(主题)交换机
if (type_fanout.equals(busExchange.getExchangeType())) {
amqpAdmin.declareExchange(
new FanoutExchange(busExchange.getExchangeName(), busExchange.getDurability(), busExchange.getAutoDelete())
);
log.info("创建了交换机:{};类型:{};", busExchange.getExchangeType(), type_fanout);
}
// 创建交换机,首部交换机
if (type_headers.equals(busExchange.getExchangeType())) {
amqpAdmin.declareExchange(
new HeadersExchange(busExchange.getExchangeName(), busExchange.getDurability(), busExchange.getAutoDelete())
);
log.info("创建了交换机:{};类型:{};", busExchange.getExchangeType(), type_headers);
}
}
/**
* 创建交换机,通过 exchangeName 创建
*/
public void createExchange(String exchangeName) {
amqpAdmin.declareExchange(
new DirectExchange(exchangeName)
);
log.info("创建了交换机:{};类型:{};", exchangeName, "DirectExchange");
}
/**
* 根据交换机名称,删除虚拟机
*
* @param exchangeName 交换机名称
*/
public void deleteExchange(String exchangeName) {
boolean flag = amqpAdmin.deleteExchange(exchangeName);
}
/**
* 创建队列
*
* @param busQueue {@link BusQueue}
*/
public void createQueue(BusQueue busQueue) {
amqpAdmin.declareQueue(
new Queue(busQueue.getQueueName(), busQueue.getDurability(), false, busQueue.getAutoDelete())
);
log.info("创建了队列,队列名称->{}", busQueue.getQueueName());
}
/**
* 创建队列
*
* @param queueName 队列名称
*/
public void createQueue(String queueName) {
amqpAdmin.declareQueue(
new Queue(queueName)
);
log.info("创建了队列,队列名称->{}", queueName);
}
/**
* 删除队列,根据队列名称
*
* @param queueName 队列名称
*/
public void deleteQueue(String queueName) {
boolean flag = amqpAdmin.deleteQueue(queueName);
}
/**
* 创建绑定关系
*
* @param userMessageBinding {@link UserMessageBinding}
*/
public void createBing(UserMessageBinding userMessageBinding) {
amqpAdmin.declareBinding(
new Binding(userMessageBinding.getQueueName(), Binding.DestinationType.QUEUE, userMessageBinding.getExchangeName(), userMessageBinding.getRoutingKeyName(), null)
);
log.info("创建了绑定关系,交换机->{};队列->{};路由键->{}", userMessageBinding.getQueueName(), userMessageBinding.getExchangeName(), userMessageBinding.getRoutingKeyName());
}
/**
* 创建绑定关系
*
* @param exchangeName 交换机名称
* @param queueName 队列名称
* @param routingKeyName 路由键名称
*/
public void createBing(String exchangeName, String queueName, String routingKeyName) {
amqpAdmin.declareBinding(
new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKeyName, null)
);
log.info("创建了绑定关系,交换机->{};队列->{};路由键->{}", exchangeName, queueName, routingKeyName);
}
}
... ...
package com.sunyo.wlpt.message.bus.service.rabbit.utils;
import com.sunyo.wlpt.message.bus.service.domain.BusExchange;
import com.sunyo.wlpt.message.bus.service.domain.BusQueue;
import com.sunyo.wlpt.message.bus.service.domain.UserMessageBinding;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.sunyo.wlpt.message.bus.service.domain.*;
import com.sunyo.wlpt.message.bus.service.service.BusServerService;
import com.sunyo.wlpt.message.bus.service.service.VirtualHostService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author 子诚
* Description:
* 时间:2020/7/16 16:32
* 时间:2020/7/30 17:21
*/
@Slf4j
@RefreshScope
@Component
public class RabbitUtils {
@Resource
private AmqpAdmin amqpAdmin;
private VirtualHostService virtualHostService;
@Resource
private RabbitTemplate rabbitTemplate;
private BusServerService busServerService;
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String vHost;
/**
* 创建交换机(交换机名称,是否持久化,是否删除)
* 关闭连接与通道
*
* @param busExchange {@link BusExchange}
*/
public void createExchange(BusExchange busExchange) {
// 类型-直连路由
String type_direct = "direct";
// 类型-动态路由
String type_topic = "topic";
// 类型-广播
String type_fanout = "fanout";
// 类型-头部
String type_headers = "headers";
// 创建交换机,直连接类型
if (type_direct.equals(busExchange.getExchangeType())) {
amqpAdmin.declareExchange(
new DirectExchange(busExchange.getExchangeName(), busExchange.getDurability(), busExchange.getAutoDelete())
);
log.info("创建了交换机:{};类型:{};", busExchange.getExchangeType(), type_direct);
}
// 创建交换机,扇形交换机
if (type_topic.equals(busExchange.getExchangeType())) {
amqpAdmin.declareExchange(
new TopicExchange(busExchange.getExchangeName(), busExchange.getDurability(), busExchange.getAutoDelete())
);
log.info("创建了交换机:{};类型:{};", busExchange.getExchangeType(), type_topic);
}
// 创建交换机,广播(主题)交换机
if (type_fanout.equals(busExchange.getExchangeType())) {
amqpAdmin.declareExchange(
new FanoutExchange(busExchange.getExchangeName(), busExchange.getDurability(), busExchange.getAutoDelete())
);
log.info("创建了交换机:{};类型:{};", busExchange.getExchangeType(), type_fanout);
}
// 创建交换机,首部交换机
if (type_headers.equals(busExchange.getExchangeType())) {
amqpAdmin.declareExchange(
new HeadersExchange(busExchange.getExchangeName(), busExchange.getDurability(), busExchange.getAutoDelete())
);
log.info("创建了交换机:{};类型:{};", busExchange.getExchangeType(), type_headers);
* @param channel 通道
* @param conn 连接
*/
public static void closeConnectionAndChanel(Channel channel, Connection conn)
{
try {
if (channel != null) {
channel.close();
}
if (conn != null) {
conn.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 创建交换机,通过 exchangeName 创建
* 获取 rabbitMq 的连接,重载
*/
public void createExchange(String exchangeName) {
amqpAdmin.declareExchange(
new DirectExchange(exchangeName)
);
log.info("创建了交换机:{};类型:{};", exchangeName, "DirectExchange");
public Connection getConnection() throws IOException, TimeoutException
{
// 定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置服务地址
factory.setHost(host);
// 端口,amqp协议 端口 类似与mysql的3306
factory.setPort(port);
// 设置账号信息,用户名、密码、vhost
factory.setVirtualHost(vHost);
factory.setUsername(username);
factory.setPassword(password);
// 通过工程获取连接
Connection connection = factory.newConnection();
return connection;
}
/**
* 根据交换机名称,删除虚拟机
*
* @param exchangeName 交换机名称
* 获取 rabbitMq 的连接,重载
*/
public void deleteExchange(String exchangeName) {
boolean flag = amqpAdmin.deleteExchange(exchangeName);
public Connection getConnection(String virtualHostName) throws IOException, TimeoutException
{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setVirtualHost(virtualHostName);
factory.setUsername(username);
factory.setPassword(password);
Connection connection = factory.newConnection();
return connection;
}
/**
* 创建队列
*
* @param busQueue {@link BusQueue}
* 获取 rabbitMq 的连接,重载
*/
public void createQueue(BusQueue busQueue) {
amqpAdmin.declareQueue(
new Queue(busQueue.getQueueName(), busQueue.getDurability(), false, busQueue.getAutoDelete())
);
log.info("创建了队列,队列名称->{}", busQueue.getQueueName());
public Connection getConnection(String serverIp, Integer serverPort, String virtualHostName)
throws IOException, TimeoutException
{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(serverIp);
factory.setPort(serverPort);
factory.setVirtualHost(virtualHostName);
factory.setUsername(username);
factory.setPassword(password);
Connection connection = factory.newConnection();
return connection;
}
/**
* 创建队列
* 获取 rabbitMq 的连接,重载
*
* @param queueName 队列名称
* @param hostIp 服务器ip
* @param hostPort 服务器端口号
* @param vHostName 虚拟主机名称
* @param userName 用户名
* @param password 密码
* @return
* @throws Exception
*/
public void createQueue(String queueName) {
amqpAdmin.declareQueue(
new Queue(queueName)
);
log.info("创建了队列,队列名称->{}", queueName);
public static Connection getConnection(String hostIp, int hostPort, String vHostName, String userName, String password)
throws Exception
{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(hostIp);
factory.setPort(hostPort);
factory.setVirtualHost(vHostName);
factory.setUsername(userName);
factory.setPassword(password);
return factory.newConnection();
}
/**
* 删除队列,根据队列名称
*
* @param queueName 队列名称
* 添加交换机
*/
public void deleteQueue(String queueName) {
boolean flag = amqpAdmin.deleteQueue(queueName);
public void createExchange(String serverIp, Integer serverPort, String virtualHostName, BusExchange busExchange)
throws IOException, TimeoutException
{
Connection connection = getConnection(serverIp, serverPort, virtualHostName);
Channel channel = connection.createChannel();
AMQP.Exchange.DeclareOk declareOk = channel.exchangeDeclare(busExchange.getExchangeName(), busExchange.getExchangeType(), busExchange.getDurability(),
busExchange.getAutoDelete(), busExchange.getInternal(), null);
log.info("创建交换机的返回值<----->" + declareOk);
closeConnectionAndChanel(channel, connection);
}
/**
* 删除交换机 channel.exchangeDelete(exchangeName);
*/
public void removeExchange(String serverIp, Integer serverPort, String virtualHostName, BusExchange busExchange)
throws IOException, TimeoutException
{
Connection connection = getConnection(serverIp, serverPort, virtualHostName);
Channel channel = connection.createChannel();
channel.exchangeDelete(busExchange.getExchangeName());
closeConnectionAndChanel(channel, connection);
}
/**
* 添加队列(默认设置参数为null)
*/
public void createQueue(String serverIp, Integer serverPort, String virtualHostName, BusQueue busQueue)
throws IOException, TimeoutException
{
Connection connection = getConnection(serverIp, serverPort, virtualHostName);
Channel channel = connection.createChannel();
channel.queueDeclare(busQueue.getQueueName(), busQueue.getDurability(), false, busQueue.getAutoDelete(), null);
closeConnectionAndChanel(channel, connection);
}
/**
* 删除队列 channel.queueDelete(queueName);
*/
public void removeQueue(String serverIp, Integer serverPort, String virtualHostName, BusQueue busQueue)
throws IOException, TimeoutException
{
Connection connection = getConnection(serverIp, serverPort, virtualHostName);
Channel channel = connection.createChannel();
channel.queueDelete(busQueue.getQueueName());
closeConnectionAndChanel(channel, connection);
}
/**
* 创建绑定关系
*/
public void createBinding(String serverIp, Integer serverPort, String virtualHostName, UserMessageBinding userMessageBinding)
throws IOException, TimeoutException
{
Connection connection = getConnection(serverIp, serverPort, virtualHostName);
Channel channel = connection.createChannel();
}
/**
* 清空队列 channel.queuePurge(queueName);
*/
/**
* 解除绑定 channel.queueUnbind("queueName", "exchangeName","routingKey");
*/
/**
* 前往创建交换机的路上
*/
public void toCreateExchange(BusExchange busExchange) throws IOException, TimeoutException
{
VirtualHost virtualHost = getVirtualHost(busExchange.getVirtualHostId());
BusServer busServer = getBusServer(virtualHost.getServerId());
createExchange(busServer.getServerIp(), busServer.getServerPort(), virtualHost.getVirtualHostName(), busExchange);
}
/**
* 前往删除交换机的路上
*/
public void toRemoveExchange(BusExchange busExchange) throws IOException, TimeoutException
{
VirtualHost virtualHost = getVirtualHost(busExchange.getVirtualHostId());
BusServer busServer = getBusServer(virtualHost.getServerId());
removeExchange(busServer.getServerIp(), busServer.getServerPort(), virtualHost.getVirtualHostName(), busExchange);
}
/**
* 前往创建队列的路上
*/
public void toCreateQueue(BusQueue BusQueue) throws IOException, TimeoutException
{
VirtualHost virtualHost = getVirtualHost(BusQueue.getVirtualHostId());
BusServer busServer = getBusServer(virtualHost.getServerId());
createQueue(busServer.getServerIp(), busServer.getServerPort(), virtualHost.getVirtualHostName(), BusQueue);
}
/**
* 前往删除队列的路上
*/
public void toRemoveQueue(BusQueue BusQueue) throws IOException, TimeoutException
{
VirtualHost virtualHost = getVirtualHost(BusQueue.getVirtualHostId());
BusServer busServer = getBusServer(virtualHost.getServerId());
removeQueue(busServer.getServerIp(), busServer.getServerPort(), virtualHost.getVirtualHostName(), BusQueue);
}
/**
* 根据虚拟主机id,获取虚拟主机信息
*
* @param userMessageBinding {@link UserMessageBinding}
* @param virtualHostId 虚拟主机id
* @return {@link VirtualHost}
*/
public void createBing(UserMessageBinding userMessageBinding) {
amqpAdmin.declareBinding(
new Binding(userMessageBinding.getQueueName(), Binding.DestinationType.QUEUE, userMessageBinding.getExchangeName(), userMessageBinding.getRoutingKeyName(), null)
);
log.info("创建了绑定关系,交换机->{};队列->{};路由键->{}", userMessageBinding.getQueueName(), userMessageBinding.getExchangeName(), userMessageBinding.getRoutingKeyName());
public VirtualHost getVirtualHost(String virtualHostId)
{
return virtualHostService.selectByPrimaryKey(virtualHostId);
}
/**
* 创建绑定关系
* 根据服务器id,获取服务器信息
*
* @param exchangeName 交换机名称
* @param queueName 队列名称
* @param routingKeyName 路由键名称
*/
public void createBing(String exchangeName, String queueName, String routingKeyName) {
amqpAdmin.declareBinding(
new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKeyName, null)
);
log.info("创建了绑定关系,交换机->{};队列->{};路由键->{}", exchangeName, queueName, routingKeyName);
* @param serverId 服务器id
* @return {@link BusServer}
*/
public BusServer getBusServer(String serverId)
{
return busServerService.selectByPrimaryKey(serverId);
}
}
... ...
... ... @@ -24,4 +24,5 @@ public class AsyncTaskService {
// 无论消息是否发送成功,将消息存储于数据库
messageNoteService.insertMessageSelective(sentData);
}
}
... ...
... ... @@ -3,7 +3,9 @@ package com.sunyo.wlpt.message.bus.service.service;
import com.github.pagehelper.PageInfo;
import com.sunyo.wlpt.message.bus.service.domain.BusExchange;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeoutException;
/**
* @author 子诚
... ... @@ -19,7 +21,7 @@ public interface BusExchangeService {
* @param id primaryKey
* @return deleteCount
*/
int deleteByPrimaryKey(String id);
int deleteByPrimaryKey(String id) throws IOException, TimeoutException;
/**
* 新增
... ... @@ -35,7 +37,7 @@ public interface BusExchangeService {
* @param record the record
* @return insert count
*/
int insertSelective(BusExchange record);
int insertSelective(BusExchange record) throws IOException, TimeoutException;
/**
* 查询,根据主键
... ...
... ... @@ -3,7 +3,9 @@ package com.sunyo.wlpt.message.bus.service.service;
import com.github.pagehelper.PageInfo;
import com.sunyo.wlpt.message.bus.service.domain.BusQueue;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeoutException;
/**
* @author 子诚
... ... @@ -18,7 +20,7 @@ public interface BusQueueService {
* @param id primaryKey
* @return deleteCount
*/
int deleteByPrimaryKey(String id);
int deleteByPrimaryKey(String id) throws IOException, TimeoutException;
/**
* 新增
... ... @@ -34,7 +36,7 @@ public interface BusQueueService {
* @param record the record
* @return insert count
*/
int insertSelective(BusQueue record);
int insertSelective(BusQueue record) throws IOException, TimeoutException;
/**
* 查询,根据主键
... ...
... ... @@ -4,13 +4,16 @@ import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.sunyo.wlpt.message.bus.service.domain.BusExchange;
import com.sunyo.wlpt.message.bus.service.mapper.BusExchangeMapper;
import com.sunyo.wlpt.message.bus.service.rabbit.utils.RabbitUtils;
import com.sunyo.wlpt.message.bus.service.service.BusExchangeService;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeoutException;
/**
* @author 子诚
... ... @@ -23,9 +26,12 @@ public class BusExchangeServiceImpl implements BusExchangeService {
@Resource
private BusExchangeMapper busExchangeMapper;
@Resource
RabbitUtils rabbitUtils;
@Override
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
public int deleteByPrimaryKey(String id)
public int deleteByPrimaryKey(String id) throws IOException, TimeoutException
{
// 判断删除的个数,需被删除的个数是否一致
int index = 0;
... ... @@ -34,7 +40,9 @@ public class BusExchangeServiceImpl implements BusExchangeService {
if (id.contains(splitItem)) {
String[] split = id.split(splitItem);
for (int i = 0; i < split.length; i++) {
BusExchange busExchange = selectByPrimaryKey(split[i]);
int num = busExchangeMapper.deleteByPrimaryKey(split[i]);
deleteExchange(busExchange);
if (num > 0) {
index = index + num;
}
... ... @@ -45,10 +53,21 @@ public class BusExchangeServiceImpl implements BusExchangeService {
return 0;
}
} else {
return busExchangeMapper.deleteByPrimaryKey(id);
BusExchange busExchange = selectByPrimaryKey(id);
int num = busExchangeMapper.deleteByPrimaryKey(id);
deleteExchange(busExchange);
return num;
}
}
/**
* 删除MQ服务器上的交换机
*/
public void deleteExchange(BusExchange busExchange) throws IOException, TimeoutException
{
rabbitUtils.toRemoveExchange(busExchange);
}
@Override
public int insert(BusExchange record)
{
... ... @@ -56,8 +75,9 @@ public class BusExchangeServiceImpl implements BusExchangeService {
}
@Override
public int insertSelective(BusExchange record)
public int insertSelective(BusExchange record) throws IOException, TimeoutException
{
rabbitUtils.toCreateExchange(record);
return busExchangeMapper.insertSelective(record);
}
... ...
... ... @@ -4,6 +4,7 @@ import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.sunyo.wlpt.message.bus.service.domain.BusQueue;
import com.sunyo.wlpt.message.bus.service.mapper.BusQueueMapper;
import com.sunyo.wlpt.message.bus.service.rabbit.utils.RabbitUtils;
import com.sunyo.wlpt.message.bus.service.service.BusQueueService;
import io.netty.util.internal.StringUtil;
import org.springframework.stereotype.Service;
... ... @@ -11,8 +12,10 @@ import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;
/**
* @author 子诚
... ... @@ -25,9 +28,13 @@ public class BusQueueServiceImpl implements BusQueueService {
@Resource
private BusQueueMapper busQueueMapper;
@Resource
private RabbitUtils rabbitUtils;
@Override
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
public int deleteByPrimaryKey(String id)
public int deleteByPrimaryKey(String id) throws IOException, TimeoutException
{
// 判断删除的个数,需被删除的个数是否一致
int index = 0;
... ... @@ -36,7 +43,9 @@ public class BusQueueServiceImpl implements BusQueueService {
if (id.contains(splitItem)) {
String[] split = id.split(splitItem);
for (int i = 0; i < split.length; i++) {
BusQueue busQueue = selectByPrimaryKey(split[i]);
int num = busQueueMapper.deleteByPrimaryKey(split[i]);
deleteQueue(busQueue);
if (num > 0) {
index = index + num;
}
... ... @@ -47,10 +56,21 @@ public class BusQueueServiceImpl implements BusQueueService {
return 0;
}
} else {
return busQueueMapper.deleteByPrimaryKey(id);
BusQueue busQueue = selectByPrimaryKey(id);
int num = busQueueMapper.deleteByPrimaryKey(id);
deleteQueue(busQueue);
return num;
}
}
/**
* 删除MQ服务器上的队列
*/
public void deleteQueue(BusQueue busQueue) throws IOException, TimeoutException
{
rabbitUtils.toRemoveQueue(busQueue);
}
@Override
public int insert(BusQueue record)
{
... ... @@ -58,8 +78,9 @@ public class BusQueueServiceImpl implements BusQueueService {
}
@Override
public int insertSelective(BusQueue record)
public int insertSelective(BusQueue record) throws IOException, TimeoutException
{
rabbitUtils.toCreateQueue(record);
return busQueueMapper.insertSelective(record);
}
... ... @@ -102,7 +123,7 @@ public class BusQueueServiceImpl implements BusQueueService {
List<BusQueue> list = new ArrayList<>();
String userIds = busQueue.getUserId();
String splitItem = ",";
if(!StringUtil.isNullOrEmpty(userIds)){
if (!StringUtil.isNullOrEmpty(userIds)) {
if (userIds.contains(splitItem)) {
String[] split = userIds.split(splitItem);
for (String userId : split) {
... ...