作者 王勇

拉取模式,取消息基本完善

@@ -39,21 +39,7 @@ spring: @@ -39,21 +39,7 @@ spring:
39 username: rabbit 39 username: rabbit
40 password: 123456 40 password: 123456
41 virtual-host: / 41 virtual-host: /
42 - # 开启手动ack机制  
43 - # listener:  
44 - # simple:  
45 - # acknowledge-mode: manual  
46 - # # 开启 confrim 机制--保证消息发送到交换机  
47 - # publisher-confrim-type: simple  
48 - # # 开启 return 机制--保证消息从交换机分发到队列  
49 - # publisher-returns: true  
50 42
51 -  
52 -  
53 - # host: 192.168.1.63  
54 - # port: 5672  
55 - # username: mrz  
56 - # password: vmvnv1v2  
57 # 多环境配置 43 # 多环境配置
58 profiles: 44 profiles:
59 active: dev 45 active: dev
@@ -116,10 +102,6 @@ management: @@ -116,10 +102,6 @@ management:
116 shutdown: 102 shutdown:
117 enabled: true 103 enabled: true
118 104
119 -path:  
120 - # 文件夹的位置,就一个文件夹  
121 - dir: xml  
122 -  
123 # 基础信息配置 105 # 基础信息配置
124 info: 106 info:
125 version: 1.0 107 version: 1.0
@@ -46,6 +46,11 @@ @@ -46,6 +46,11 @@
46 </dependency> 46 </dependency>
47 47
48 <dependency> 48 <dependency>
  49 + <groupId>org.apache.httpcomponents</groupId>
  50 + <artifactId>httpclient</artifactId>
  51 + </dependency>
  52 +
  53 + <dependency>
49 <groupId>com.rabbitmq</groupId> 54 <groupId>com.rabbitmq</groupId>
50 <artifactId>http-client</artifactId> 55 <artifactId>http-client</artifactId>
51 <version>3.7.0.RELEASE</version> 56 <version>3.7.0.RELEASE</version>
1 -package com.sunyo.wlpt.message.bus.service.config;  
2 -  
3 -import org.springframework.beans.factory.annotation.Value;  
4 -import org.springframework.context.annotation.Configuration;  
5 -import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry;  
6 -import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;  
7 -  
8 -/**  
9 - * @author 子诚  
10 - * Description:  
11 - * 时间:2020/7/28 15:18  
12 - */  
13 -@Configuration  
14 -public class MyWebMvcConfigurer implements WebMvcConfigurer {  
15 - @Value("${path.dir}")  
16 - private String dir;  
17 -  
18 - @Override  
19 - public void addResourceHandlers(ResourceHandlerRegistry registry) {  
20 - registry.addResourceHandler("/bus/**").addResourceLocations("file:" + dir + "/");  
21 - WebMvcConfigurer.super.addResourceHandlers(registry);  
22 - }  
23 -}  
@@ -19,6 +19,8 @@ import java.util.concurrent.CountDownLatch; @@ -19,6 +19,8 @@ import java.util.concurrent.CountDownLatch;
19 import java.util.concurrent.Future; 19 import java.util.concurrent.Future;
20 import java.util.concurrent.TimeoutException; 20 import java.util.concurrent.TimeoutException;
21 21
  22 +import static com.sunyo.wlpt.message.bus.service.common.Constant.RESULT_SUCCESS;
  23 +
22 /** 24 /**
23 * @author 子诚 25 * @author 子诚
24 * Description: 26 * Description:
@@ -29,6 +31,7 @@ import java.util.concurrent.TimeoutException; @@ -29,6 +31,7 @@ import java.util.concurrent.TimeoutException;
29 @RequestMapping("bus/rabbit") 31 @RequestMapping("bus/rabbit")
30 @RestController 32 @RestController
31 public class RabbitController { 33 public class RabbitController {
  34 +
32 @Resource 35 @Resource
33 private UserMessageBindingService userMessageBindingService; 36 private UserMessageBindingService userMessageBindingService;
34 37
@@ -47,10 +50,58 @@ public class RabbitController { @@ -47,10 +50,58 @@ public class RabbitController {
47 * @throws IOException 50 * @throws IOException
48 * @throws TimeoutException 51 * @throws TimeoutException
49 */ 52 */
50 - @GetMapping("/test/consumer")  
51 - public void consumer() throws IOException, TimeoutException 53 + @GetMapping("/consumer_one")
  54 + public ResultJson consumerOne(
  55 + @RequestParam(value = "receiver") String receiver,
  56 + @RequestParam(value = "SERV") String SERV,
  57 + @RequestParam(value = "VSHT") String VSHT,
  58 + @RequestParam(value = "RCVR") String RCVR)
52 { 59 {
  60 + try {
  61 + XmlData xmlData = XmlData.builder()
  62 + .receiver(receiver)
  63 + .serverName(SERV)
  64 + .virtualHostName(VSHT)
  65 + .queueName(RCVR)
  66 + .build();
  67 + ResultJson validate = xmlUtils.validateReceiveParam(xmlData);
  68 + if (!RESULT_SUCCESS.equals(validate.getCode())) {
  69 + return validate;
  70 + }
  71 + ResultJson result = directUtils.directConsumerByPull(xmlData);
  72 + return result;
  73 + } catch (IOException | TimeoutException e) {
  74 + return ResultJson.error(CustomExceptionType.RECEIVE_SERVER_EXCEPTION);
  75 + }
  76 + }
53 77
  78 + @GetMapping("/consumer_more")
  79 + public ResultJson consumerMore(
  80 + @RequestParam(value = "receiver") String receiver,
  81 + @RequestParam(value = "SERV") String SERV,
  82 + @RequestParam(value = "VSHT") String VSHT,
  83 + @RequestParam(value = "RCVR") String RCVR,
  84 + @RequestParam(value = "count", required = false) Integer count)
  85 + {
  86 + try {
  87 + XmlData xmlData = XmlData.builder()
  88 + .receiver(receiver)
  89 + .serverName(SERV)
  90 + .virtualHostName(VSHT)
  91 + .queueName(RCVR)
  92 + .build();
  93 + ResultJson validate = xmlUtils.validateReceiveParam(xmlData);
  94 + if (!RESULT_SUCCESS.equals(validate.getCode())) {
  95 + return validate;
  96 + }
  97 + if (count == null || count < 1) {
  98 + count = 1;
  99 + }
  100 + ResultJson result = directUtils.directConsumerByPullMore(xmlData, count);
  101 + return result;
  102 + } catch (IOException | TimeoutException e) {
  103 + return ResultJson.error(CustomExceptionType.RECEIVE_SERVER_EXCEPTION);
  104 + }
54 } 105 }
55 106
56 @PostMapping("/product/old") 107 @PostMapping("/product/old")
@@ -63,6 +63,11 @@ public class XmlData implements Serializable { @@ -63,6 +63,11 @@ public class XmlData implements Serializable {
63 private String sender; 63 private String sender;
64 64
65 /** 65 /**
  66 + * 接收者
  67 + */
  68 + private String receiver;
  69 +
  70 + /**
66 * 对应:标签 RCVR -> 指定接收者(队列名称) 71 * 对应:标签 RCVR -> 指定接收者(队列名称)
67 */ 72 */
68 private String queueName; 73 private String queueName;
  1 +package com.sunyo.wlpt.message.bus.service.domain.view;
  2 +
  3 +import com.rabbitmq.http.client.domain.ExchangeInfo;
  4 +import lombok.AllArgsConstructor;
  5 +import lombok.Builder;
  6 +import lombok.Data;
  7 +import lombok.NoArgsConstructor;
  8 +
  9 +import java.io.Serializable;
  10 +
  11 +/**
  12 + * @author 子诚
  13 + * Description:自定义MQ返回信息
  14 + * 时间:2020/8/27 16:11
  15 + */
  16 +@Data
  17 +@Builder
  18 +@AllArgsConstructor
  19 +@NoArgsConstructor
  20 +public class ViewExchangeInfo implements Serializable {
  21 + private static final long serialVersionUID = -3353890396017709770L;
  22 + private String serverName;
  23 + private ExchangeInfo exchangeInfo;
  24 +}
@@ -6,6 +6,8 @@ import lombok.Builder; @@ -6,6 +6,8 @@ import lombok.Builder;
6 import lombok.Data; 6 import lombok.Data;
7 import lombok.NoArgsConstructor; 7 import lombok.NoArgsConstructor;
8 8
  9 +import java.io.Serializable;
  10 +
9 /** 11 /**
10 * @author 子诚 12 * @author 子诚
11 * Description: 13 * Description:
@@ -15,7 +17,8 @@ import lombok.NoArgsConstructor; @@ -15,7 +17,8 @@ import lombok.NoArgsConstructor;
15 @Builder 17 @Builder
16 @AllArgsConstructor 18 @AllArgsConstructor
17 @NoArgsConstructor 19 @NoArgsConstructor
18 -public class ViewQueueInfo { 20 +public class ViewQueueInfo implements Serializable {
  21 + private static final long serialVersionUID = -1016067862213381683L;
19 private String serverName; 22 private String serverName;
20 private QueueInfo queueInfo; 23 private QueueInfo queueInfo;
21 24
@@ -8,6 +8,9 @@ package com.sunyo.wlpt.message.bus.service.exception; @@ -8,6 +8,9 @@ package com.sunyo.wlpt.message.bus.service.exception;
8 8
9 public enum CustomExceptionType { 9 public enum CustomExceptionType {
10 MESSAGE_SUCCESS("10200", "消息发送成功"), 10 MESSAGE_SUCCESS("10200", "消息发送成功"),
  11 + RECEIVE_SUCCESS("20200", "接收消息,成功!"),
  12 +
  13 + RECEIVE_SERVER_EXCEPTION("20500", "服务器异常,接收消息失败!请联系管理员处理"),
11 SERVER_EXCEPTION("10500", "服务器异常,发送消息失败!"), 14 SERVER_EXCEPTION("10500", "服务器异常,发送消息失败!"),
12 CLIENT_EXCEPTION("10400", "报文格式错误,请检查报文格式!"), 15 CLIENT_EXCEPTION("10400", "报文格式错误,请检查报文格式!"),
13 16
@@ -27,6 +30,22 @@ public enum CustomExceptionType { @@ -27,6 +30,22 @@ public enum CustomExceptionType {
27 EXCHANGE_NO_EXIST("20405", "报文数据错误,交换机不存在!"), 30 EXCHANGE_NO_EXIST("20405", "报文数据错误,交换机不存在!"),
28 ROUTING_KEY_NO_EXIST("20406", "报文数据错误,路由键不存在!"), 31 ROUTING_KEY_NO_EXIST("20406", "报文数据错误,路由键不存在!"),
29 32
  33 + RECEIVE_RECEIVER_ERROR("30401", "请仔细检查,接收者不存在!"),
  34 + RECEIVE_SERVER_ERROR("30402", "参数错误,服务器名称不存在!"),
  35 + RECEIVE_HOST_ERROR("30403", "参数错误,虚拟主机名称不存在!"),
  36 + RECEIVE_QUEUE_ERROR("30404", "参数错误,队列名称不存在!"),
  37 + RECEIVE_SERVER_HOST_ERROR("30405", "请仔细检查,该虚拟主机不属于该MQ服务器!"),
  38 + RECEIVE_HOST_QUEUE_ERROR("30406", "请仔细检查,该队列不属于该虚拟主机!"),
  39 + RECEIVE_USER_SERVER_HOST_ERROR("30407", "请仔细检查,该用户无权操作此虚拟主机"),
  40 + RECEIVE_USER_QUEUE_ERROR("30408", "请仔细检查,该用户无权操作此队列"),
  41 +
  42 + RECEIVE_RECEIVER_NO_EXIST("30411", "请仔细检查,接收者不能为空!"),
  43 + RECEIVE_PASSWORD_NO_EXIST("30412", "请仔细检查,接收者不能为空!"),
  44 + RECEIVE_SERVER_NO_EXIST("30413", "参数错误,服务器名称不能为空!"),
  45 + RECEIVE_HOST_NO_EXIST("30414", "参数错误,虚拟主机名称不能为空!"),
  46 + RECEIVE_QUEUE_NO_EXIST("30415", "参数错误,队列名称不能为空!"),
  47 +
  48 +
30 CLIENT_ERROR("400", "客户端异常"), 49 CLIENT_ERROR("400", "客户端异常"),
31 SYSTEM_ERROR("500", "系统服务异常"), 50 SYSTEM_ERROR("500", "系统服务异常"),
32 OTHER_ERROR("999", "其他未知异常"); 51 OTHER_ERROR("999", "其他未知异常");
@@ -94,6 +94,15 @@ public interface BusQueueMapper { @@ -94,6 +94,15 @@ public interface BusQueueMapper {
94 List<BusQueue> validateBusQueue(BusQueue busQueue); 94 List<BusQueue> validateBusQueue(BusQueue busQueue);
95 95
96 /** 96 /**
  97 + * 校验队列是否存在,根据用户名以及队列名称
  98 + *
  99 + * @param username 用户名称
  100 + * @param queueName 队列名称
  101 + * @return
  102 + */
  103 + List<BusQueue> validateByUserNameAndQueueName(@Param("username") String username, @Param("queueName") String queueName);
  104 +
  105 + /**
97 * 仅,查询队列列表 106 * 仅,查询队列列表
98 * 107 *
99 * @param busQueue {@link BusQueue} 108 * @param busQueue {@link BusQueue}
@@ -110,6 +119,14 @@ public interface BusQueueMapper { @@ -110,6 +119,14 @@ public interface BusQueueMapper {
110 List<BusQueue> selectByUsername(String username); 119 List<BusQueue> selectByUsername(String username);
111 120
112 /** 121 /**
  122 + * 根据用户名查询队列信息
  123 + *
  124 + * @param queueName 队列名称
  125 + * @return
  126 + */
  127 + BusQueue selectByQueueName(@Param("queueName") String queueName);
  128 +
  129 + /**
113 * 根据用户名称和虚拟主机Id查询队列信息 130 * 根据用户名称和虚拟主机Id查询队列信息
114 * 131 *
115 * @param username 用户名称 132 * @param username 用户名称
@@ -103,6 +103,18 @@ public interface UserInfoMapper { @@ -103,6 +103,18 @@ public interface UserInfoMapper {
103 List<UserInfo> validateUserInfo(UserInfo userInfo); 103 List<UserInfo> validateUserInfo(UserInfo userInfo);
104 104
105 /** 105 /**
  106 + * 校验用户信息
  107 + *
  108 + * @param username 用户名称
  109 + * @param serverName 服务器名称
  110 + * @param virtualHostName 虚拟主机名称
  111 + * @return
  112 + */
  113 + List<UserInfo> validateUserInfoByParams(@Param("username") String username,
  114 + @Param("serverName") String serverName,
  115 + @Param("virtualHostName") String virtualHostName);
  116 +
  117 + /**
106 * 根据队列中的用户名,虚拟主机名称和虚拟主机id,查询用户关系 118 * 根据队列中的用户名,虚拟主机名称和虚拟主机id,查询用户关系
107 * 119 *
108 * @param busQueue {@link BusQueue} 120 * @param busQueue {@link BusQueue}
1 package com.sunyo.wlpt.message.bus.service.rabbit.utils; 1 package com.sunyo.wlpt.message.bus.service.rabbit.utils;
2 2
3 import com.rabbitmq.http.client.Client; 3 import com.rabbitmq.http.client.Client;
  4 +import com.rabbitmq.http.client.domain.ExchangeInfo;
4 import com.rabbitmq.http.client.domain.QueueInfo; 5 import com.rabbitmq.http.client.domain.QueueInfo;
5 import com.rabbitmq.http.client.domain.UserPermissions; 6 import com.rabbitmq.http.client.domain.UserPermissions;
6 import com.sunyo.wlpt.message.bus.service.domain.BusServer; 7 import com.sunyo.wlpt.message.bus.service.domain.BusServer;
7 import com.sunyo.wlpt.message.bus.service.domain.UserInfo; 8 import com.sunyo.wlpt.message.bus.service.domain.UserInfo;
8 import com.sunyo.wlpt.message.bus.service.domain.VirtualHost; 9 import com.sunyo.wlpt.message.bus.service.domain.VirtualHost;
  10 +import com.sunyo.wlpt.message.bus.service.domain.view.ViewExchangeInfo;
9 import com.sunyo.wlpt.message.bus.service.domain.view.ViewQueueInfo; 11 import com.sunyo.wlpt.message.bus.service.domain.view.ViewQueueInfo;
10 import com.sunyo.wlpt.message.bus.service.utils.AESUtils; 12 import com.sunyo.wlpt.message.bus.service.utils.AESUtils;
11 13
@@ -167,6 +169,13 @@ public class ClientUtils { @@ -167,6 +169,13 @@ public class ClientUtils {
167 return reformatQueueInfo(busServer.getServerName(), queues); 169 return reformatQueueInfo(busServer.getServerName(), queues);
168 } 170 }
169 171
  172 + /**
  173 + * 队列信息,重新格式化
  174 + *
  175 + * @param serverName MQ服务器名称
  176 + * @param queues 队列信息 {@link QueueInfo}
  177 + * @return
  178 + */
170 public static List<ViewQueueInfo> reformatQueueInfo(String serverName, List<QueueInfo> queues) 179 public static List<ViewQueueInfo> reformatQueueInfo(String serverName, List<QueueInfo> queues)
171 { 180 {
172 List<ViewQueueInfo> list = new ArrayList<>(); 181 List<ViewQueueInfo> list = new ArrayList<>();
@@ -204,10 +213,74 @@ public class ClientUtils { @@ -204,10 +213,74 @@ public class ClientUtils {
204 * @throws IOException 213 * @throws IOException
205 * @throws URISyntaxException 214 * @throws URISyntaxException
206 */ 215 */
207 - public QueueInfo getViewQueues(BusServer busServer, String vHostName, String queueName) throws IOException, URISyntaxException 216 + public ViewQueueInfo getViewQueues(BusServer busServer, String vHostName, String queueName) throws IOException, URISyntaxException
208 { 217 {
209 Client client = connectClient(busServer); 218 Client client = connectClient(busServer);
210 QueueInfo queue = client.getQueue(vHostName, queueName); 219 QueueInfo queue = client.getQueue(vHostName, queueName);
211 - return queue; 220 + ViewQueueInfo viewQueueInfo = ViewQueueInfo.builder().serverName(busServer.getServerName()).queueInfo(queue).build();
  221 + return viewQueueInfo;
  222 + }
  223 +
  224 + /**
  225 + * 获取MQ界面的交换机信息,重载
  226 + *
  227 + * @param busServer MQ服务器信息
  228 + * @return
  229 + * @throws IOException
  230 + * @throws URISyntaxException
  231 + */
  232 + public static List<ViewExchangeInfo> getViewExchanges(BusServer busServer) throws IOException, URISyntaxException
  233 + {
  234 + Client client = connectClient(busServer);
  235 + List<ExchangeInfo> exchanges = client.getExchanges();
  236 + List<ViewExchangeInfo> viewExchangeInfos = reformatExchangeInfo(busServer.getServerName(), exchanges);
  237 + return viewExchangeInfos;
  238 + }
  239 +
  240 + public static List<ViewExchangeInfo> reformatExchangeInfo(String serverName, List<ExchangeInfo> exchanges)
  241 + {
  242 + List<ViewExchangeInfo> list = new ArrayList<>();
  243 + // 将获取到的队列信息,拼接一个属性,服务器名称
  244 + for (ExchangeInfo exchangeInfo : exchanges) {
  245 + ViewExchangeInfo viewExchangeInfo = ViewExchangeInfo.builder().serverName(serverName).exchangeInfo(exchangeInfo).build();
  246 + list.add(viewExchangeInfo);
  247 + }
  248 + return list;
  249 + }
  250 +
  251 + /**
  252 + * 获取MQ界面的交换机信息,重载
  253 + *
  254 + * @param busServer MQ服务器信息
  255 + * @param vHostName 虚拟主机
  256 + * @return
  257 + * @throws IOException
  258 + * @throws URISyntaxException
  259 + */
  260 + public static List<ViewExchangeInfo> getViewExchanges(BusServer busServer, String vHostName) throws IOException, URISyntaxException
  261 + {
  262 + Client client = connectClient(busServer);
  263 + List<ExchangeInfo> exchanges = client.getExchanges(vHostName);
  264 + List<ViewExchangeInfo> viewExchangeInfos = reformatExchangeInfo(busServer.getServerName(), exchanges);
  265 + return viewExchangeInfos;
212 } 266 }
  267 +
  268 + /**
  269 + * 获取MQ界面的交换机信息,重载
  270 + *
  271 + * @param busServer MQ服务器信息
  272 + * @param vHostName 虚拟主机
  273 + * @param exchangeName 交换机名称
  274 + * @return
  275 + * @throws IOException
  276 + * @throws URISyntaxException
  277 + */
  278 + public static ViewExchangeInfo getViewExchanges(BusServer busServer, String vHostName, String exchangeName) throws IOException, URISyntaxException
  279 + {
  280 + Client client = connectClient(busServer);
  281 + ExchangeInfo exchangeInfo = client.getExchange(vHostName, exchangeName);
  282 + ViewExchangeInfo viewExchangeInfo = ViewExchangeInfo.builder().serverName(busServer.getServerName()).exchangeInfo(exchangeInfo).build();
  283 + return viewExchangeInfo;
  284 + }
  285 +
213 } 286 }
@@ -6,12 +6,15 @@ import com.sunyo.wlpt.message.bus.service.exception.CustomExceptionType; @@ -6,12 +6,15 @@ import com.sunyo.wlpt.message.bus.service.exception.CustomExceptionType;
6 import com.sunyo.wlpt.message.bus.service.response.ResultJson; 6 import com.sunyo.wlpt.message.bus.service.response.ResultJson;
7 import com.sunyo.wlpt.message.bus.service.utils.AESUtils; 7 import com.sunyo.wlpt.message.bus.service.utils.AESUtils;
8 import com.sunyo.wlpt.message.bus.service.utils.IdUtils; 8 import com.sunyo.wlpt.message.bus.service.utils.IdUtils;
  9 +import io.netty.util.internal.StringUtil;
9 import lombok.extern.slf4j.Slf4j; 10 import lombok.extern.slf4j.Slf4j;
10 import org.springframework.beans.factory.annotation.Autowired; 11 import org.springframework.beans.factory.annotation.Autowired;
11 import org.springframework.data.redis.core.StringRedisTemplate; 12 import org.springframework.data.redis.core.StringRedisTemplate;
12 import org.springframework.stereotype.Component; 13 import org.springframework.stereotype.Component;
13 14
14 import java.io.IOException; 15 import java.io.IOException;
  16 +import java.util.ArrayList;
  17 +import java.util.List;
15 import java.util.concurrent.TimeUnit; 18 import java.util.concurrent.TimeUnit;
16 import java.util.concurrent.TimeoutException; 19 import java.util.concurrent.TimeoutException;
17 20
@@ -74,59 +77,6 @@ public class DirectUtils { @@ -74,59 +77,6 @@ public class DirectUtils {
74 } 77 }
75 } 78 }
76 79
77 - /**  
78 - * DirectExchange的 消息消费者  
79 - *  
80 - * @throws IOException IO异常  
81 - * @throws TimeoutException 超时异常  
82 - */  
83 - public void directConsumer(XmlData xmlData) throws IOException, TimeoutException  
84 - {  
85 - // 1、创建ConnectionFactory  
86 - Connection connection = getConnection(xmlData.getServerIp(), xmlData.getServerPort(),  
87 - xmlData.getVirtualHostName(), xmlData.getSuperUsername(), xmlData.getSuperPassword());  
88 - // 2、 通过Connection创建一个新的Channel  
89 - Channel channel = connection.createChannel();  
90 - // 3、设置绑定关系(队列、交换机名称、路由键名称)  
91 -// channel.queueBind(queueName, exchangeName, routingKeyName);  
92 - // 一次只接受一条未确认的消息  
93 - channel.basicQos(1);  
94 - // 4、开启监听Queue  
95 - DefaultConsumer consumer = new DefaultConsumer(channel) {  
96 - @Override  
97 - public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException  
98 - {  
99 - try {  
100 - // 0、获取出全局唯一的 信息业务id(messageId)  
101 - String messageId = properties.getMessageId();  
102 - // 必须保证 messageId 不为空,避免空指针异常  
103 - if (redisTemplate.opsForValue().setIfAbsent(messageId, "0", 10, TimeUnit.MINUTES)) {  
104 -  
105 - // 消费成功,将redis中的 messageId 对应的value修改为 1  
106 - redisTemplate.opsForValue().set(messageId, "1", 10, TimeUnit.MINUTES);  
107 - // 手动ack  
108 - channel.basicAck(envelope.getDeliveryTag(), false);  
109 -  
110 - log.info("接收到消息:" + new String(body, "UTF-8"));  
111 - } else {  
112 - // 获取redis中的value,如果是1,就手动ack。如果是0,就什么也不做(是0代表着,正在被消费中)  
113 - if ("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))) {  
114 - // 手动ack  
115 - channel.basicAck(envelope.getDeliveryTag(), false);  
116 - }  
117 - }  
118 - } catch (Exception e) {  
119 - // 手动ack  
120 - channel.basicAck(envelope.getDeliveryTag(), false);  
121 - log.error("接收消息发送错误:" + e.getMessage());  
122 - }  
123 - }  
124 - };  
125 - // 消费消息  
126 - channel.basicConsume(xmlData.getQueueName(), false, consumer);  
127 - }  
128 -  
129 -  
130 public ResultJson sendMessage(XmlData xmlData) throws IOException, TimeoutException 80 public ResultJson sendMessage(XmlData xmlData) throws IOException, TimeoutException
131 { 81 {
132 /** 82 /**
@@ -194,6 +144,160 @@ public class DirectUtils { @@ -194,6 +144,160 @@ public class DirectUtils {
194 closeConnectionAndChanel(channel, connection); 144 closeConnectionAndChanel(channel, connection);
195 return ResultJson.success(CustomExceptionType.MESSAGE_SUCCESS); 145 return ResultJson.success(CustomExceptionType.MESSAGE_SUCCESS);
196 } 146 }
  147 +
  148 + /**
  149 + * byte字节数组 转 String
  150 + *
  151 + * @param byteArray 字节数组
  152 + * @return
  153 + */
  154 + public static String byteArrayToStr(byte[] byteArray)
  155 + {
  156 + if (byteArray == null) {
  157 + return null;
  158 + }
  159 + String str = new String(byteArray);
  160 + return str;
  161 + }
  162 +
  163 + /**
  164 + * DirectExchange的 消息消费者(推模式)
  165 + *
  166 + * @throws IOException IO异常
  167 + * @throws TimeoutException 超时异常
  168 + */
  169 + public List<String> directConsumerByPush(XmlData xmlData) throws IOException, TimeoutException
  170 + {
  171 + List<String> list = new ArrayList<>();
  172 +
  173 + // 1、创建ConnectionFactory
  174 + Connection connection = getConnection(xmlData.getServerIp(), xmlData.getServerPort(),
  175 + xmlData.getVirtualHostName(), xmlData.getSuperUsername(), xmlData.getSuperPassword());
  176 +
  177 + // 2、 通过Connection创建一个新的Channel
  178 + Channel channel = connection.createChannel();
  179 +
  180 + // 一次只接受一条未确认的消息
  181 + channel.basicQos(1);
  182 +
  183 + // 4、开启监听Queue
  184 + DefaultConsumer consumer = new DefaultConsumer(channel) {
  185 + @Override
  186 + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
  187 + {
  188 + try {
  189 + // 0、获取出全局唯一的 信息业务id(messageId)
  190 + String messageId = properties.getMessageId();
  191 +
  192 + // 必须保证 messageId 不为空,避免空指针异常
  193 + if (redisTemplate.opsForValue().setIfAbsent(messageId, "0", 10, TimeUnit.MINUTES)) {
  194 +
  195 + // 消费成功,将redis中的 messageId 对应的value修改为 1
  196 + redisTemplate.opsForValue().set(messageId, "1", 1, TimeUnit.MINUTES);
  197 + // 手动ack
  198 + channel.basicAck(envelope.getDeliveryTag(), false);
  199 + log.info("接收到消息:" + new String(body, "UTF-8"));
  200 + String message = new String(body, "UTF-8");
  201 + list.add(message);
  202 + } else {
  203 + // 获取redis中的value,如果是1,就手动ack。如果是0,就什么也不做(是0代表着,正在被消费中)
  204 + if ("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))) {
  205 + // 手动ack
  206 + channel.basicAck(envelope.getDeliveryTag(), false);
  207 + }
  208 + }
  209 + } catch (Exception e) {
  210 + // 手动ack
  211 + channel.basicAck(envelope.getDeliveryTag(), false);
  212 + log.error("接收消息发送错误:" + e.getMessage());
  213 + }
  214 + }
  215 + };
  216 + // 消费消息
  217 + channel.basicConsume(xmlData.getQueueName(), false, consumer);
  218 + return list;
  219 + }
  220 +
  221 + /**
  222 + * DirectExchange的 消息消费者(拉模式)
  223 + *
  224 + * @throws IOException IO异常
  225 + * @throws TimeoutException 超时异常
  226 + */
  227 + public ResultJson directConsumerByPull(XmlData xmlData) throws IOException, TimeoutException
  228 + {
  229 + List<String> list = new ArrayList<>();
  230 +
  231 + Connection connection = getConnection(xmlData.getServerIp(), xmlData.getServerPort(), xmlData.getVirtualHostName(), xmlData.getSuperUsername(), xmlData.getSuperPassword());
  232 + Channel channel = connection.createChannel();
  233 + channel.basicQos(1);
  234 +
  235 + GetResponse response = channel.basicGet(xmlData.getQueueName(), false);
  236 + if (response != null && response.getMessageCount() >= 0) {
  237 + String messageId = response.getProps().getMessageId();
  238 + if (redisTemplate.opsForValue().setIfAbsent(messageId, "0", 1, TimeUnit.MINUTES)) {
  239 + redisTemplate.opsForValue().set(messageId, "1", 1, TimeUnit.MINUTES);
  240 + String data = byteArrayToStr(response.getBody());
  241 + list.add(data);
  242 + channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
  243 + } else {
  244 + if ("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))) {
  245 + channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
  246 + }
  247 + }
  248 + }
  249 + String message = list.get(0);
  250 + return StringUtil.isNullOrEmpty(message)
  251 + ? ResultJson.error(CustomExceptionType.RECEIVE_SERVER_EXCEPTION)
  252 + : ResultJson.success(CustomExceptionType.RECEIVE_SUCCESS, message);
  253 + }
  254 +
  255 + /**
  256 + * DirectExchange的 消息消费者(拉模式、批量拉取)
  257 + */
  258 + public ResultJson directConsumerByPullMore(XmlData xmlData, Integer count) throws IOException, TimeoutException
  259 + {
  260 + List<String> list = new ArrayList<>();
  261 + String serverIp = xmlData.getServerIp();
  262 + Integer serverPort = xmlData.getServerPort();
  263 + String virtualHostName = xmlData.getVirtualHostName();
  264 + String superUsername = xmlData.getSuperUsername();
  265 + String superPassword = xmlData.getSuperPassword();
  266 +
  267 + Connection connection = getConnection(serverIp, serverPort, virtualHostName, superUsername, superPassword);
  268 + Channel channel = connection.createChannel();
  269 + channel.basicQos(1);
  270 +
  271 + while (list.size() < count) {
  272 + GetResponse response = channel.basicGet(xmlData.getQueueName(), false);
  273 + if (response != null) {
  274 + if (response.getMessageCount() >= 0) {
  275 + String messageId = response.getProps().getMessageId();
  276 + if (redisTemplate.opsForValue().setIfAbsent(messageId, "0", 1, TimeUnit.MINUTES)) {
  277 + redisTemplate.opsForValue().set(messageId, "1", 1, TimeUnit.MINUTES);
  278 + String data = byteArrayToStr(response.getBody());
  279 + list.add(data);
  280 + channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
  281 + } else {
  282 + if ("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))) {
  283 + channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
  284 + }
  285 + }
  286 + }
  287 + } else {
  288 + break;
  289 + }
  290 + }
  291 +
  292 + int size = list.size();
  293 + if (0 < size && size < count) {
  294 + return new ResultJson<>("20200", "接收消息,成功!但是,该队列内只有" + size + "条消息", list);
  295 + }
  296 + return size > 0
  297 + ? ResultJson.success(CustomExceptionType.RECEIVE_SUCCESS, list)
  298 + : ResultJson.error(CustomExceptionType.RECEIVE_SERVER_EXCEPTION);
  299 + }
  300 +
197 } 301 }
198 302
199 303
@@ -130,6 +130,11 @@ public class ResultJson<T> implements Serializable { @@ -130,6 +130,11 @@ public class ResultJson<T> implements Serializable {
130 return new ResultJson<>(customExceptionType.getCode(), customExceptionType.getMsg()); 130 return new ResultJson<>(customExceptionType.getCode(), customExceptionType.getMsg());
131 } 131 }
132 132
  133 + public static ResultJson success(CustomExceptionType customExceptionType, Object data)
  134 + {
  135 + return new ResultJson<>(customExceptionType.getCode(), customExceptionType.getMsg(),data);
  136 + }
  137 +
133 /** 138 /**
134 * 请求出现异常时的响应数据封装 139 * 请求出现异常时的响应数据封装
135 * 140 *
@@ -90,6 +90,15 @@ public interface BusQueueService { @@ -90,6 +90,15 @@ public interface BusQueueService {
90 List<BusQueue> validateBusQueue(BusQueue busQueue); 90 List<BusQueue> validateBusQueue(BusQueue busQueue);
91 91
92 /** 92 /**
  93 + * 校验队列是否存在,根据用户名以及队列名称
  94 + *
  95 + * @param username 用户名称
  96 + * @param queueName 队列名称
  97 + * @return
  98 + */
  99 + List<BusQueue> validateByUserNameAndQueueName(String username, String queueName);
  100 +
  101 + /**
93 * 仅,查询队列列表 102 * 仅,查询队列列表
94 * 103 *
95 * @param busQueue {@link BusQueue} 104 * @param busQueue {@link BusQueue}
@@ -113,5 +122,12 @@ public interface BusQueueService { @@ -113,5 +122,12 @@ public interface BusQueueService {
113 */ 122 */
114 List<BusQueue> selectByUsername(String username); 123 List<BusQueue> selectByUsername(String username);
115 124
  125 + /**
  126 + * 根据用户名查询队列信息
  127 + *
  128 + * @param queueName 队列名称
  129 + * @return
  130 + */
  131 + BusQueue selectByQueueName(String queueName);
116 } 132 }
117 133
@@ -134,6 +134,16 @@ public interface UserInfoService { @@ -134,6 +134,16 @@ public interface UserInfoService {
134 * @return 134 * @return
135 */ 135 */
136 ResultJson batchDeleteUserRelation(String ids) throws IOException, URISyntaxException; 136 ResultJson batchDeleteUserRelation(String ids) throws IOException, URISyntaxException;
  137 +
  138 + /**
  139 + * 校验用户信息
  140 + *
  141 + * @param username 用户名称
  142 + * @param serverName 服务器名称
  143 + * @param virtualHostName 虚拟主机名称
  144 + * @return
  145 + */
  146 + List<UserInfo> validateUserInfoByParams(String username, String serverName, String virtualHostName);
137 } 147 }
138 148
139 149
@@ -170,6 +170,12 @@ public class BusQueueServiceImpl implements BusQueueService { @@ -170,6 +170,12 @@ public class BusQueueServiceImpl implements BusQueueService {
170 } 170 }
171 171
172 @Override 172 @Override
  173 + public List<BusQueue> validateByUserNameAndQueueName(String username, String queueName)
  174 + {
  175 + return busQueueMapper.validateByUserNameAndQueueName(username, queueName);
  176 + }
  177 +
  178 + @Override
173 public List<BusQueue> getQueueList(BusQueue busQueue) 179 public List<BusQueue> getQueueList(BusQueue busQueue)
174 { 180 {
175 List<BusQueue> list = new ArrayList<>(); 181 List<BusQueue> list = new ArrayList<>();
@@ -202,5 +208,11 @@ public class BusQueueServiceImpl implements BusQueueService { @@ -202,5 +208,11 @@ public class BusQueueServiceImpl implements BusQueueService {
202 { 208 {
203 return busQueueMapper.selectByUsername(username); 209 return busQueueMapper.selectByUsername(username);
204 } 210 }
  211 +
  212 + @Override
  213 + public BusQueue selectByQueueName(String queueName)
  214 + {
  215 + return busQueueMapper.selectByQueueName(queueName);
  216 + }
205 } 217 }
206 218
@@ -454,6 +454,12 @@ public class UserInfoServiceImpl implements UserInfoService { @@ -454,6 +454,12 @@ public class UserInfoServiceImpl implements UserInfoService {
454 ? new ResultJson<>("200", "批量删除用户关系,成功!") 454 ? new ResultJson<>("200", "批量删除用户关系,成功!")
455 : new ResultJson<>("500", "批量删除用户关系,失败!"); 455 : new ResultJson<>("500", "批量删除用户关系,失败!");
456 } 456 }
  457 +
  458 + @Override
  459 + public List<UserInfo> validateUserInfoByParams(String username, String serverName, String virtualHostName)
  460 + {
  461 + return userInfoMapper.validateUserInfoByParams(username, serverName, virtualHostName);
  462 + }
457 } 463 }
458 464
459 465
@@ -10,7 +10,6 @@ import org.dom4j.Document; @@ -10,7 +10,6 @@ import org.dom4j.Document;
10 import org.dom4j.DocumentException; 10 import org.dom4j.DocumentException;
11 import org.dom4j.DocumentHelper; 11 import org.dom4j.DocumentHelper;
12 import org.dom4j.Element; 12 import org.dom4j.Element;
13 -import org.springframework.beans.factory.annotation.Value;  
14 import org.springframework.stereotype.Component; 13 import org.springframework.stereotype.Component;
15 14
16 import javax.annotation.Resource; 15 import javax.annotation.Resource;
@@ -30,6 +29,9 @@ public class XmlUtils { @@ -30,6 +29,9 @@ public class XmlUtils {
30 private UserInfoService userInfoService; 29 private UserInfoService userInfoService;
31 30
32 @Resource 31 @Resource
  32 + private BusQueueService busQueueService;
  33 +
  34 + @Resource
33 private BusServerService busServerService; 35 private BusServerService busServerService;
34 36
35 @Resource 37 @Resource
@@ -41,9 +43,6 @@ public class XmlUtils { @@ -41,9 +43,6 @@ public class XmlUtils {
41 @Resource 43 @Resource
42 private RoutingKeyService routingKeyService; 44 private RoutingKeyService routingKeyService;
43 45
44 - @Value("${path.dir}")  
45 - private String dir;  
46 -  
47 /** 46 /**
48 * 解析字符串类型的 xml 47 * 解析字符串类型的 xml
49 * 48 *
@@ -140,9 +139,6 @@ public class XmlUtils { @@ -140,9 +139,6 @@ public class XmlUtils {
140 return ResultJson.error(CustomExceptionType.ROUTING_KEY_NO_EXIST); 139 return ResultJson.error(CustomExceptionType.ROUTING_KEY_NO_EXIST);
141 } 140 }
142 141
143 - // 获取密码  
144 -// xmlData.setPassword(userList.get(0).getPassword());  
145 -  
146 // 获取服务器ip 142 // 获取服务器ip
147 xmlData.setServerPort(serverList.get(0).getServerPort()); 143 xmlData.setServerPort(serverList.get(0).getServerPort());
148 // 获取服务器port 144 // 获取服务器port
@@ -153,7 +149,67 @@ public class XmlUtils { @@ -153,7 +149,67 @@ public class XmlUtils {
153 xmlData.setSuperPassword(serverList.get(0).getSuperPassword()); 149 xmlData.setSuperPassword(serverList.get(0).getSuperPassword());
154 150
155 ResultJson<XmlData> result = new ResultJson<>("200", "通过格式与数据校验", xmlData); 151 ResultJson<XmlData> result = new ResultJson<>("200", "通过格式与数据校验", xmlData);
  152 + return result;
  153 + }
  154 +
  155 + /**
  156 + * 消费消息的参数的校验
  157 + *
  158 + * @param xmlData
  159 + * @return
  160 + */
  161 + public ResultJson validateReceiveParam(XmlData xmlData)
  162 + {
  163 + String receiver = xmlData.getReceiver();
  164 + String serverName = xmlData.getServerName();
  165 + String virtualHostName = xmlData.getVirtualHostName();
  166 + String queueName = xmlData.getQueueName();
  167 +
  168 + if (StringUtil.isNullOrEmpty(receiver)) {
  169 + return ResultJson.error(CustomExceptionType.RECEIVE_RECEIVER_NO_EXIST);
  170 + }
  171 + if (StringUtil.isNullOrEmpty(serverName)) {
  172 + return ResultJson.error(CustomExceptionType.RECEIVE_SERVER_NO_EXIST);
  173 + }
  174 + if (StringUtil.isNullOrEmpty(virtualHostName)) {
  175 + return ResultJson.error(CustomExceptionType.RECEIVE_HOST_NO_EXIST);
  176 + }
  177 + if (StringUtil.isNullOrEmpty(queueName)) {
  178 + return ResultJson.error(CustomExceptionType.RECEIVE_QUEUE_NO_EXIST);
  179 + }
  180 + if (userInfoService.selectUserExist(receiver).size() == 0) {
  181 + return ResultJson.error(CustomExceptionType.RECEIVE_RECEIVER_ERROR);
  182 + }
  183 + BusServer busServer = busServerService.selectByServerName(serverName);
  184 + if (busServer == null) {
  185 + return ResultJson.error(CustomExceptionType.RECEIVE_SERVER_ERROR);
  186 + }
  187 + VirtualHost virtualHost = virtualHostService.selectByVirtualHostName(virtualHostName);
  188 + if (virtualHost == null) {
  189 + return ResultJson.error(CustomExceptionType.RECEIVE_HOST_ERROR);
  190 + }
  191 + if (!busServer.getId().equals(virtualHost.getServerId())) {
  192 + return ResultJson.error(CustomExceptionType.RECEIVE_SERVER_HOST_ERROR);
  193 + }
  194 + if (userInfoService.validateUserInfoByParams(receiver, serverName, virtualHostName).size() == 0) {
  195 + return ResultJson.error(CustomExceptionType.RECEIVE_USER_SERVER_HOST_ERROR);
  196 + }
  197 + BusQueue busQueue = busQueueService.selectByQueueName(queueName);
  198 + if (busQueue == null) {
  199 + return ResultJson.error(CustomExceptionType.RECEIVE_QUEUE_ERROR);
  200 + }
  201 + if (!virtualHost.getId().equals(busQueue.getVirtualHostId())) {
  202 + return ResultJson.error(CustomExceptionType.RECEIVE_HOST_QUEUE_ERROR);
  203 + }
  204 + if(busQueueService.validateByUserNameAndQueueName(receiver,queueName).size()==0){
  205 + return ResultJson.error(CustomExceptionType.RECEIVE_USER_QUEUE_ERROR);
  206 + }
156 207
  208 + xmlData.setServerIp(busServer.getServerIp());
  209 + xmlData.setServerPort(busServer.getServerPort());
  210 + xmlData.setSuperUsername(busServer.getSuperUsername());
  211 + xmlData.setSuperPassword(busServer.getSuperPassword());
  212 + ResultJson<XmlData> result = ResultJson.success("通过校验", xmlData);
157 return result; 213 return result;
158 } 214 }
159 } 215 }
@@ -264,6 +264,14 @@ @@ -264,6 +264,14 @@
264 </where> 264 </where>
265 </select> 265 </select>
266 266
  267 + <select id="validateByUserNameAndQueueName" parameterType="java.lang.String" resultMap="BaseResultMap">
  268 + select
  269 + <include refid="Base_Column_List"/>
  270 + from bus_queue
  271 + where username = #{username,jdbcType=VARCHAR}
  272 + and queue_name = #{queueName,jdbcType=VARCHAR}
  273 + </select>
  274 +
267 <select id="selectByUsername" parameterType="java.lang.String" resultMap="BaseResultMap"> 275 <select id="selectByUsername" parameterType="java.lang.String" resultMap="BaseResultMap">
268 select 276 select
269 <include refid="Base_Column_List"/> 277 <include refid="Base_Column_List"/>
@@ -271,6 +279,13 @@ @@ -271,6 +279,13 @@
271 where username = #{username,jdbcType=VARCHAR} 279 where username = #{username,jdbcType=VARCHAR}
272 </select> 280 </select>
273 281
  282 + <select id="selectByQueueName" parameterType="java.lang.String" resultMap="BaseResultMap">
  283 + select
  284 + <include refid="Base_Column_List"/>
  285 + from bus_queue
  286 + where queue_name = #{queueName,jdbcType=VARCHAR}
  287 + </select>
  288 +
274 <select id="selectByUsernameAndHostId" parameterType="java.lang.String" resultMap="BaseResultMap"> 289 <select id="selectByUsernameAndHostId" parameterType="java.lang.String" resultMap="BaseResultMap">
275 select 290 select
276 <include refid="Base_Column_List"/> 291 <include refid="Base_Column_List"/>
@@ -47,7 +47,7 @@ @@ -47,7 +47,7 @@
47 from user_info 47 from user_info
48 <where> 48 <where>
49 username = #{username,jdbcType=VARCHAR} 49 username = #{username,jdbcType=VARCHAR}
50 - and server_name= #{serverName,jdbcType=VARCHAR} 50 + and server_name = #{serverName,jdbcType=VARCHAR}
51 </where> 51 </where>
52 </delete> 52 </delete>
53 53
@@ -261,7 +261,19 @@ @@ -261,7 +261,19 @@
261 group by username, server_name, server_id 261 group by username, server_name, server_id
262 </select> 262 </select>
263 263
264 - <select id="validateUserInfo" resultType="com.sunyo.wlpt.message.bus.service.domain.UserInfo"> 264 + <select id="validateUserInfo" parameterType="com.sunyo.wlpt.message.bus.service.domain.BusQueue"
  265 + resultMap="BaseResultMap">
  266 + select
  267 + <include refid="Base_Column_List"/>
  268 + from user_info
  269 + <where>
  270 + username = #{username,jdbcType=VARCHAR}
  271 + and server_name = #{serverName,jdbcType=VARCHAR}
  272 + and virtual_host_name = #{virtualHostName,jdbcType=VARCHAR}
  273 + </where>
  274 + </select>
  275 +
  276 + <select id="validateUserInfoByParams" parameterType="java.lang.String" resultMap="BaseResultMap">
265 select 277 select
266 <include refid="Base_Column_List"/> 278 <include refid="Base_Column_List"/>
267 from user_info 279 from user_info