正在显示
3 个修改的文件
包含
114 行增加
和
24 行删除
| @@ -36,7 +36,7 @@ spring: | @@ -36,7 +36,7 @@ spring: | ||
| 36 | rabbitmq: | 36 | rabbitmq: |
| 37 | host: 192.168.37.139 | 37 | host: 192.168.37.139 |
| 38 | port: 5672 | 38 | port: 5672 |
| 39 | - username: zicheng | 39 | + username: rabbit |
| 40 | password: 123456 | 40 | password: 123456 |
| 41 | virtual-host: V_zicheng | 41 | virtual-host: V_zicheng |
| 42 | # 开启手动ack机制 | 42 | # 开启手动ack机制 |
| @@ -2,10 +2,10 @@ package com.sunyo.wlpt.message.bus.service.service.impl; | @@ -2,10 +2,10 @@ package com.sunyo.wlpt.message.bus.service.service.impl; | ||
| 2 | 2 | ||
| 3 | import com.github.pagehelper.PageHelper; | 3 | import com.github.pagehelper.PageHelper; |
| 4 | import com.github.pagehelper.PageInfo; | 4 | import com.github.pagehelper.PageInfo; |
| 5 | -import com.sunyo.wlpt.message.bus.service.domain.BusServer; | 5 | +import com.sunyo.wlpt.message.bus.service.domain.*; |
| 6 | import com.sunyo.wlpt.message.bus.service.mapper.BusServerMapper; | 6 | import com.sunyo.wlpt.message.bus.service.mapper.BusServerMapper; |
| 7 | -import com.sunyo.wlpt.message.bus.service.service.BusServerService; | ||
| 8 | -import org.springframework.context.annotation.Lazy; | 7 | +import com.sunyo.wlpt.message.bus.service.rabbit.utils.RabbitUtils; |
| 8 | +import com.sunyo.wlpt.message.bus.service.service.*; | ||
| 9 | import org.springframework.stereotype.Service; | 9 | import org.springframework.stereotype.Service; |
| 10 | import org.springframework.transaction.annotation.Propagation; | 10 | import org.springframework.transaction.annotation.Propagation; |
| 11 | import org.springframework.transaction.annotation.Transactional; | 11 | import org.springframework.transaction.annotation.Transactional; |
| @@ -24,11 +24,26 @@ import java.util.concurrent.TimeoutException; | @@ -24,11 +24,26 @@ import java.util.concurrent.TimeoutException; | ||
| 24 | public class BusServerServiceImpl implements BusServerService { | 24 | public class BusServerServiceImpl implements BusServerService { |
| 25 | 25 | ||
| 26 | @Resource | 26 | @Resource |
| 27 | - private BusServerMapper busServerMapper; | 27 | + private RabbitUtils rabbitUtils; |
| 28 | + | ||
| 29 | + @Resource | ||
| 30 | + private RoutingKeyService routingKeyService; | ||
| 31 | + | ||
| 32 | + @Resource | ||
| 33 | + private UserMessageBindingService userMessageBindingService; | ||
| 34 | + | ||
| 35 | + @Resource | ||
| 36 | + private BusQueueService busQueueService; | ||
| 37 | + | ||
| 38 | + @Resource | ||
| 39 | + private BusExchangeService busExchangeService; | ||
| 28 | 40 | ||
| 29 | - @Lazy | ||
| 30 | @Resource | 41 | @Resource |
| 31 | - private AsyncTaskService asyncTaskService; | 42 | + private VirtualHostService virtualHostService; |
| 43 | + | ||
| 44 | + @Resource | ||
| 45 | + private BusServerMapper busServerMapper; | ||
| 46 | + | ||
| 32 | 47 | ||
| 33 | @Override | 48 | @Override |
| 34 | public PageInfo selectBusServerList(BusServer busServer, Integer pageNum, Integer pageSize) | 49 | public PageInfo selectBusServerList(BusServer busServer, Integer pageNum, Integer pageSize) |
| @@ -58,9 +73,9 @@ public class BusServerServiceImpl implements BusServerService { | @@ -58,9 +73,9 @@ public class BusServerServiceImpl implements BusServerService { | ||
| 58 | String[] split = id.split(splitItem); | 73 | String[] split = id.split(splitItem); |
| 59 | for (int i = 0; i < split.length; i++) { | 74 | for (int i = 0; i < split.length; i++) { |
| 60 | BusServer busServer = busServerMapper.selectByPrimaryKey(split[i]); | 75 | BusServer busServer = busServerMapper.selectByPrimaryKey(split[i]); |
| 76 | + // 级联删除与该服务器相关的:虚拟主机,配置关系,队列,交换机,路由键 | ||
| 77 | + serverCascadeDelete(busServer); | ||
| 61 | int num = busServerMapper.deleteByPrimaryKey(split[i]); | 78 | int num = busServerMapper.deleteByPrimaryKey(split[i]); |
| 62 | - // 异步删除与该服务器相关的:虚拟主机,配置关系,队列,交换机,路由键 | ||
| 63 | - asyncTaskService.serverCascadeDelete(busServer); | ||
| 64 | if (num > 0) { | 79 | if (num > 0) { |
| 65 | index = index + num; | 80 | index = index + num; |
| 66 | } | 81 | } |
| @@ -72,10 +87,9 @@ public class BusServerServiceImpl implements BusServerService { | @@ -72,10 +87,9 @@ public class BusServerServiceImpl implements BusServerService { | ||
| 72 | } | 87 | } |
| 73 | } else { | 88 | } else { |
| 74 | BusServer busServer = busServerMapper.selectByPrimaryKey(id); | 89 | BusServer busServer = busServerMapper.selectByPrimaryKey(id); |
| 75 | - int num = busServerMapper.deleteByPrimaryKey(id); | ||
| 76 | - // 异步删除与该服务器相关的:虚拟主机,配置关系,队列,交换机,路由键 | ||
| 77 | - asyncTaskService.serverCascadeDelete(busServer); | ||
| 78 | - return num; | 90 | + // 级联删除与该服务器相关的:虚拟主机,配置关系,队列,交换机,路由键 |
| 91 | + serverCascadeDelete(busServer); | ||
| 92 | + return busServerMapper.deleteByPrimaryKey(id); | ||
| 79 | } | 93 | } |
| 80 | } | 94 | } |
| 81 | 95 | ||
| @@ -138,4 +152,39 @@ public class BusServerServiceImpl implements BusServerService { | @@ -138,4 +152,39 @@ public class BusServerServiceImpl implements BusServerService { | ||
| 138 | { | 152 | { |
| 139 | return busServerMapper.selectServerExist(serverName); | 153 | return busServerMapper.selectServerExist(serverName); |
| 140 | } | 154 | } |
| 155 | + | ||
| 156 | + public void serverCascadeDelete(BusServer busServer) throws IOException, TimeoutException | ||
| 157 | + { | ||
| 158 | + String serverId = busServer.getId(); | ||
| 159 | + List<UserMessageBinding> bindings = userMessageBindingService.selectByServerId(serverId); | ||
| 160 | + // 1.1 删除数据库中的绑定关系 | ||
| 161 | + userMessageBindingService.deleteByServerId(serverId); | ||
| 162 | + for (UserMessageBinding userMessageBinding : bindings) { | ||
| 163 | + // 1.2、解除MQ服务器上的绑定关系 | ||
| 164 | + rabbitUtils.toRemoveBinding(userMessageBinding); | ||
| 165 | + } | ||
| 166 | + List<VirtualHost> virtualHostList = virtualHostService.selectByServerId(serverId); | ||
| 167 | + for (VirtualHost virtualHost : virtualHostList) { | ||
| 168 | + String virtualHostId = virtualHost.getId(); | ||
| 169 | + List<BusQueue> queues = busQueueService.selectByVirtualHostId(virtualHostId); | ||
| 170 | + // 3.1、删除数据库中对应的队列 | ||
| 171 | + busQueueService.deleteByVirtualHostId(virtualHostId); | ||
| 172 | + for (BusQueue queue : queues) { | ||
| 173 | + // 3.2、删除MQ服务器上的队列 | ||
| 174 | + rabbitUtils.toRemoveQueue(queue); | ||
| 175 | + } | ||
| 176 | + List<BusExchange> exchanges = busExchangeService.selectByVirtualHostId(virtualHostId); | ||
| 177 | + for (BusExchange busExchange : exchanges) { | ||
| 178 | + String exchangeId = busExchange.getId(); | ||
| 179 | + // 5.1、删除数据库中对应的路由键 | ||
| 180 | + routingKeyService.deleteByExchangeId(exchangeId); | ||
| 181 | + // 4.1 根据虚拟主机id,删除交换机 | ||
| 182 | + busExchangeService.deleteByVirtualHostId(virtualHostId); | ||
| 183 | + // 4.2 在MQ服务器上删除该交换机 | ||
| 184 | + rabbitUtils.toRemoveExchange(busExchange); | ||
| 185 | + } | ||
| 186 | + // 2.1、删除数据库中的虚拟主机列表 | ||
| 187 | + virtualHostService.deleteByServerId(serverId); | ||
| 188 | + } | ||
| 189 | + } | ||
| 141 | } | 190 | } |
| @@ -2,10 +2,13 @@ package com.sunyo.wlpt.message.bus.service.service.impl; | @@ -2,10 +2,13 @@ package com.sunyo.wlpt.message.bus.service.service.impl; | ||
| 2 | 2 | ||
| 3 | import com.github.pagehelper.PageHelper; | 3 | import com.github.pagehelper.PageHelper; |
| 4 | import com.github.pagehelper.PageInfo; | 4 | import com.github.pagehelper.PageInfo; |
| 5 | +import com.sunyo.wlpt.message.bus.service.domain.BusExchange; | ||
| 6 | +import com.sunyo.wlpt.message.bus.service.domain.BusQueue; | ||
| 7 | +import com.sunyo.wlpt.message.bus.service.domain.UserMessageBinding; | ||
| 5 | import com.sunyo.wlpt.message.bus.service.domain.VirtualHost; | 8 | import com.sunyo.wlpt.message.bus.service.domain.VirtualHost; |
| 6 | import com.sunyo.wlpt.message.bus.service.mapper.VirtualHostMapper; | 9 | import com.sunyo.wlpt.message.bus.service.mapper.VirtualHostMapper; |
| 7 | -import com.sunyo.wlpt.message.bus.service.service.VirtualHostService; | ||
| 8 | -import org.springframework.context.annotation.Lazy; | 10 | +import com.sunyo.wlpt.message.bus.service.rabbit.utils.RabbitUtils; |
| 11 | +import com.sunyo.wlpt.message.bus.service.service.*; | ||
| 9 | import org.springframework.stereotype.Service; | 12 | import org.springframework.stereotype.Service; |
| 10 | import org.springframework.transaction.annotation.Propagation; | 13 | import org.springframework.transaction.annotation.Propagation; |
| 11 | import org.springframework.transaction.annotation.Transactional; | 14 | import org.springframework.transaction.annotation.Transactional; |
| @@ -26,9 +29,20 @@ public class VirtualHostServiceImpl implements VirtualHostService { | @@ -26,9 +29,20 @@ public class VirtualHostServiceImpl implements VirtualHostService { | ||
| 26 | @Resource | 29 | @Resource |
| 27 | private VirtualHostMapper virtualHostMapper; | 30 | private VirtualHostMapper virtualHostMapper; |
| 28 | 31 | ||
| 29 | - @Lazy | ||
| 30 | @Resource | 32 | @Resource |
| 31 | - private AsyncTaskService asyncTaskService; | 33 | + private RabbitUtils rabbitUtils; |
| 34 | + | ||
| 35 | + @Resource | ||
| 36 | + private RoutingKeyService routingKeyService; | ||
| 37 | + | ||
| 38 | + @Resource | ||
| 39 | + private UserMessageBindingService userMessageBindingService; | ||
| 40 | + | ||
| 41 | + @Resource | ||
| 42 | + private BusQueueService busQueueService; | ||
| 43 | + | ||
| 44 | + @Resource | ||
| 45 | + private BusExchangeService busExchangeService; | ||
| 32 | 46 | ||
| 33 | @Override | 47 | @Override |
| 34 | @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED) | 48 | @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED) |
| @@ -42,9 +56,9 @@ public class VirtualHostServiceImpl implements VirtualHostService { | @@ -42,9 +56,9 @@ public class VirtualHostServiceImpl implements VirtualHostService { | ||
| 42 | String[] split = id.split(splitItem); | 56 | String[] split = id.split(splitItem); |
| 43 | for (int i = 0; i < split.length; i++) { | 57 | for (int i = 0; i < split.length; i++) { |
| 44 | VirtualHost virtualHost = virtualHostMapper.selectByPrimaryKey(split[i]); | 58 | VirtualHost virtualHost = virtualHostMapper.selectByPrimaryKey(split[i]); |
| 59 | + // 级联删除与该虚拟主机相关的队列,交换机,路由键,配置关系 | ||
| 60 | + virtualHostCascadeDelete(virtualHost); | ||
| 45 | int num = virtualHostMapper.deleteByPrimaryKey(split[i]); | 61 | int num = virtualHostMapper.deleteByPrimaryKey(split[i]); |
| 46 | - // 异步删除与该虚拟主机相关的队列,交换机,路由键,配置关系 | ||
| 47 | - asyncTaskService.virtualHostCascadeDelete(virtualHost); | ||
| 48 | if (num > 0) { | 62 | if (num > 0) { |
| 49 | index = index + num; | 63 | index = index + num; |
| 50 | } | 64 | } |
| @@ -54,13 +68,11 @@ public class VirtualHostServiceImpl implements VirtualHostService { | @@ -54,13 +68,11 @@ public class VirtualHostServiceImpl implements VirtualHostService { | ||
| 54 | } else { | 68 | } else { |
| 55 | return 0; | 69 | return 0; |
| 56 | } | 70 | } |
| 57 | - | ||
| 58 | } else { | 71 | } else { |
| 59 | VirtualHost virtualHost = virtualHostMapper.selectByPrimaryKey(id); | 72 | VirtualHost virtualHost = virtualHostMapper.selectByPrimaryKey(id); |
| 60 | - int num = virtualHostMapper.deleteByPrimaryKey(id); | ||
| 61 | - // 异步删除与该虚拟主机相关的队列,交换机,路由键,配置关系 | ||
| 62 | - asyncTaskService.virtualHostCascadeDelete(virtualHost); | ||
| 63 | - return num; | 73 | + // 级联删除与该虚拟主机相关的队列,交换机,路由键,配置关系 |
| 74 | + virtualHostCascadeDelete(virtualHost); | ||
| 75 | + return virtualHostMapper.deleteByPrimaryKey(id); | ||
| 64 | } | 76 | } |
| 65 | 77 | ||
| 66 | } | 78 | } |
| @@ -133,4 +145,33 @@ public class VirtualHostServiceImpl implements VirtualHostService { | @@ -133,4 +145,33 @@ public class VirtualHostServiceImpl implements VirtualHostService { | ||
| 133 | { | 145 | { |
| 134 | return virtualHostMapper.deleteByServerId(serverId); | 146 | return virtualHostMapper.deleteByServerId(serverId); |
| 135 | } | 147 | } |
| 148 | + | ||
| 149 | + public void virtualHostCascadeDelete(VirtualHost virtualHost) throws IOException, TimeoutException | ||
| 150 | + { | ||
| 151 | + String virtualHostId = virtualHost.getId(); | ||
| 152 | + List<UserMessageBinding> bindings = userMessageBindingService.selectByVirtualHostId(virtualHostId); | ||
| 153 | + // 1.1、删除数据库中的绑定关系 | ||
| 154 | + userMessageBindingService.deleteByVirtualHostId(virtualHostId); | ||
| 155 | + for (UserMessageBinding userMessageBinding : bindings) { | ||
| 156 | + // 1.2、解除MQ服务器上的绑定关系 | ||
| 157 | + rabbitUtils.toRemoveBinding(userMessageBinding); | ||
| 158 | + } | ||
| 159 | + List<BusQueue> queues = busQueueService.selectByVirtualHostId(virtualHostId); | ||
| 160 | + // 2.1、删除数据库中对应的队列 | ||
| 161 | + busQueueService.deleteByVirtualHostId(virtualHostId); | ||
| 162 | + for (BusQueue queue : queues) { | ||
| 163 | + // 2.2、删除MQ服务器上的队列 | ||
| 164 | + rabbitUtils.toRemoveQueue(queue); | ||
| 165 | + } | ||
| 166 | + List<BusExchange> exchanges = busExchangeService.selectByVirtualHostId(virtualHostId); | ||
| 167 | + // 3.1 根据虚拟主机id,删除交换机 | ||
| 168 | + busExchangeService.deleteByVirtualHostId(virtualHostId); | ||
| 169 | + for (BusExchange busExchange : exchanges) { | ||
| 170 | + String exchangeId = busExchange.getId(); | ||
| 171 | + // 4.1、删除数据库中对应的路由键 | ||
| 172 | + routingKeyService.deleteByExchangeId(exchangeId); | ||
| 173 | + // 3.2 在MQ服务器上删除该交换机 | ||
| 174 | + rabbitUtils.toRemoveExchange(busExchange); | ||
| 175 | + } | ||
| 176 | + } | ||
| 136 | } | 177 | } |
-
请 注册 或 登录 后发表评论