作者 王勇

操作MQ动态创建创建、解除绑定关系

@@ -8,6 +8,8 @@ import com.sunyo.wlpt.message.bus.service.service.UserMessageBindingService; @@ -8,6 +8,8 @@ import com.sunyo.wlpt.message.bus.service.service.UserMessageBindingService;
8 import org.springframework.web.bind.annotation.*; 8 import org.springframework.web.bind.annotation.*;
9 9
10 import javax.annotation.Resource; 10 import javax.annotation.Resource;
  11 +import java.io.IOException;
  12 +import java.util.concurrent.TimeoutException;
11 13
12 /** 14 /**
13 * @author 子诚 15 * @author 子诚
@@ -71,6 +73,7 @@ public class UserMessageBindingController { @@ -71,6 +73,7 @@ public class UserMessageBindingController {
71 */ 73 */
72 @DeleteMapping("/delete") 74 @DeleteMapping("/delete")
73 public ResultJson deleteUserMessageBinding(@RequestBody UserMessageBinding userMessageBinding) 75 public ResultJson deleteUserMessageBinding(@RequestBody UserMessageBinding userMessageBinding)
  76 + throws IOException, TimeoutException
74 { 77 {
75 // 执行删除方法 78 // 执行删除方法
76 return userMessageBindingService.deleteByPrimaryKey(userMessageBinding.getId()) > 0 79 return userMessageBindingService.deleteByPrimaryKey(userMessageBinding.getId()) > 0
@@ -86,6 +89,7 @@ public class UserMessageBindingController { @@ -86,6 +89,7 @@ public class UserMessageBindingController {
86 */ 89 */
87 @GetMapping("/batchRemove") 90 @GetMapping("/batchRemove")
88 public ResultJson batchRemoveUserMessageBinding(String ids) 91 public ResultJson batchRemoveUserMessageBinding(String ids)
  92 + throws IOException, TimeoutException
89 { 93 {
90 // 执行批量删除 94 // 执行批量删除
91 return userMessageBindingService.deleteByPrimaryKey(ids) > 0 95 return userMessageBindingService.deleteByPrimaryKey(ids) > 0
@@ -116,6 +120,7 @@ public class UserMessageBindingController { @@ -116,6 +120,7 @@ public class UserMessageBindingController {
116 */ 120 */
117 @PostMapping("/insert") 121 @PostMapping("/insert")
118 public ResultJson insertUserMessageBinding(@RequestBody UserMessageBinding userMessageBinding) 122 public ResultJson insertUserMessageBinding(@RequestBody UserMessageBinding userMessageBinding)
  123 + throws IOException, TimeoutException
119 { 124 {
120 // 执行新增 125 // 执行新增
121 return userMessageBindingService.insertSelective(userMessageBinding) > 0 126 return userMessageBindingService.insertSelective(userMessageBinding) > 0
1 package com.sunyo.wlpt.message.bus.service.rabbit.utils; 1 package com.sunyo.wlpt.message.bus.service.rabbit.utils;
2 2
3 -import com.rabbitmq.client.AMQP;  
4 import com.rabbitmq.client.Channel; 3 import com.rabbitmq.client.Channel;
5 import com.rabbitmq.client.Connection; 4 import com.rabbitmq.client.Connection;
6 import com.rabbitmq.client.ConnectionFactory; 5 import com.rabbitmq.client.ConnectionFactory;
@@ -147,9 +146,12 @@ public class RabbitUtils { @@ -147,9 +146,12 @@ public class RabbitUtils {
147 { 146 {
148 Connection connection = getConnection(serverIp, serverPort, virtualHostName); 147 Connection connection = getConnection(serverIp, serverPort, virtualHostName);
149 Channel channel = connection.createChannel(); 148 Channel channel = connection.createChannel();
150 - AMQP.Exchange.DeclareOk declareOk = channel.exchangeDeclare(busExchange.getExchangeName(), busExchange.getExchangeType(), busExchange.getDurability(),  
151 - busExchange.getAutoDelete(), busExchange.getInternal(), null);  
152 - log.info("创建交换机的返回值<----->" + declareOk); 149 + channel.exchangeDeclare(busExchange.getExchangeName(),
  150 + busExchange.getExchangeType(),
  151 + busExchange.getDurability(),
  152 + busExchange.getAutoDelete(),
  153 + busExchange.getInternal(),
  154 + null);
153 closeConnectionAndChanel(channel, connection); 155 closeConnectionAndChanel(channel, connection);
154 } 156 }
155 157
@@ -166,18 +168,26 @@ public class RabbitUtils { @@ -166,18 +168,26 @@ public class RabbitUtils {
166 } 168 }
167 169
168 /** 170 /**
169 - * 添加队列(默认设置参数为null) 171 + * 添加队列(默认设置参数为 null)
170 */ 172 */
171 public void createQueue(String serverIp, Integer serverPort, String virtualHostName, BusQueue busQueue) 173 public void createQueue(String serverIp, Integer serverPort, String virtualHostName, BusQueue busQueue)
172 throws IOException, TimeoutException 174 throws IOException, TimeoutException
173 { 175 {
174 Connection connection = getConnection(serverIp, serverPort, virtualHostName); 176 Connection connection = getConnection(serverIp, serverPort, virtualHostName);
175 Channel channel = connection.createChannel(); 177 Channel channel = connection.createChannel();
176 - channel.queueDeclare(busQueue.getQueueName(), busQueue.getDurability(), false, busQueue.getAutoDelete(), null); 178 + channel.queueDeclare(busQueue.getQueueName(),
  179 + busQueue.getDurability(),
  180 + false,
  181 + busQueue.getAutoDelete(),
  182 + null);
177 closeConnectionAndChanel(channel, connection); 183 closeConnectionAndChanel(channel, connection);
178 } 184 }
179 185
180 /** 186 /**
  187 + * 清空队列 channel.queuePurge(queueName);
  188 + */
  189 +
  190 + /**
181 * 删除队列 channel.queueDelete(queueName); 191 * 删除队列 channel.queueDelete(queueName);
182 */ 192 */
183 public void removeQueue(String serverIp, Integer serverPort, String virtualHostName, BusQueue busQueue) 193 public void removeQueue(String serverIp, Integer serverPort, String virtualHostName, BusQueue busQueue)
@@ -190,22 +200,33 @@ public class RabbitUtils { @@ -190,22 +200,33 @@ public class RabbitUtils {
190 } 200 }
191 201
192 /** 202 /**
193 - * 创建绑定关系 203 + * 创建绑定
194 */ 204 */
195 public void createBinding(String serverIp, Integer serverPort, String virtualHostName, UserMessageBinding userMessageBinding) 205 public void createBinding(String serverIp, Integer serverPort, String virtualHostName, UserMessageBinding userMessageBinding)
196 throws IOException, TimeoutException 206 throws IOException, TimeoutException
197 { 207 {
198 Connection connection = getConnection(serverIp, serverPort, virtualHostName); 208 Connection connection = getConnection(serverIp, serverPort, virtualHostName);
199 Channel channel = connection.createChannel(); 209 Channel channel = connection.createChannel();
  210 + channel.queueBind(userMessageBinding.getQueueName(),
  211 + userMessageBinding.getExchangeName(),
  212 + userMessageBinding.getRoutingKeyName());
  213 + closeConnectionAndChanel(channel, connection);
200 } 214 }
201 215
202 /** 216 /**
203 - * 清空队列 channel.queuePurge(queueName);  
204 - */  
205 -  
206 - /**  
207 * 解除绑定 channel.queueUnbind("queueName", "exchangeName","routingKey"); 217 * 解除绑定 channel.queueUnbind("queueName", "exchangeName","routingKey");
208 */ 218 */
  219 + public void removeBinding(String serverIp, Integer serverPort, String virtualHostName,
  220 + UserMessageBinding userMessageBinding)
  221 + throws IOException, TimeoutException
  222 + {
  223 + Connection connection = getConnection(serverIp, serverPort, virtualHostName);
  224 + Channel channel = connection.createChannel();
  225 + channel.queueUnbind(userMessageBinding.getQueueName(),
  226 + userMessageBinding.getExchangeName(),
  227 + userMessageBinding.getRoutingKeyName());
  228 + closeConnectionAndChanel(channel, connection);
  229 + }
209 230
210 /** 231 /**
211 * 前往创建交换机的路上 232 * 前往创建交换机的路上
@@ -248,6 +269,28 @@ public class RabbitUtils { @@ -248,6 +269,28 @@ public class RabbitUtils {
248 } 269 }
249 270
250 /** 271 /**
  272 + * 前往创建绑定的路上
  273 + */
  274 + public void toCreateBinding(UserMessageBinding userMessageBinding)
  275 + throws IOException, TimeoutException
  276 + {
  277 + VirtualHost virtualHost = getVirtualHost(userMessageBinding.getVirtualHostId());
  278 + BusServer busServer = getBusServer(virtualHost.getServerId());
  279 + createBinding(busServer.getServerIp(), busServer.getServerPort(), virtualHost.getVirtualHostName(), userMessageBinding);
  280 + }
  281 +
  282 + /**
  283 + * 前往解除绑定的路上
  284 + */
  285 + public void toRemoveBinding(UserMessageBinding userMessageBinding)
  286 + throws IOException, TimeoutException
  287 + {
  288 + VirtualHost virtualHost = getVirtualHost(userMessageBinding.getVirtualHostId());
  289 + BusServer busServer = getBusServer(virtualHost.getServerId());
  290 + removeBinding(busServer.getServerIp(), busServer.getServerPort(), virtualHost.getVirtualHostName(), userMessageBinding);
  291 + }
  292 +
  293 + /**
251 * 根据虚拟主机id,获取虚拟主机信息 294 * 根据虚拟主机id,获取虚拟主机信息
252 * 295 *
253 * @param virtualHostId 虚拟主机id 296 * @param virtualHostId 虚拟主机id
@@ -4,6 +4,9 @@ import com.github.pagehelper.PageInfo; @@ -4,6 +4,9 @@ import com.github.pagehelper.PageInfo;
4 import com.sunyo.wlpt.message.bus.service.domain.UserMessageBinding; 4 import com.sunyo.wlpt.message.bus.service.domain.UserMessageBinding;
5 import com.sunyo.wlpt.message.bus.service.domain.XmlData; 5 import com.sunyo.wlpt.message.bus.service.domain.XmlData;
6 6
  7 +import java.io.IOException;
  8 +import java.util.concurrent.TimeoutException;
  9 +
7 /** 10 /**
8 * @author 子诚 11 * @author 子诚
9 * Description: 12 * Description:
@@ -16,7 +19,7 @@ public interface UserMessageBindingService { @@ -16,7 +19,7 @@ public interface UserMessageBindingService {
16 * @param id primaryKey 19 * @param id primaryKey
17 * @return deleteCount 20 * @return deleteCount
18 */ 21 */
19 - int deleteByPrimaryKey(String id); 22 + int deleteByPrimaryKey(String id) throws IOException, TimeoutException;
20 23
21 /** 24 /**
22 * 新增 25 * 新增
@@ -32,7 +35,7 @@ public interface UserMessageBindingService { @@ -32,7 +35,7 @@ public interface UserMessageBindingService {
32 * @param record the record 35 * @param record the record
33 * @return insert count 36 * @return insert count
34 */ 37 */
35 - int insertSelective(UserMessageBinding record); 38 + int insertSelective(UserMessageBinding record) throws IOException, TimeoutException;
36 39
37 /** 40 /**
38 * 根据主键查询 41 * 根据主键查询
@@ -4,6 +4,7 @@ import com.github.pagehelper.PageHelper; @@ -4,6 +4,7 @@ import com.github.pagehelper.PageHelper;
4 import com.github.pagehelper.PageInfo; 4 import com.github.pagehelper.PageInfo;
5 import com.sunyo.wlpt.message.bus.service.domain.*; 5 import com.sunyo.wlpt.message.bus.service.domain.*;
6 import com.sunyo.wlpt.message.bus.service.mapper.UserMessageBindingMapper; 6 import com.sunyo.wlpt.message.bus.service.mapper.UserMessageBindingMapper;
  7 +import com.sunyo.wlpt.message.bus.service.rabbit.utils.RabbitUtils;
7 import com.sunyo.wlpt.message.bus.service.service.*; 8 import com.sunyo.wlpt.message.bus.service.service.*;
8 import com.sunyo.wlpt.message.bus.service.utils.IdUtils; 9 import com.sunyo.wlpt.message.bus.service.utils.IdUtils;
9 import lombok.extern.slf4j.Slf4j; 10 import lombok.extern.slf4j.Slf4j;
@@ -12,7 +13,9 @@ import org.springframework.transaction.annotation.Propagation; @@ -12,7 +13,9 @@ import org.springframework.transaction.annotation.Propagation;
12 import org.springframework.transaction.annotation.Transactional; 13 import org.springframework.transaction.annotation.Transactional;
13 14
14 import javax.annotation.Resource; 15 import javax.annotation.Resource;
  16 +import java.io.IOException;
15 import java.util.List; 17 import java.util.List;
  18 +import java.util.concurrent.TimeoutException;
16 19
17 import static com.sunyo.wlpt.message.bus.service.common.Constant.EXIST_UMB; 20 import static com.sunyo.wlpt.message.bus.service.common.Constant.EXIST_UMB;
18 21
@@ -26,6 +29,9 @@ import static com.sunyo.wlpt.message.bus.service.common.Constant.EXIST_UMB; @@ -26,6 +29,9 @@ import static com.sunyo.wlpt.message.bus.service.common.Constant.EXIST_UMB;
26 public class UserMessageBindingServiceImpl implements UserMessageBindingService { 29 public class UserMessageBindingServiceImpl implements UserMessageBindingService {
27 30
28 @Resource 31 @Resource
  32 + private RabbitUtils rabbitUtils;
  33 +
  34 + @Resource
29 private UserInfoService userInfoService; 35 private UserInfoService userInfoService;
30 36
31 @Resource 37 @Resource
@@ -48,7 +54,7 @@ public class UserMessageBindingServiceImpl implements UserMessageBindingService @@ -48,7 +54,7 @@ public class UserMessageBindingServiceImpl implements UserMessageBindingService
48 54
49 @Override 55 @Override
50 @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED) 56 @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
51 - public int deleteByPrimaryKey(String id) 57 + public int deleteByPrimaryKey(String id) throws IOException, TimeoutException
52 { 58 {
53 // 判断删除的个数,需被删除的个数是否一致 59 // 判断删除的个数,需被删除的个数是否一致
54 int index = 0; 60 int index = 0;
@@ -58,7 +64,9 @@ public class UserMessageBindingServiceImpl implements UserMessageBindingService @@ -58,7 +64,9 @@ public class UserMessageBindingServiceImpl implements UserMessageBindingService
58 try { 64 try {
59 String[] split = id.split(splitItem); 65 String[] split = id.split(splitItem);
60 for (int i = 0; i < split.length; i++) { 66 for (int i = 0; i < split.length; i++) {
  67 + UserMessageBinding userMessageBinding = selectByPrimaryKey(split[i]);
61 int num = userMessageBindingMapper.deleteByPrimaryKey(split[i]); 68 int num = userMessageBindingMapper.deleteByPrimaryKey(split[i]);
  69 + deleteBinding(userMessageBinding);
62 if (num > 0) { 70 if (num > 0) {
63 index = index + num; 71 index = index + num;
64 } 72 }
@@ -73,10 +81,21 @@ public class UserMessageBindingServiceImpl implements UserMessageBindingService @@ -73,10 +81,21 @@ public class UserMessageBindingServiceImpl implements UserMessageBindingService
73 return 0; 81 return 0;
74 } 82 }
75 } else { 83 } else {
76 - return userMessageBindingMapper.deleteByPrimaryKey(id); 84 + UserMessageBinding userMessageBinding = selectByPrimaryKey(id);
  85 + int num = userMessageBindingMapper.deleteByPrimaryKey(id);
  86 + deleteBinding(userMessageBinding);
  87 + return num;
77 } 88 }
78 } 89 }
79 90
  91 + /**
  92 + * 解除MQ服务器上的绑定关系
  93 + */
  94 + public void deleteBinding(UserMessageBinding userMessageBinding) throws IOException, TimeoutException
  95 + {
  96 + rabbitUtils.toRemoveBinding(userMessageBinding);
  97 + }
  98 +
80 @Override 99 @Override
81 public int insert(UserMessageBinding record) 100 public int insert(UserMessageBinding record)
82 { 101 {
@@ -85,7 +104,7 @@ public class UserMessageBindingServiceImpl implements UserMessageBindingService @@ -85,7 +104,7 @@ public class UserMessageBindingServiceImpl implements UserMessageBindingService
85 104
86 @Override 105 @Override
87 @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED) 106 @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
88 - public int insertSelective(UserMessageBinding userMessageBinding) 107 + public int insertSelective(UserMessageBinding userMessageBinding) throws IOException, TimeoutException
89 { 108 {
90 int index = 0; 109 int index = 0;
91 String queueId = userMessageBinding.getQueueId(); 110 String queueId = userMessageBinding.getQueueId();
@@ -382,7 +401,7 @@ public class UserMessageBindingServiceImpl implements UserMessageBindingService @@ -382,7 +401,7 @@ public class UserMessageBindingServiceImpl implements UserMessageBindingService
382 /** 401 /**
383 * 接下来的 校验-填充-添加 402 * 接下来的 校验-填充-添加
384 */ 403 */
385 - public int nextValidateAndFill(UserMessageBinding userMessageBinding) 404 + public int nextValidateAndFill(UserMessageBinding userMessageBinding) throws IOException, TimeoutException
386 { 405 {
387 String validate = validateBinding(userMessageBinding); 406 String validate = validateBinding(userMessageBinding);
388 if (EXIST_UMB.equals(validate)) { 407 if (EXIST_UMB.equals(validate)) {
@@ -390,7 +409,9 @@ public class UserMessageBindingServiceImpl implements UserMessageBindingService @@ -390,7 +409,9 @@ public class UserMessageBindingServiceImpl implements UserMessageBindingService
390 return 0; 409 return 0;
391 } else if (validate == null) { 410 } else if (validate == null) {
392 // 此处添加MQ服务器上的绑定关系 411 // 此处添加MQ服务器上的绑定关系
393 - return userMessageBindingMapper.insertSelective(umb_fillName(userMessageBinding)); 412 + UserMessageBinding completeBinding = umb_fillName(userMessageBinding);
  413 + rabbitUtils.toCreateBinding(completeBinding);
  414 + return userMessageBindingMapper.insertSelective(completeBinding);
394 } else { 415 } else {
395 return 0; 416 return 0;
396 } 417 }