...
|
...
|
@@ -2,9 +2,8 @@ package com.sunyo.wlpt.message.bus.service.service.impl; |
|
|
|
|
|
import com.sunyo.wlpt.message.bus.service.domain.*;
|
|
|
import com.sunyo.wlpt.message.bus.service.rabbit.utils.RabbitUtils;
|
|
|
import com.sunyo.wlpt.message.bus.service.service.MessageNoteService;
|
|
|
import com.sunyo.wlpt.message.bus.service.service.RoutingKeyService;
|
|
|
import com.sunyo.wlpt.message.bus.service.service.UserMessageBindingService;
|
|
|
import com.sunyo.wlpt.message.bus.service.service.*;
|
|
|
import org.springframework.context.annotation.Lazy;
|
|
|
import org.springframework.scheduling.annotation.Async;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
import org.springframework.transaction.annotation.Propagation;
|
...
|
...
|
@@ -30,12 +29,25 @@ public class AsyncTaskService { |
|
|
@Resource
|
|
|
private MessageNoteService messageNoteService;
|
|
|
|
|
|
@Lazy
|
|
|
@Resource
|
|
|
private RoutingKeyService routingKeyService;
|
|
|
|
|
|
@Resource
|
|
|
private UserMessageBindingService userMessageBindingService;
|
|
|
|
|
|
@Lazy
|
|
|
@Resource
|
|
|
private BusQueueService busQueueService;
|
|
|
|
|
|
@Lazy
|
|
|
@Resource
|
|
|
private BusExchangeService busExchangeService;
|
|
|
|
|
|
@Lazy
|
|
|
@Resource
|
|
|
private VirtualHostService virtualHostService;
|
|
|
|
|
|
/**
|
|
|
* 异步,无论消息是否发送成功,将消息存储于数据库
|
|
|
*
|
...
|
...
|
@@ -49,14 +61,80 @@ public class AsyncTaskService { |
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 当删除服务器的时候
|
|
|
* 当删除服务器的时候,级联删除与服务器有关的
|
|
|
* 虚拟主机,交换机,路由键,队列,绑定关系
|
|
|
*/
|
|
|
|
|
|
@Async
|
|
|
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
|
|
|
public void serverCascadeDelete(BusServer busServer) throws IOException, TimeoutException
|
|
|
{
|
|
|
String serverId = busServer.getId();
|
|
|
List<UserMessageBinding> bindings = userMessageBindingService.selectByServerId(serverId);
|
|
|
// 1.1 删除数据库中的绑定关系
|
|
|
userMessageBindingService.deleteByServerId(serverId);
|
|
|
for (UserMessageBinding userMessageBinding : bindings) {
|
|
|
// 1.2、解除MQ服务器上的绑定关系
|
|
|
rabbitUtils.toRemoveBinding(userMessageBinding);
|
|
|
}
|
|
|
List<VirtualHost> virtualHostList = virtualHostService.selectByServerId(serverId);
|
|
|
// 2.1、删除数据库中的虚拟主机列表
|
|
|
virtualHostService.deleteByServerId(serverId);
|
|
|
for (VirtualHost virtualHost : virtualHostList) {
|
|
|
String virtualHostId = virtualHost.getId();
|
|
|
List<BusQueue> queues = busQueueService.selectByVirtualHostId(virtualHostId);
|
|
|
// 3.1、删除数据库中对应的队列
|
|
|
busQueueService.deleteByVirtualHostId(virtualHostId);
|
|
|
for (BusQueue queue : queues) {
|
|
|
// 3.2、删除MQ服务器上的队列
|
|
|
rabbitUtils.toRemoveQueue(queue);
|
|
|
}
|
|
|
List<BusExchange> exchanges = busExchangeService.selectByVirtualHostId(virtualHostId);
|
|
|
// 4.1 根据虚拟主机id,删除交换机
|
|
|
busExchangeService.deleteByVirtualHostId(virtualHostId);
|
|
|
for (BusExchange busExchange : exchanges) {
|
|
|
String exchangeId = busExchange.getId();
|
|
|
// 5.1、删除数据库中对应的路由键
|
|
|
routingKeyService.deleteByExchangeId(exchangeId);
|
|
|
// 4.2 在MQ服务器上删除该交换机
|
|
|
rabbitUtils.toRemoveExchange(busExchange);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 当删除虚拟主机的时候
|
|
|
* 当删除虚拟主机的时候,级联删除与虚拟主机有关的
|
|
|
* 交换机,路由键,队列,绑定关系
|
|
|
*/
|
|
|
|
|
|
@Async
|
|
|
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
|
|
|
public void virtualHostCascadeDelete(VirtualHost virtualHost) throws IOException, TimeoutException
|
|
|
{
|
|
|
String virtualHostId = virtualHost.getId();
|
|
|
List<UserMessageBinding> bindings = userMessageBindingService.selectByVirtualHostId(virtualHostId);
|
|
|
// 1.1、删除数据库中的绑定关系
|
|
|
userMessageBindingService.deleteByVirtualHostId(virtualHostId);
|
|
|
for (UserMessageBinding userMessageBinding : bindings) {
|
|
|
// 1.2、解除MQ服务器上的绑定关系
|
|
|
rabbitUtils.toRemoveBinding(userMessageBinding);
|
|
|
}
|
|
|
List<BusQueue> queues = busQueueService.selectByVirtualHostId(virtualHostId);
|
|
|
// 2.1、删除数据库中对应的队列
|
|
|
busQueueService.deleteByVirtualHostId(virtualHostId);
|
|
|
for (BusQueue queue : queues) {
|
|
|
// 2.2、删除MQ服务器上的队列
|
|
|
rabbitUtils.toRemoveQueue(queue);
|
|
|
}
|
|
|
List<BusExchange> exchanges = busExchangeService.selectByVirtualHostId(virtualHostId);
|
|
|
// 3.1 根据虚拟主机id,删除交换机
|
|
|
busExchangeService.deleteByVirtualHostId(virtualHostId);
|
|
|
for (BusExchange busExchange : exchanges) {
|
|
|
String exchangeId = busExchange.getId();
|
|
|
// 4.1、删除数据库中对应的路由键
|
|
|
routingKeyService.deleteByExchangeId(exchangeId);
|
|
|
// 3.2 在MQ服务器上删除该交换机
|
|
|
rabbitUtils.toRemoveExchange(busExchange);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 当删除交换机的时候,
|
...
|
...
|
@@ -67,34 +145,34 @@ public class AsyncTaskService { |
|
|
*/
|
|
|
@Async
|
|
|
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
|
|
|
void exchangeCascadeDelete(BusExchange busExchange)
|
|
|
public void exchangeCascadeDelete(BusExchange busExchange)
|
|
|
{
|
|
|
String exchangeId = busExchange.getId();
|
|
|
List<RoutingKey> routingKeyList = routingKeyService.selectByExchangeId(exchangeId);
|
|
|
// 删除相关路由键
|
|
|
routingKeyService.deleteByExchangeId(exchangeId);
|
|
|
// 删除交换机相关配置关系
|
|
|
// 删除交换机相关绑定关系
|
|
|
userMessageBindingService.deleteByExchangeId(exchangeId);
|
|
|
// 删除路由键相关配置关系
|
|
|
// 删除路由键相关绑定关系
|
|
|
for (int i = 0; i < routingKeyList.size(); i++) {
|
|
|
userMessageBindingService.deleteByRoutingKeyId(routingKeyList.get(0).getId());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 当删除路由键的时候,删除包含该路由键的配置关系
|
|
|
* 当删除路由键的时候,删除包含该路由键的绑定关系
|
|
|
*/
|
|
|
@Async
|
|
|
void routingKeyCascadeDelete(RoutingKey routingKey) throws IOException, TimeoutException
|
|
|
public void routingKeyCascadeDelete(RoutingKey routingKey) throws IOException, TimeoutException
|
|
|
{
|
|
|
String routingKeyId = routingKey.getId();
|
|
|
// 根据路由键id查询出所有的包含该路由键的配置关系
|
|
|
// 根据路由键id查询出所有的包含该路由键的绑定关系
|
|
|
List<UserMessageBinding> bindings = userMessageBindingService.selectByRoutingKeyId(routingKeyId);
|
|
|
for (UserMessageBinding userMessageBinding : bindings) {
|
|
|
// 解除MQ服务器上的配置关系
|
|
|
// 解除MQ服务器上的绑定关系
|
|
|
rabbitUtils.toRemoveBinding(userMessageBinding);
|
|
|
}
|
|
|
// 删除数据库中的包含该路由键相关配置关系
|
|
|
// 删除数据库中的包含该路由键相关绑定关系
|
|
|
userMessageBindingService.deleteByRoutingKeyId(routingKey.getId());
|
|
|
}
|
|
|
|
...
|
...
|
@@ -102,9 +180,9 @@ public class AsyncTaskService { |
|
|
* 当删除队列的时候,删除包含队列的绑定关系
|
|
|
*/
|
|
|
@Async
|
|
|
void queueCascadeDelete(BusQueue busQueue)
|
|
|
public void queueCascadeDelete(BusQueue busQueue)
|
|
|
{
|
|
|
// 删除相关配置关系
|
|
|
// 删除相关绑定关系
|
|
|
userMessageBindingService.deleteByQueueId(busQueue.getId());
|
|
|
}
|
|
|
|
...
|
...
|
|