作者 朱兆平

消息类型更改为大数据小组所定义的

@@ -11,6 +11,10 @@ spring: @@ -11,6 +11,10 @@ spring:
11 application: 11 application:
12 name: message-bus-service 12 name: message-bus-service
13 13
  14 + data:
  15 + elasticsearch:
  16 + repositories:
  17 + enabled: true
14 # 数据源配置 18 # 数据源配置
15 datasource: 19 datasource:
16 type: com.alibaba.druid.pool.DruidDataSource 20 type: com.alibaba.druid.pool.DruidDataSource
@@ -39,7 +43,6 @@ spring: @@ -39,7 +43,6 @@ spring:
39 username: mrz 43 username: mrz
40 password: vmvnv1v2 44 password: vmvnv1v2
41 virtual-host: / 45 virtual-host: /
42 -  
43 # 多环境配置 46 # 多环境配置
44 profiles: 47 profiles:
45 active: dev 48 active: dev
@@ -95,6 +98,11 @@ management: @@ -95,6 +98,11 @@ management:
95 show-details: always 98 show-details: always
96 shutdown: 99 shutdown:
97 enabled: true 100 enabled: true
  101 + health:
  102 + rabbit:
  103 + enabled: false
  104 + elasticsearch:
  105 + enabled: false
98 106
99 elasticsearch: 107 elasticsearch:
100 # http连接超时时间 108 # http连接超时时间
  1 +FROM java:8u111
  2 +VOLUME /tmp
  3 +ADD *.jar app.jar
  4 +EXPOSE 19031
  5 +ENTRYPOINT ["java","-jar","/app.jar"]
  6 +# Ubuntu 时区
  7 +RUN cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
@@ -33,7 +33,7 @@ public class RouterController { @@ -33,7 +33,7 @@ public class RouterController {
33 @Autowired 33 @Autowired
34 MessageRouterReciverFilterService messageRouterReciverFilterService; 34 MessageRouterReciverFilterService messageRouterReciverFilterService;
35 35
36 - @ApiOperation(value = "批量添加消息路由", notes = "超级管理修改其他用户密码") 36 + @ApiOperation(value = "批量添加消息路由", notes = "批量添加消息路由")
37 @PostMapping("batchAdd") 37 @PostMapping("batchAdd")
38 public ResultJson batchAddRouter(@RequestBody MessageRouter messageRouter){ 38 public ResultJson batchAddRouter(@RequestBody MessageRouter messageRouter){
39 if (messageRouter.getSndrs()!=null && messageRouter.getTypes()!=null && messageRouter.getRcvrs()!=null){ 39 if (messageRouter.getSndrs()!=null && messageRouter.getTypes()!=null && messageRouter.getRcvrs()!=null){
@@ -71,13 +71,14 @@ public class RouterController { @@ -71,13 +71,14 @@ public class RouterController {
71 String reciverid=UUID.randomUUID().toString(); 71 String reciverid=UUID.randomUUID().toString();
72 messageRouter.setId(routerid); 72 messageRouter.setId(routerid);
73 int result=messageRouterService.addRouter(messageRouter); 73 int result=messageRouterService.addRouter(messageRouter);
74 - if(messageRouter.getReciver().getRcvrTopic()!=null &&messageRouter.getReciver().getRcvrTopic()!=""){ 74 + //没有指定接收者只添加路由
  75 + if(messageRouter.getReciver()!=null && messageRouter.getReciver().getRcvrTopic()!=null && messageRouter.getReciver().getRcvrTopic()!=""){
75 MessageRouterReciver routerReciver=new MessageRouterReciver(); 76 MessageRouterReciver routerReciver=new MessageRouterReciver();
76 routerReciver.setId(reciverid); 77 routerReciver.setId(reciverid);
77 routerReciver.setMessageRouterId(routerid); 78 routerReciver.setMessageRouterId(routerid);
78 routerReciver.setRcvrTopic(messageRouter.getReciver().getRcvrTopic()); 79 routerReciver.setRcvrTopic(messageRouter.getReciver().getRcvrTopic());
79 messageRouterReciverService.addMessageRouterReciver(routerReciver); 80 messageRouterReciverService.addMessageRouterReciver(routerReciver);
80 - if(messageRouter.getReciver().getReciverFilter().getFilter()!=null&& messageRouter.getReciver().getReciverFilter().getFilter()!=""){ 81 + if(messageRouter.getReciver().getReciverFilter()!=null && messageRouter.getReciver().getReciverFilter().getFilter()!=null&& messageRouter.getReciver().getReciverFilter().getFilter()!=""){
81 MessageRouterReciverFilter routerReciverFilter=new MessageRouterReciverFilter(); 82 MessageRouterReciverFilter routerReciverFilter=new MessageRouterReciverFilter();
82 routerReciverFilter.setId(UUID.randomUUID().toString()); 83 routerReciverFilter.setId(UUID.randomUUID().toString());
83 routerReciverFilter.setMessageRouterReciverId(reciverid); 84 routerReciverFilter.setMessageRouterReciverId(reciverid);
@@ -5,6 +5,7 @@ import com.sunyo.wlpt.message.bus.service.exception.CustomException; @@ -5,6 +5,7 @@ import com.sunyo.wlpt.message.bus.service.exception.CustomException;
5 import com.sunyo.wlpt.message.bus.service.exception.CustomExceptionType; 5 import com.sunyo.wlpt.message.bus.service.exception.CustomExceptionType;
6 import com.sunyo.wlpt.message.bus.service.model.ESPage; 6 import com.sunyo.wlpt.message.bus.service.model.ESPage;
7 import com.sunyo.wlpt.message.bus.service.model.MessageBusMsg; 7 import com.sunyo.wlpt.message.bus.service.model.MessageBusMsg;
  8 +import com.sunyo.wlpt.message.bus.service.model.es.MESSAGEBUS;
8 import com.sunyo.wlpt.message.bus.service.response.ResultJson; 9 import com.sunyo.wlpt.message.bus.service.response.ResultJson;
9 import com.sunyo.wlpt.message.bus.service.service.ElasticSearchInfoService; 10 import com.sunyo.wlpt.message.bus.service.service.ElasticSearchInfoService;
10 import lombok.extern.slf4j.Slf4j; 11 import lombok.extern.slf4j.Slf4j;
@@ -107,19 +108,19 @@ public class ElasticSearchInfoController { @@ -107,19 +108,19 @@ public class ElasticSearchInfoController {
107 @RequestMapping("/search/wildmsg") 108 @RequestMapping("/search/wildmsg")
108 public ResultJson searchMsg(@RequestBody MessageBusMsg messageBusMsg) 109 public ResultJson searchMsg(@RequestBody MessageBusMsg messageBusMsg)
109 { 110 {
110 - Sort sort = Sort.by(Sort.Direction.DESC, "ddtm","creatime"); 111 + Sort sort = Sort.by(Sort.Direction.DESC, "MSG.HEADER.DDTM","ADMIN.CREATTIME");
111 //前端提交的起始页从1开始,ES是从0开始 112 //前端提交的起始页从1开始,ES是从0开始
112 ESPage page = ESPage.of(messageBusMsg.getPageNum()-1,messageBusMsg.getPageSize(),sort); 113 ESPage page = ESPage.of(messageBusMsg.getPageNum()-1,messageBusMsg.getPageSize(),sort);
113 WildcardQueryBuilder queryBuilders=null; 114 WildcardQueryBuilder queryBuilders=null;
114 if(StringUtils.isNotBlank(messageBusMsg.getAlias_sendContent())){ 115 if(StringUtils.isNotBlank(messageBusMsg.getAlias_sendContent())){
115 - queryBuilders = QueryBuilders.wildcardQuery("msg", "*"+messageBusMsg.getAlias_sendContent().toLowerCase()+"*"); 116 + queryBuilders = QueryBuilders.wildcardQuery("MSG.BODY", "*"+messageBusMsg.getAlias_sendContent().toLowerCase()+"*");
116 }else { 117 }else {
117 return ResultJson.error(new CustomException(CustomExceptionType.SEARCH_EXCEPTION)); 118 return ResultJson.error(new CustomException(CustomExceptionType.SEARCH_EXCEPTION));
118 } 119 }
119 120
120 SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); 121 SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
121 searchSourceBuilder.query(queryBuilders); 122 searchSourceBuilder.query(queryBuilders);
122 - Page<MessageBusMsg> data = elasticSearchInfoService.search(searchSourceBuilder,MessageBusMsg.class,page); 123 + Page<MESSAGEBUS> data = elasticSearchInfoService.search(searchSourceBuilder,MESSAGEBUS.class,page);
123 ResultJson resultJson = new ResultJson("200","success",data); 124 ResultJson resultJson = new ResultJson("200","success",data);
124 return resultJson; 125 return resultJson;
125 } 126 }
@@ -128,7 +129,7 @@ public class ElasticSearchInfoController { @@ -128,7 +129,7 @@ public class ElasticSearchInfoController {
128 public ResultJson search(@RequestBody MessageBusMsg messageBusMsg) 129 public ResultJson search(@RequestBody MessageBusMsg messageBusMsg)
129 { 130 {
130 131
131 - Sort sort = Sort.by(Sort.Direction.DESC, "ddtm","creatime"); 132 + Sort sort = Sort.by(Sort.Direction.DESC, "MSG.HEADER.DDTM","ADMIN.CREATTIME");
132 ESPage page = ESPage.of(messageBusMsg.getPageNum()-1,messageBusMsg.getPageSize(),sort); 133 ESPage page = ESPage.of(messageBusMsg.getPageNum()-1,messageBusMsg.getPageSize(),sort);
133 /** 134 /**
134 * term精确字段检索不要与matchQuery检索混用 135 * term精确字段检索不要与matchQuery检索混用
@@ -139,7 +140,7 @@ public class ElasticSearchInfoController { @@ -139,7 +140,7 @@ public class ElasticSearchInfoController {
139 log.info(new Date().toString()); 140 log.info(new Date().toString());
140 SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); 141 SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
141 searchSourceBuilder.query(queryBuilder); 142 searchSourceBuilder.query(queryBuilder);
142 - Page<MessageBusMsg> data = elasticSearchInfoService.search(searchSourceBuilder,MessageBusMsg.class,page); 143 + Page<MESSAGEBUS> data = elasticSearchInfoService.search(searchSourceBuilder,MESSAGEBUS.class,page);
143 ResultJson resultJson = new ResultJson("200","success",data); 144 ResultJson resultJson = new ResultJson("200","success",data);
144 return resultJson; 145 return resultJson;
145 } 146 }
@@ -148,33 +149,33 @@ public class ElasticSearchInfoController { @@ -148,33 +149,33 @@ public class ElasticSearchInfoController {
148 149
149 //and条件 150 //and条件
150 if (StringUtils.isNotBlank(messageBusMsg.getSndr())){ 151 if (StringUtils.isNotBlank(messageBusMsg.getSndr())){
151 - queryBuilder = queryBuilder.must(QueryBuilders.termQuery("sndr",messageBusMsg.getSndr())); 152 + queryBuilder = queryBuilder.must(QueryBuilders.termQuery("MSG.HEADER.SNDR",messageBusMsg.getSndr()));
152 } 153 }
153 if (StringUtils.isNotBlank(messageBusMsg.getBtype())){ 154 if (StringUtils.isNotBlank(messageBusMsg.getBtype())){
154 - queryBuilder = queryBuilder.must(QueryBuilders.termQuery("btype",messageBusMsg.getBtype())); 155 + queryBuilder = queryBuilder.must(QueryBuilders.termQuery("MSG.HEADER.TYPE",messageBusMsg.getBtype()));
155 } 156 }
156 if (StringUtils.isNotBlank(messageBusMsg.getStype())){ 157 if (StringUtils.isNotBlank(messageBusMsg.getStype())){
157 - queryBuilder = queryBuilder.must(QueryBuilders.termQuery("stype",messageBusMsg.getStype())); 158 + queryBuilder = queryBuilder.must(QueryBuilders.termQuery("MSG.HEADER.STYPE",messageBusMsg.getStype()));
158 } 159 }
159 160
160 if (StringUtils.isNotBlank(messageBusMsg.getSeqn())){ 161 if (StringUtils.isNotBlank(messageBusMsg.getSeqn())){
161 - queryBuilder = queryBuilder.must(QueryBuilders.termQuery("seqn",messageBusMsg.getSeqn())); 162 + queryBuilder = queryBuilder.must(QueryBuilders.termQuery("MSG.HEADER.SEQNO",messageBusMsg.getSeqn()));
162 } 163 }
163 if (StringUtils.isNotBlank(messageBusMsg.getMsgid())){ 164 if (StringUtils.isNotBlank(messageBusMsg.getMsgid())){
164 - queryBuilder = queryBuilder.must(QueryBuilders.termQuery("msgid",messageBusMsg.getMsgid())); 165 + queryBuilder = queryBuilder.must(QueryBuilders.termQuery("ADMIN.MSGID",messageBusMsg.getMsgid()));
165 } 166 }
166 167
167 - if (StringUtils.isNotBlank(messageBusMsg.getRcvrsUserName())){  
168 - queryBuilder = queryBuilder.should(QueryBuilders.termQuery("rcvrs.username",messageBusMsg.getRcvrsUserName()));  
169 - } 168 +// if (StringUtils.isNotBlank(messageBusMsg.getRcvrsUserName())){
  169 +// queryBuilder = queryBuilder.should(QueryBuilders.termQuery("rcvrs.username",messageBusMsg.getRcvrsUserName()));
  170 +// }
170 171
171 if (StringUtils.isNotBlank(messageBusMsg.getRcvlogUsername())){ 172 if (StringUtils.isNotBlank(messageBusMsg.getRcvlogUsername())){
172 - queryBuilder = queryBuilder.should(QueryBuilders.termQuery("rcvlog.username",messageBusMsg.getRcvlogUsername())); 173 + queryBuilder = queryBuilder.should(QueryBuilders.termQuery("ADMIN.RCVLOG.USER",messageBusMsg.getRcvlogUsername()));
173 } 174 }
174 if (messageBusMsg.getCreatimeSearch() !=null && !messageBusMsg.getCreatimeSearch().isEmpty() && StringUtils.isNotBlank(messageBusMsg.getCreatimeSearch().get(0)) && StringUtils.isNotBlank(messageBusMsg.getCreatimeSearch().get(1))){ 175 if (messageBusMsg.getCreatimeSearch() !=null && !messageBusMsg.getCreatimeSearch().isEmpty() && StringUtils.isNotBlank(messageBusMsg.getCreatimeSearch().get(0)) && StringUtils.isNotBlank(messageBusMsg.getCreatimeSearch().get(1))){
175 - queryBuilder = queryBuilder.must(QueryBuilders.rangeQuery("ddtm").from(messageBusMsg.getCreatimeSearch().get(0)).to(messageBusMsg.getCreatimeSearch().get(1))); 176 + queryBuilder = queryBuilder.must(QueryBuilders.rangeQuery("MSG.HEADER.DDTM").from(messageBusMsg.getCreatimeSearch().get(0)).to(messageBusMsg.getCreatimeSearch().get(1)));
176 //or条件 177 //or条件
177 - queryBuilder = queryBuilder.should(QueryBuilders.rangeQuery("creatime").from(messageBusMsg.getCreatimeSearch().get(0)).to(messageBusMsg.getCreatimeSearch().get(1))); 178 + queryBuilder = queryBuilder.should(QueryBuilders.rangeQuery("MSG.ADMIN.CREATTIME").from(messageBusMsg.getCreatimeSearch().get(0)).to(messageBusMsg.getCreatimeSearch().get(1)));
178 } 179 }
179 180
180 if (StringUtils.isNotBlank(messageBusMsg.getMsgid())){ 181 if (StringUtils.isNotBlank(messageBusMsg.getMsgid())){
@@ -182,7 +183,7 @@ public class ElasticSearchInfoController { @@ -182,7 +183,7 @@ public class ElasticSearchInfoController {
182 } 183 }
183 //wild搜索大写搜不到小写也搜不到大写的包含,要转成小写 184 //wild搜索大写搜不到小写也搜不到大写的包含,要转成小写
184 if (StringUtils.isNotBlank(messageBusMsg.getAlias_sendContent())){ 185 if (StringUtils.isNotBlank(messageBusMsg.getAlias_sendContent())){
185 - queryBuilder = queryBuilder.must(QueryBuilders.wildcardQuery("msg", "*"+messageBusMsg.getAlias_sendContent().toLowerCase()+"*")); 186 + queryBuilder = queryBuilder.must(QueryBuilders.wildcardQuery("MSG.BODY", "*"+messageBusMsg.getAlias_sendContent().toLowerCase()+"*"));
186 } 187 }
187 log.info("bool查询语句为:{}",queryBuilder); 188 log.info("bool查询语句为:{}",queryBuilder);
188 return queryBuilder; 189 return queryBuilder;
  1 +package com.sunyo.wlpt.message.bus.service.model.es;
  2 +
  3 +import lombok.Data;
  4 +
  5 +import java.util.Date;
  6 +import java.util.List;
  7 +
  8 +@Data
  9 +public class ADMIN {
  10 + private Date CREATTIME;
  11 + private Long MSGID;
  12 + private List<RCVLOG> RCVLOG;
  13 + private List<RCVRS> RCVRS;
  14 +}
  1 +package com.sunyo.wlpt.message.bus.service.model.es;
  2 +
  3 +import lombok.Data;
  4 +
  5 +@Data
  6 +public class HEADER {
  7 + private String DDTM;
  8 + private String OPTYPE;
  9 + private String RCVR;
  10 + private Long SEQNO;
  11 + private String SNDR;
  12 + private String STYPE;
  13 + private String TYPE;
  14 +}
  1 +package com.sunyo.wlpt.message.bus.service.model.es;
  2 +
  3 +import lombok.Data;
  4 +
  5 +/**
  6 + * 存入es中的消息总线实体类
  7 + */
  8 +@Data
  9 +public class MESSAGEBUS {
  10 + private MSG MSG;
  11 + private ADMIN ADMIN;
  12 +}
  1 +package com.sunyo.wlpt.message.bus.service.model.es;
  2 +
  3 +import lombok.Data;
  4 +
  5 +@Data
  6 +public class MSG {
  7 + private Object BODY;
  8 + private HEADER HEADER;
  9 +}
  1 +package com.sunyo.wlpt.message.bus.service.model.es;
  2 +
  3 +import lombok.Data;
  4 +
  5 +import java.util.Date;
  6 +
  7 +@Data
  8 +public class RCVLOG {
  9 + private Date RVTM;
  10 + private String USER;
  11 +}
  1 +package com.sunyo.wlpt.message.bus.service.model.es;
  2 +
  3 +import lombok.Data;
  4 +
  5 +@Data
  6 +public class RCVRS {
  7 + private String USER;
  8 +}
@@ -55,7 +55,7 @@ public class ElasticSearchInfoServiceImpl implements ElasticSearchInfoService { @@ -55,7 +55,7 @@ public class ElasticSearchInfoServiceImpl implements ElasticSearchInfoService {
55 /** 55 /**
56 * 索引名称 56 * 索引名称
57 */ 57 */
58 - private static final String INDEX_NAME = "messagebus"; 58 + private static final String INDEX_NAME = "ebus";
59 59
60 @Override 60 @Override
61 public ResultJson deleteByPrimaryKey(String id) 61 public ResultJson deleteByPrimaryKey(String id)