作者 朱兆平

优化重发校验

@@ -27,7 +27,7 @@ public class MessageTransApplication { @@ -27,7 +27,7 @@ public class MessageTransApplication {
27 @Bean 27 @Bean
28 public TaskScheduler taskScheduler() { 28 public TaskScheduler taskScheduler() {
29 ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); 29 ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
30 - taskScheduler.setPoolSize(10); 30 + taskScheduler.setPoolSize(3);
31 return taskScheduler; 31 return taskScheduler;
32 } 32 }
33 33
@@ -3,6 +3,7 @@ package com.tianbo.messagebus.controller; @@ -3,6 +3,7 @@ package com.tianbo.messagebus.controller;
3 import com.alibaba.fastjson.JSON; 3 import com.alibaba.fastjson.JSON;
4 import com.alibaba.fastjson.JSONObject; 4 import com.alibaba.fastjson.JSONObject;
5 import com.tianbo.messagebus.service.MessageBusProcessor; 5 import com.tianbo.messagebus.service.MessageBusProcessor;
  6 +import lombok.extern.slf4j.Slf4j;
6 import org.apache.commons.lang.StringUtils; 7 import org.apache.commons.lang.StringUtils;
7 import org.springframework.beans.factory.annotation.Autowired; 8 import org.springframework.beans.factory.annotation.Autowired;
8 import org.springframework.web.bind.annotation.PostMapping; 9 import org.springframework.web.bind.annotation.PostMapping;
@@ -16,6 +17,7 @@ import java.io.Serializable; @@ -16,6 +17,7 @@ import java.io.Serializable;
16 import java.util.Date; 17 import java.util.Date;
17 18
18 @RestController() 19 @RestController()
  20 +@Slf4j
19 @RequestMapping("/") 21 @RequestMapping("/")
20 public class HeartbeatController implements Serializable { 22 public class HeartbeatController implements Serializable {
21 private static final long serialVersionUID = 1L; 23 private static final long serialVersionUID = 1L;
@@ -34,6 +36,7 @@ public class HeartbeatController implements Serializable { @@ -34,6 +36,7 @@ public class HeartbeatController implements Serializable {
34 36
35 @PostMapping("getmsg") 37 @PostMapping("getmsg")
36 public void getmsg(){ 38 public void getmsg(){
  39 + log.error("for test");
37 messageBusDemo.getMsg(); 40 messageBusDemo.getMsg();
38 } 41 }
39 42
@@ -13,6 +13,8 @@ import org.springframework.stereotype.Service; @@ -13,6 +13,8 @@ import org.springframework.stereotype.Service;
13 import org.springframework.util.LinkedMultiValueMap; 13 import org.springframework.util.LinkedMultiValueMap;
14 import org.springframework.util.MultiValueMap; 14 import org.springframework.util.MultiValueMap;
15 import org.springframework.util.StringUtils; 15 import org.springframework.util.StringUtils;
  16 +import org.springframework.web.client.HttpClientErrorException;
  17 +import org.springframework.web.client.RestClientException;
16 import org.springframework.web.client.RestTemplate; 18 import org.springframework.web.client.RestTemplate;
17 19
18 import javax.annotation.Resource; 20 import javax.annotation.Resource;
@@ -69,6 +71,10 @@ public class MessageBusProcessor { @@ -69,6 +71,10 @@ public class MessageBusProcessor {
69 */ 71 */
70 private static Boolean LOGIN_STATUS=false; 72 private static Boolean LOGIN_STATUS=false;
71 73
  74 + /**
  75 + * 失败重发请求次数
  76 + */
  77 + private static final int RETRY_TIMES= 3;
72 78
73 /** 79 /**
74 * HTTP请求框架 80 * HTTP请求框架
@@ -175,7 +181,18 @@ public class MessageBusProcessor { @@ -175,7 +181,18 @@ public class MessageBusProcessor {
175 } 181 }
176 } 182 }
177 183
178 - @Scheduled(fixedRate = 10000) 184 + /**
  185 + * 获取返回信息的状态码
  186 + * @param response response对象
  187 + * @return
  188 + */
  189 + private String getResponseDataCode(ResponseEntity<String> response){
  190 + JSONObject resJson = JSON.parseObject(response.getBody());
  191 + String code = resJson.getString("code");
  192 +
  193 + return code;
  194 + }
  195 + @Scheduled(fixedDelay = 10000)
179 public void heartBit() { 196 public void heartBit() {
180 if (!StringUtils.isEmpty(TOKEN) && LOGIN_STATUS){ 197 if (!StringUtils.isEmpty(TOKEN) && LOGIN_STATUS){
181 /* 198 /*
@@ -200,10 +217,12 @@ public class MessageBusProcessor { @@ -200,10 +217,12 @@ public class MessageBusProcessor {
200 * 提交HTTP访问,获取返回信息 217 * 提交HTTP访问,获取返回信息
201 */ 218 */
202 ResponseEntity<String> response = restTemplate.postForEntity(HEARTBEAT_URL, request, String.class); 219 ResponseEntity<String> response = restTemplate.postForEntity(HEARTBEAT_URL, request, String.class);
  220 +
203 // 输出结果 221 // 输出结果
204 - System.out.println(response.getBody()); 222 + log.info(response.getBody());
  223 +
205 if (response.getStatusCode().equals(HttpStatus.OK)) { 224 if (response.getStatusCode().equals(HttpStatus.OK)) {
206 - log.info("心跳成功"); 225 + log.info("心跳成功,token:[{}]",TOKEN);
207 } else { 226 } else {
208 log.error("心跳失败"); 227 log.error("心跳失败");
209 } 228 }
@@ -217,6 +236,8 @@ public class MessageBusProcessor { @@ -217,6 +236,8 @@ public class MessageBusProcessor {
217 */ 236 */
218 public Boolean sendMsg(MSG msg) { 237 public Boolean sendMsg(MSG msg) {
219 if (LOGIN_STATUS) { 238 if (LOGIN_STATUS) {
  239 + try{
  240 + log.info("开始转发消息:{}",msg.toString());
220 /* 241 /*
221 * 发起HTTP 登录请求 242 * 发起HTTP 登录请求
222 * 登录接口的请求头为application/json 243 * 登录接口的请求头为application/json
@@ -252,20 +273,22 @@ public class MessageBusProcessor { @@ -252,20 +273,22 @@ public class MessageBusProcessor {
252 273
253 HttpEntity<MSGS> request = new HttpEntity<MSGS>(msgs, headers); 274 HttpEntity<MSGS> request = new HttpEntity<MSGS>(msgs, headers);
254 ResponseEntity<String> response = restTemplate.postForEntity(SEND_MSG_URL, request, String.class); 275 ResponseEntity<String> response = restTemplate.postForEntity(SEND_MSG_URL, request, String.class);
255 - 276 + if (response.getStatusCode().equals(HttpStatus.OK)) {
256 JSONObject resJson = JSON.parseObject(response.getBody()); 277 JSONObject resJson = JSON.parseObject(response.getBody());
257 String code = resJson.getString("code"); 278 String code = resJson.getString("code");
258 -  
259 - System.out.println(response.getBody());  
260 -  
261 - if (response.getStatusCode().equals(HttpStatus.OK) && "200".equals(code)) { 279 + if ("200".equals(code)) {
262 log.info("消息发送成功"); 280 log.info("消息发送成功");
263 return true; 281 return true;
264 - } else {  
265 - log.error("消息发送失败"); 282 + }
  283 + }
  284 + log.info("消息发送失败->{}",response.getBody());
  285 + }catch (Exception e){
  286 + log.info("消息发送失败->{},失败原因:{}",msg.toString(),e.toString());
  287 + log.error("消息发送失败->{},失败原因:{}",msg.toString(),e.toString());
266 return false; 288 return false;
267 } 289 }
268 - 290 + }else {
  291 + log.info("未登陆,消息发送失败");
269 } 292 }
270 return false; 293 return false;
271 } 294 }
@@ -276,8 +299,8 @@ public class MessageBusProcessor { @@ -276,8 +299,8 @@ public class MessageBusProcessor {
276 * 299 *
277 * @return 300 * @return
278 */ 301 */
279 - @Async  
280 - @Scheduled(fixedRate = 300) 302 +// @Async
  303 + @Scheduled(fixedRate = 1000)
281 public JSONArray getMsg() { 304 public JSONArray getMsg() {
282 if(!LOGIN_STATUS){ 305 if(!LOGIN_STATUS){
283 login(); 306 login();
@@ -287,6 +310,7 @@ public class MessageBusProcessor { @@ -287,6 +310,7 @@ public class MessageBusProcessor {
287 * 发起HTTP 登录请求 310 * 发起HTTP 登录请求
288 * 登录接口的请求头为application/json 311 * 登录接口的请求头为application/json
289 */ 312 */
  313 + try{
290 HttpHeaders headers = new HttpHeaders(); 314 HttpHeaders headers = new HttpHeaders();
291 headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED); 315 headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
292 headers.setBearerAuth(TOKEN); 316 headers.setBearerAuth(TOKEN);
@@ -302,22 +326,24 @@ public class MessageBusProcessor { @@ -302,22 +326,24 @@ public class MessageBusProcessor {
302 */ 326 */
303 ResponseEntity<String> response = restTemplate.postForEntity(GET_MSG_URL, request, String.class); 327 ResponseEntity<String> response = restTemplate.postForEntity(GET_MSG_URL, request, String.class);
304 // 输出结果 328 // 输出结果
305 - log.info("获取到消息返回---{}---",response.getBody()); 329 +
  330 +
  331 + if (response.getStatusCode().equals(HttpStatus.OK)) {
306 /* 332 /*
307 * 从返回信息中确定是否获取到消息 333 * 从返回信息中确定是否获取到消息
308 */ 334 */
309 JSONObject resJson = JSON.parseObject(response.getBody()); 335 JSONObject resJson = JSON.parseObject(response.getBody());
310 String code = resJson.getString("code"); 336 String code = resJson.getString("code");
311 - if (response.getStatusCode().equals(HttpStatus.OK) && "200".equals(code)) {  
312 - 337 + if ("200".equals(code)){
313 JSONArray data = resJson.getJSONArray("data"); 338 JSONArray data = resJson.getJSONArray("data");
314 - log.info("消息接收成功,接收消息为>>>{}<<<",data.toString()); 339 + log.info("消息接收成功,接收消息数量>>>{}<<<",data.size());
315 340
316 for (int i = 0; i<data.size() ; i++) { 341 for (int i = 0; i<data.size() ; i++) {
317 /* 342 /*
318 取得是大数据小组的实体,他们的msg.body的封装是以对象实体object封装的。不是json字符串。 343 取得是大数据小组的实体,他们的msg.body的封装是以对象实体object封装的。不是json字符串。
319 */ 344 */
320 String msg = data.getObject(i,String.class); 345 String msg = data.getObject(i,String.class);
  346 + log.info("开始转发消息---{}---",msg);
321 JSONObject rootJson = JSON.parseObject(msg); 347 JSONObject rootJson = JSON.parseObject(msg);
322 JSONObject msgJson = rootJson.getJSONObject("MSG"); 348 JSONObject msgJson = rootJson.getJSONObject("MSG");
323 JSONObject body = msgJson.getJSONObject("BODY"); 349 JSONObject body = msgJson.getJSONObject("BODY");
@@ -329,44 +355,54 @@ public class MessageBusProcessor { @@ -329,44 +355,54 @@ public class MessageBusProcessor {
329 transMsg.setHEADER(msgHeader); 355 transMsg.setHEADER(msgHeader);
330 transMsg.setBODY(transBody); 356 transMsg.setBODY(transBody);
331 357
332 - transMsg.toString();  
333 -  
334 /* 358 /*
335 自定义对返回数据的处理 359 自定义对返回数据的处理
336 */ 360 */
337 log.info("开始转发消息"); 361 log.info("开始转发消息");
338 Boolean sendResult = sendMsg(transMsg); 362 Boolean sendResult = sendMsg(transMsg);
339 /** 363 /**
340 - * todo:消息失败处理 364 + * todo:转发消息失败处理
341 */ 365 */
342 if(!sendResult){ 366 if(!sendResult){
  367 + log.error("!!!!!!消息--->{}<---转发失败!!!!!!,尝试重发",transMsg.toString());
343 //todo:消息备份或者重发? 368 //todo:消息备份或者重发?
  369 + reSend(transMsg);
344 } 370 }
345 } 371 }
346 -  
347 -  
348 -  
349 -  
350 return data; 372 return data;
351 - 373 + }
352 } else { 374 } else {
353 log.error("消息获取失败"); 375 log.error("消息获取失败");
354 return new JSONArray(); 376 return new JSONArray();
355 } 377 }
  378 + }catch (Exception httpE){
  379 + log.info("消息获取失败,失败原因:{}",httpE.toString());
  380 + log.error("消息获取失败,失败原因:{}",httpE.toString());
  381 +
  382 + }
  383 + return new JSONArray();
356 } 384 }
357 385
358 /** 386 /**
359 * 读取备份消息并消息重发 387 * 读取备份消息并消息重发
360 * @return 388 * @return
361 */ 389 */
362 - public Boolean reSend(){  
363 - return false; 390 + public void reSend(MSG msg){
  391 + log.error("***进入重发***");
  392 + for (int i = 0; i < RETRY_TIMES; i++) {
  393 + Boolean sendResult = sendMsg(msg);
  394 + if (sendResult){
  395 + log.error("***重发成功***");
  396 + break;
  397 + }
  398 + }
  399 + log.error("***已尝试重发三次,重发失败***");
364 } 400 }
365 401
366 } 402 }
367 403
368 /** 404 /**
369 - * 消息发送实体类 405 + * 消息实体类
370 */ 406 */
371 class MSGS implements Serializable { 407 class MSGS implements Serializable {
372 private MSG MSG; 408 private MSG MSG;
@@ -405,6 +441,11 @@ class MSG { @@ -405,6 +441,11 @@ class MSG {
405 public void setBODY(String BODY) { 441 public void setBODY(String BODY) {
406 this.BODY = BODY; 442 this.BODY = BODY;
407 } 443 }
  444 +
  445 + @Override
  446 + public String toString() {
  447 + return this.BODY;
  448 + }
408 } 449 }
409 450
410 /** 451 /**