作者 王勇

删除交换机,级联删除对应的路由键和配置关系

@@ -4,7 +4,7 @@ import com.sunyo.wlpt.message.bus.service.domain.XmlData; @@ -4,7 +4,7 @@ import com.sunyo.wlpt.message.bus.service.domain.XmlData;
4 import com.sunyo.wlpt.message.bus.service.exception.CustomExceptionType; 4 import com.sunyo.wlpt.message.bus.service.exception.CustomExceptionType;
5 import com.sunyo.wlpt.message.bus.service.rabbit.utils.DirectUtils; 5 import com.sunyo.wlpt.message.bus.service.rabbit.utils.DirectUtils;
6 import com.sunyo.wlpt.message.bus.service.response.ResultJson; 6 import com.sunyo.wlpt.message.bus.service.response.ResultJson;
7 -import com.sunyo.wlpt.message.bus.service.service.AsyncTaskService; 7 +import com.sunyo.wlpt.message.bus.service.service.impl.AsyncTaskService;
8 import com.sunyo.wlpt.message.bus.service.service.UserMessageBindingService; 8 import com.sunyo.wlpt.message.bus.service.service.UserMessageBindingService;
9 import com.sunyo.wlpt.message.bus.service.utils.XmlUtils; 9 import com.sunyo.wlpt.message.bus.service.utils.XmlUtils;
10 import lombok.extern.slf4j.Slf4j; 10 import lombok.extern.slf4j.Slf4j;
@@ -92,4 +92,12 @@ public interface RoutingKeyMapper { @@ -92,4 +92,12 @@ public interface RoutingKeyMapper {
92 * @return List<RoutingKey> 92 * @return List<RoutingKey>
93 */ 93 */
94 List<RoutingKey> selectRoutingKeyExist(RoutingKey routingKey); 94 List<RoutingKey> selectRoutingKeyExist(RoutingKey routingKey);
  95 +
  96 + /**
  97 + * 根据交换机id,删除路由键信息
  98 + *
  99 + * @param exchangeId 交换机id
  100 + * @return
  101 + */
  102 + int deleteByExchangeId(String exchangeId);
95 } 103 }
@@ -23,6 +23,22 @@ public interface UserMessageBindingMapper { @@ -23,6 +23,22 @@ public interface UserMessageBindingMapper {
23 int deleteByPrimaryKey(String id); 23 int deleteByPrimaryKey(String id);
24 24
25 /** 25 /**
  26 + * 根据交换机id删除配置关系
  27 + *
  28 + * @param exchangeId 交换机id
  29 + * @return 删除成功
  30 + */
  31 + int deleteByExchangeId(String exchangeId);
  32 +
  33 + /**
  34 + * 根据队列Id,删除绑定关系
  35 + *
  36 + * @param queueId 队列id
  37 + * @return
  38 + */
  39 + int deleteByQueueId(String queueId);
  40 +
  41 + /**
26 * insert record to table 42 * insert record to table
27 * 43 *
28 * @param record the record 44 * @param record the record
@@ -94,4 +110,5 @@ public interface UserMessageBindingMapper { @@ -94,4 +110,5 @@ public interface UserMessageBindingMapper {
94 * @return 判断校验是否通过 110 * @return 判断校验是否通过
95 */ 111 */
96 List<UserMessageBinding> validateXmlBinding(XmlData xmlData); 112 List<UserMessageBinding> validateXmlBinding(XmlData xmlData);
  113 +
97 } 114 }
1 -package com.sunyo.wlpt.message.bus.service.service;  
2 -  
3 -import com.sunyo.wlpt.message.bus.service.domain.XmlData;  
4 -import org.springframework.scheduling.annotation.Async;  
5 -import org.springframework.stereotype.Component;  
6 -  
7 -import javax.annotation.Resource;  
8 -  
9 -/**  
10 - * @author 子诚  
11 - * Description:这个Service,用来保存异步任务;  
12 - * 注意点:异步任务需要单独放在一个类中  
13 - * 时间:2020/7/30 15:59  
14 - */  
15 -@Component  
16 -public class AsyncTaskService {  
17 -  
18 - @Resource  
19 - private MessageNoteService messageNoteService;  
20 -  
21 - @Async  
22 - public void saveMessage(XmlData sentData)  
23 - {  
24 - // 无论消息是否发送成功,将消息存储于数据库  
25 - messageNoteService.insertMessageSelective(sentData);  
26 - }  
27 -  
28 -}  
@@ -21,6 +21,14 @@ public interface RoutingKeyService { @@ -21,6 +21,14 @@ public interface RoutingKeyService {
21 int deleteByPrimaryKey(String id); 21 int deleteByPrimaryKey(String id);
22 22
23 /** 23 /**
  24 + * 根据交换机id,删除路由键信息
  25 + *
  26 + * @param exchangeId 交换机id
  27 + * @return
  28 + */
  29 + int deleteByExchangeId(String exchangeId);
  30 +
  31 + /**
24 * 新增 32 * 新增
25 * 33 *
26 * @param record the record 34 * @param record the record
@@ -22,6 +22,22 @@ public interface UserMessageBindingService { @@ -22,6 +22,22 @@ public interface UserMessageBindingService {
22 int deleteByPrimaryKey(String id) throws IOException, TimeoutException; 22 int deleteByPrimaryKey(String id) throws IOException, TimeoutException;
23 23
24 /** 24 /**
  25 + * 根据交换机id删除配置关系
  26 + *
  27 + * @param exchangeId 交换机id
  28 + * @return 删除成功
  29 + */
  30 + int deleteByExchangeId(String exchangeId);
  31 +
  32 + /**
  33 + * 根据队列Id,删除绑定关系
  34 + *
  35 + * @param queueId 队列id
  36 + * @return
  37 + */
  38 + int deleteByQueueId(String queueId);
  39 +
  40 + /**
25 * 新增 41 * 新增
26 * 42 *
27 * @param record the record 43 * @param record the record
@@ -77,7 +93,7 @@ public interface UserMessageBindingService { @@ -77,7 +93,7 @@ public interface UserMessageBindingService {
77 * @param xmlData {@link XmlData} 解析之后的数据 93 * @param xmlData {@link XmlData} 解析之后的数据
78 * @return true or false 94 * @return true or false
79 */ 95 */
80 - public Boolean validateXmlBinding(XmlData xmlData); 96 + Boolean validateXmlBinding(XmlData xmlData);
81 } 97 }
82 98
83 99
  1 +package com.sunyo.wlpt.message.bus.service.service.impl;
  2 +
  3 +import com.sunyo.wlpt.message.bus.service.domain.BusExchange;
  4 +import com.sunyo.wlpt.message.bus.service.domain.BusQueue;
  5 +import com.sunyo.wlpt.message.bus.service.domain.XmlData;
  6 +import com.sunyo.wlpt.message.bus.service.service.MessageNoteService;
  7 +import com.sunyo.wlpt.message.bus.service.service.RoutingKeyService;
  8 +import com.sunyo.wlpt.message.bus.service.service.UserMessageBindingService;
  9 +import org.springframework.scheduling.annotation.Async;
  10 +import org.springframework.stereotype.Component;
  11 +
  12 +import javax.annotation.Resource;
  13 +
  14 +/**
  15 + * @author 子诚
  16 + * Description:这个Service,用来保存异步任务;
  17 + * 注意点:异步任务需要单独放在一个类中
  18 + * 时间:2020/7/30 15:59
  19 + */
  20 +@Component
  21 +public class AsyncTaskService {
  22 +
  23 + @Resource
  24 + private MessageNoteService messageNoteService;
  25 +
  26 + @Resource
  27 + private RoutingKeyService routingKeyService;
  28 +
  29 + @Resource
  30 + private UserMessageBindingService userMessageBindingService;
  31 +
  32 + /**
  33 + * 异步,无论消息是否发送成功,将消息存储于数据库
  34 + *
  35 + * @param sentData {@link XmlData}
  36 + */
  37 + @Async
  38 + public void saveMessage(XmlData sentData)
  39 + {
  40 + // 无论消息是否发送成功,将消息存储于数据库
  41 + messageNoteService.insertMessageSelective(sentData);
  42 + }
  43 +
  44 + /**
  45 + * 当删除服务器的时候
  46 + */
  47 +
  48 +
  49 + /**
  50 + * 当删除虚拟主机的时候
  51 + */
  52 +
  53 +
  54 + /**
  55 + * 当删除交换机的时候,
  56 + * 同时,删除对应的路由键,删除包含交换机的绑定关系
  57 + */
  58 + @Async
  59 + void exchangeCascadeDelete(BusExchange busExchange)
  60 + {
  61 + // 删除相关路由键
  62 + routingKeyService.deleteByExchangeId(busExchange.getId());
  63 + // 删除相关配置关系
  64 + userMessageBindingService.deleteByExchangeId(busExchange.getId());
  65 + }
  66 +
  67 + /**
  68 + * 当删除队列的时候,删除包含队列的绑定关系
  69 + */
  70 + @Async
  71 + void queueCascadeDelete(BusQueue busQueue)
  72 + {
  73 + // 删除相关配置关系
  74 + userMessageBindingService.deleteByQueueId(busQueue.getId());
  75 + }
  76 +}
@@ -27,6 +27,9 @@ public class BusExchangeServiceImpl implements BusExchangeService { @@ -27,6 +27,9 @@ public class BusExchangeServiceImpl implements BusExchangeService {
27 private BusExchangeMapper busExchangeMapper; 27 private BusExchangeMapper busExchangeMapper;
28 28
29 @Resource 29 @Resource
  30 + private AsyncTaskService asyncTaskService;
  31 +
  32 + @Resource
30 RabbitUtils rabbitUtils; 33 RabbitUtils rabbitUtils;
31 34
32 @Override 35 @Override
@@ -41,8 +44,12 @@ public class BusExchangeServiceImpl implements BusExchangeService { @@ -41,8 +44,12 @@ public class BusExchangeServiceImpl implements BusExchangeService {
41 String[] split = id.split(splitItem); 44 String[] split = id.split(splitItem);
42 for (int i = 0; i < split.length; i++) { 45 for (int i = 0; i < split.length; i++) {
43 BusExchange busExchange = selectByPrimaryKey(split[i]); 46 BusExchange busExchange = selectByPrimaryKey(split[i]);
  47 + // 删除数据库的交换机记录
44 int num = busExchangeMapper.deleteByPrimaryKey(split[i]); 48 int num = busExchangeMapper.deleteByPrimaryKey(split[i]);
  49 + // 删除MQ服务器上的交换机
45 deleteExchange(busExchange); 50 deleteExchange(busExchange);
  51 + // 级联删除与交换机相关的路由键,配置关系
  52 + asyncTaskService.exchangeCascadeDelete(busExchange);
46 if (num > 0) { 53 if (num > 0) {
47 index = index + num; 54 index = index + num;
48 } 55 }
@@ -54,8 +61,12 @@ public class BusExchangeServiceImpl implements BusExchangeService { @@ -54,8 +61,12 @@ public class BusExchangeServiceImpl implements BusExchangeService {
54 } 61 }
55 } else { 62 } else {
56 BusExchange busExchange = selectByPrimaryKey(id); 63 BusExchange busExchange = selectByPrimaryKey(id);
  64 + // 删除数据库的交换机记录
57 int num = busExchangeMapper.deleteByPrimaryKey(id); 65 int num = busExchangeMapper.deleteByPrimaryKey(id);
  66 + // 删除MQ服务器上的交换机
58 deleteExchange(busExchange); 67 deleteExchange(busExchange);
  68 + // 级联删除与交换机相关的路由键,配置关系
  69 + asyncTaskService.exchangeCascadeDelete(busExchange);
59 return num; 70 return num;
60 } 71 }
61 } 72 }
@@ -31,6 +31,8 @@ public class BusQueueServiceImpl implements BusQueueService { @@ -31,6 +31,8 @@ public class BusQueueServiceImpl implements BusQueueService {
31 @Resource 31 @Resource
32 private RabbitUtils rabbitUtils; 32 private RabbitUtils rabbitUtils;
33 33
  34 + @Resource
  35 + private AsyncTaskService asyncTaskService;
34 36
35 @Override 37 @Override
36 @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED) 38 @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
@@ -44,8 +46,12 @@ public class BusQueueServiceImpl implements BusQueueService { @@ -44,8 +46,12 @@ public class BusQueueServiceImpl implements BusQueueService {
44 String[] split = id.split(splitItem); 46 String[] split = id.split(splitItem);
45 for (int i = 0; i < split.length; i++) { 47 for (int i = 0; i < split.length; i++) {
46 BusQueue busQueue = selectByPrimaryKey(split[i]); 48 BusQueue busQueue = selectByPrimaryKey(split[i]);
  49 + // 删除数据库中的该队列
47 int num = busQueueMapper.deleteByPrimaryKey(split[i]); 50 int num = busQueueMapper.deleteByPrimaryKey(split[i]);
  51 + // 删除MQ服务器上的队列
48 deleteQueue(busQueue); 52 deleteQueue(busQueue);
  53 + // 级联删除数据库中与队列有关的配置关系
  54 + asyncTaskService.queueCascadeDelete(busQueue);
49 if (num > 0) { 55 if (num > 0) {
50 index = index + num; 56 index = index + num;
51 } 57 }
@@ -57,8 +63,12 @@ public class BusQueueServiceImpl implements BusQueueService { @@ -57,8 +63,12 @@ public class BusQueueServiceImpl implements BusQueueService {
57 } 63 }
58 } else { 64 } else {
59 BusQueue busQueue = selectByPrimaryKey(id); 65 BusQueue busQueue = selectByPrimaryKey(id);
  66 + // 删除数据库中的该队列
60 int num = busQueueMapper.deleteByPrimaryKey(id); 67 int num = busQueueMapper.deleteByPrimaryKey(id);
  68 + // 删除MQ服务器上的队列
61 deleteQueue(busQueue); 69 deleteQueue(busQueue);
  70 + // 级联删除数据库中与队列有关的配置关系
  71 + asyncTaskService.queueCascadeDelete(busQueue);
62 return num; 72 return num;
63 } 73 }
64 } 74 }
@@ -51,6 +51,12 @@ public class RoutingKeyServiceImpl implements RoutingKeyService { @@ -51,6 +51,12 @@ public class RoutingKeyServiceImpl implements RoutingKeyService {
51 } 51 }
52 52
53 @Override 53 @Override
  54 + public int deleteByExchangeId(String exchangeId)
  55 + {
  56 + return routingKeyMapper.deleteByExchangeId(exchangeId);
  57 + }
  58 +
  59 + @Override
54 public int insert(RoutingKey record) 60 public int insert(RoutingKey record)
55 { 61 {
56 return routingKeyMapper.insert(record); 62 return routingKeyMapper.insert(record);
@@ -423,6 +423,18 @@ public class UserMessageBindingServiceImpl implements UserMessageBindingService @@ -423,6 +423,18 @@ public class UserMessageBindingServiceImpl implements UserMessageBindingService
423 List<UserMessageBinding> list = userMessageBindingMapper.validateXmlBinding(xmlData); 423 List<UserMessageBinding> list = userMessageBindingMapper.validateXmlBinding(xmlData);
424 return list.size() > 0; 424 return list.size() > 0;
425 } 425 }
  426 +
  427 + @Override
  428 + public int deleteByExchangeId(String exchangeId)
  429 + {
  430 + return userMessageBindingMapper.deleteByExchangeId(exchangeId);
  431 + }
  432 +
  433 + @Override
  434 + public int deleteByQueueId(String queueId)
  435 + {
  436 + return userMessageBindingMapper.deleteByQueueId(queueId);
  437 + }
426 } 438 }
427 439
428 440
@@ -107,6 +107,13 @@ @@ -107,6 +107,13 @@
107 from routing_key 107 from routing_key
108 where id = #{id,jdbcType=VARCHAR} 108 where id = #{id,jdbcType=VARCHAR}
109 </delete> 109 </delete>
  110 + <delete id="deleteByExchangeId" parameterType="java.lang.String">
  111 + <!--@mbg.generated-->
  112 + delete
  113 + from routing_key
  114 + where exchange_id = #{exchangeId,jdbcType=VARCHAR}
  115 + </delete>
  116 +
110 <insert id="insert" parameterType="com.sunyo.wlpt.message.bus.service.domain.RoutingKey"> 117 <insert id="insert" parameterType="com.sunyo.wlpt.message.bus.service.domain.RoutingKey">
111 <!--@mbg.generated--> 118 <!--@mbg.generated-->
112 insert into routing_key (id, routing_key_name, exchange_id, 119 insert into routing_key (id, routing_key_name, exchange_id,
@@ -33,11 +33,29 @@ @@ -33,11 +33,29 @@
33 from user_message_binding 33 from user_message_binding
34 where id = #{id,jdbcType=VARCHAR} 34 where id = #{id,jdbcType=VARCHAR}
35 </select> 35 </select>
  36 +
36 <delete id="deleteByPrimaryKey" parameterType="java.lang.String"> 37 <delete id="deleteByPrimaryKey" parameterType="java.lang.String">
37 - <!--@mbg.generated-->delete 38 + <!--@mbg.generated-->
  39 + delete
38 from user_message_binding 40 from user_message_binding
39 where id = #{id,jdbcType=VARCHAR} 41 where id = #{id,jdbcType=VARCHAR}
40 </delete> 42 </delete>
  43 +
  44 + <!-- 根据交换机id,删除配置关系-->
  45 + <delete id="deleteByExchangeId" parameterType="java.lang.String">
  46 + <!--@mbg.generated-->
  47 + delete
  48 + from user_message_binding
  49 + where exchange_id = #{exchangeId,jdbcType=VARCHAR}
  50 + </delete>
  51 +
  52 + <!-- 根据队列id,删除配置关系-->
  53 + <delete id="deleteByQueueId" parameterType="java.lang.String">
  54 + <!--@mbg.generated-->
  55 + delete
  56 + from user_message_binding
  57 + where queue_id = #{queueId,jdbcType=VARCHAR}
  58 + </delete>
41 <insert id="insert" parameterType="com.sunyo.wlpt.message.bus.service.domain.UserMessageBinding"> 59 <insert id="insert" parameterType="com.sunyo.wlpt.message.bus.service.domain.UserMessageBinding">
42 <!--@mbg.generated--> 60 <!--@mbg.generated-->
43 insert into user_message_binding (id, user_id, username, 61 insert into user_message_binding (id, user_id, username,