作者 朱兆平

多线程处理

@@ -4,9 +4,9 @@ @@ -4,9 +4,9 @@
4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5 <modelVersion>4.0.0</modelVersion> 5 <modelVersion>4.0.0</modelVersion>
6 <packaging>jar</packaging> 6 <packaging>jar</packaging>
7 - <groupId>com.tianbo</groupId>  
8 - <artifactId>messagebus-trans-message</artifactId>  
9 - <version>1.0-parse</version> 7 + <groupId>com.tianbo.messagebus</groupId>
  8 + <artifactId>cdhz-parse</artifactId>
  9 + <version>1.2-parse</version>
10 <description>消息转发服务</description> 10 <description>消息转发服务</description>
11 11
12 <parent> 12 <parent>
@@ -31,7 +31,7 @@ public class MessageTransApplication { @@ -31,7 +31,7 @@ public class MessageTransApplication {
31 @Bean 31 @Bean
32 public TaskScheduler taskScheduler() { 32 public TaskScheduler taskScheduler() {
33 ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); 33 ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
34 - taskScheduler.setPoolSize(100); 34 + taskScheduler.setPoolSize(3);
35 return taskScheduler; 35 return taskScheduler;
36 } 36 }
37 37
@@ -15,7 +15,7 @@ public class KafkaReciveFallback implements KafkaReciveApi { @@ -15,7 +15,7 @@ public class KafkaReciveFallback implements KafkaReciveApi {
15 15
16 @Override 16 @Override
17 public ResultJson<List<String>> recive(String username) { 17 public ResultJson<List<String>> recive(String username) {
18 - log.info("获取消息失败"); 18 + log.info("[FEGIN-ERR]获取消息失败");
19 return new ResultJson<>("400","获取消息失败",new ArrayList<>()); 19 return new ResultJson<>("400","获取消息失败",new ArrayList<>());
20 } 20 }
21 } 21 }
  1 +package com.tianbo.messagebus.service;
  2 +
  3 +
  4 +import com.alibaba.fastjson.JSON;
  5 +import com.alibaba.fastjson.JSONObject;
  6 +import com.tianbo.messagebus.controller.response.ResultJson;
  7 +import com.tianbo.messagebus.model.CUSTOM_RESPONSE;
  8 +import com.tianbo.messagebus.model.CustomReception;
  9 +import com.tianbo.messagebus.model.HEADER;
  10 +import com.tianbo.messagebus.myinterface.KafkaReciveApi;
  11 +import lombok.extern.slf4j.Slf4j;
  12 +import org.springframework.beans.factory.annotation.Autowired;
  13 +import org.springframework.beans.factory.annotation.Value;
  14 +import org.springframework.scheduling.annotation.Async;
  15 +import org.springframework.scheduling.annotation.EnableAsync;
  16 +import org.springframework.scheduling.annotation.Scheduled;
  17 +import org.springframework.stereotype.Service;
  18 +import org.springframework.util.StringUtils;
  19 +import java.util.List;
  20 +
  21 +@Service
  22 +@Slf4j
  23 +@EnableAsync
  24 +public class Custom_Response_Processor {
  25 +
  26 +
  27 + /**
  28 + * 账号名
  29 + */
  30 + @Value("${message-bus.auth.username}")
  31 + private String USER_NAME;
  32 +
  33 + @Autowired
  34 + KafkaReciveApi kafkaReciveApi;
  35 +
  36 + @Autowired
  37 + Custom_Response_Service custom_response_service;
  38 +
  39 +
  40 + /**
  41 + * feigin从服务直接获取消息
  42 + */
  43 + @Async
  44 + @Scheduled(fixedRate = 5000)
  45 + public void getDataFromFeigin(){
  46 + try{
  47 + //初始化数据库
  48 + CUSTOM_RESPONSE test = custom_response_service.selectByPrimaryKey("111");
  49 + log.info("1-开始执行获取任务,获取账号为:{}",USER_NAME);
  50 + if(!StringUtils.isEmpty(USER_NAME)){
  51 + analysis();
  52 + }
  53 + }catch (Exception e){
  54 + log.error("[MAIN-ERR]!!!处理消息出错:{}!!!",e.toString());
  55 + e.printStackTrace();
  56 + }
  57 +
  58 +
  59 + }
  60 +
  61 + public void analysis(){
  62 + ResultJson<List<String>> listResultJson = kafkaReciveApi.recive(USER_NAME);
  63 + log.info("2-获取结果为:{},数量为:{}",listResultJson.toString(),listResultJson.getData().size());
  64 + if ("200".equals(listResultJson.getCode()) && listResultJson.getData()!=null && listResultJson.getData().size()>0){
  65 + log.info("3-开始处理获取数据");
  66 + List<String> dataList = listResultJson.getData();
  67 + for (int i = 0; i <dataList.size() ; i++) {
  68 + String msg = dataList.get(i);
  69 + try{
  70 + analysisMessage(msg,i);
  71 + }catch (Exception e){
  72 + log.error("[Analysis-ERR]!!!{}报文内容解析异常:{},开始处理下一条消息!!!",msg,e.toString());
  73 + e.printStackTrace();
  74 + }
  75 +
  76 + }
  77 + }else {
  78 + log.info("[CONSUMER-RESULT] - 未获取到消息");
  79 + }
  80 + }
  81 +
  82 + public void analysisMessage(String msg,int i){
  83 + if (!StringUtils.isEmpty(msg)){
  84 + log.info("4-循环处理消息[{}]--->{}<---",i,msg);
  85 + JSONObject rootJson = JSON.parseObject(msg);
  86 + JSONObject msgJson = rootJson.getJSONObject("MSG");
  87 +
  88 + //回执实体
  89 + JSONObject body = msgJson.getJSONObject("BODY");
  90 +
  91 + //报头
  92 + HEADER msgHeader = msgJson.getObject("HEADER",HEADER.class);
  93 +
  94 + //判断类型
  95 + if ("CDHZ".equals(msgHeader.getSTYPE())){
  96 + analysisHZ(body);
  97 + }
  98 + }else {
  99 + log.error("[MSG-ERR]消息为空");
  100 + }
  101 +
  102 + }
  103 +
  104 + public void analysisHZ(JSONObject body){
  105 + log.info("@[一]@消息为舱单回执");
  106 + //判断回执类型
  107 + JSONObject manifest = body.getJSONObject("Manifest");
  108 + JSONObject head = manifest.getJSONObject("Head");
  109 +
  110 + if(head != null){
  111 + log.info("@[.]开始回执报头解析");
  112 + String messageType = head.getString("MessageType");
  113 + String messageID = head.getString("MessageID");
  114 + String sendTime = head.getString("SendTime");
  115 + String senderID = head.getString("SenderID");
  116 + String receiverID = head.getString("ReceiverID");
  117 + Integer version = head.getInteger("Version");
  118 + Integer functionCode = head.getInteger("FunctionCode");
  119 + log.info("@[MessageType:{}]回执报头解析完毕",messageType);
  120 +
  121 + if("MT2201".equals(messageType) || "MT9999".equals(messageType) || "MT3201".equals(messageType)){
  122 +
  123 + analysisBody(messageType,manifest,messageID,sendTime,senderID,receiverID,version,functionCode);
  124 + }
  125 + }else {
  126 + log.info("@[四零一]@缺少Manifest或Head节点");
  127 + }
  128 + }
  129 +
  130 + public void analysisBody(String messageType,
  131 + JSONObject manifest,
  132 + String messageID,
  133 + String sendTime,
  134 + String senderID,
  135 + String receiverID,
  136 + Integer version,
  137 + Integer functionCode
  138 + ){
  139 + log.info("@[二]@开始解析:{}",messageType);
  140 + CUSTOM_RESPONSE custom_response_nmms2 = new CUSTOM_RESPONSE();
  141 + messageType = "MT2201";
  142 + // 航班信息
  143 + JSONObject response = manifest.getJSONObject("Response");
  144 + if (response!=null){
  145 + JSONObject borderTransportMeans = response.getJSONObject("BorderTransportMeans");
  146 + if (borderTransportMeans!=null ){
  147 + String flightNo = "UNKONW";
  148 + String flightDate = "20101010";
  149 +
  150 + String journeyid = borderTransportMeans.getString("JourneyID");
  151 +
  152 + //运单信息
  153 + JSONObject consignment = response.getJSONObject("Consignment");
  154 + if (consignment!=null){
  155 + JSONObject responseType = consignment.getJSONObject("ResponseType");
  156 + JSONObject transportContractDocument = consignment.getJSONObject("TransportContractDocument");
  157 + JSONObject associatedTransportDocument = consignment.getJSONObject("AssociatedTransportDocument");
  158 +
  159 + Integer responseCode = 3;
  160 + String responseText = "回执报文未有信息";
  161 +
  162 + if (responseType!=null){
  163 + //回执代码
  164 + responseCode = responseType.getIntValue("Code");
  165 + //回执内容
  166 + responseText = responseType.getString("Text");
  167 +
  168 + }
  169 + String waybillMaster = "00000000000";
  170 + if (transportContractDocument!=null){
  171 + waybillMaster = transportContractDocument.getString("ID");
  172 + }
  173 +
  174 +
  175 + String waybillSecond="";
  176 +
  177 + if (associatedTransportDocument!=null){
  178 + waybillSecond = associatedTransportDocument.getString("ID");
  179 + }
  180 +
  181 +
  182 + CustomReception customReception = new CustomReception( messageType,
  183 + flightNo,
  184 + flightDate,
  185 + waybillMaster,
  186 + waybillSecond,
  187 + String.valueOf(responseCode),
  188 + responseText,
  189 + messageID,
  190 + sendTime,
  191 + senderID,
  192 + receiverID,
  193 + String.valueOf(version),
  194 + String.valueOf(functionCode));
  195 +
  196 +
  197 + /**
  198 + * 如果回执中没有携带航班信息节点,说明是出错报文
  199 + * 到发送日志表根据messageid 找到相应的发送日志报文的航班及运单信息,再进行解析
  200 + */
  201 + if(!StringUtils.isEmpty(journeyid)){
  202 +
  203 + String[] flightList = journeyid.split("/");
  204 + if(flightList.length > 0){
  205 + flightNo = flightList[0];
  206 + flightDate = flightList[1];
  207 +
  208 + log.info("@[三]@航班信息为:{}",journeyid);
  209 + customReception.setFlightNo(flightNo);
  210 + customReception.setFlightDate(flightDate);
  211 + }
  212 + custom_response_nmms2 = new CUSTOM_RESPONSE(customReception);
  213 + }else {
  214 + custom_response_nmms2 = new CUSTOM_RESPONSE(customReception);
  215 + custom_response_nmms2 = custom_response_service.getWaybillInfoByCutomResponse(custom_response_nmms2);
  216 + }
  217 + log.info("[(三.一)]{{}",custom_response_nmms2);
  218 +
  219 + int ii = custom_response_service.secondAnalysisReception(custom_response_nmms2);
  220 +
  221 + log.info("@[四]@回执解析完毕[{}]\n@@^PARSE SUCCESS^@@",ii);
  222 + }else {
  223 + log.info("@[四零零]@回执报文没有运单节点,解析失败.");
  224 + }
  225 + }else {
  226 + log.info("@[四零三]缺少航班信息节点,解析失败");
  227 + }
  228 +
  229 +
  230 + }else {
  231 + log.info("@[四零二]缺少回执内容节点,解析失败");
  232 + }
  233 + }
  234 +}
@@ -397,11 +397,9 @@ public class MessageBusProcessor { @@ -397,11 +397,9 @@ public class MessageBusProcessor {
397 /** 397 /**
398 * feigin从服务直接获取消息 398 * feigin从服务直接获取消息
399 */ 399 */
400 - @Scheduled(fixedRate = 5000) 400 +// @Scheduled(fixedRate = 5000)
401 public void getDataFromFeigin(){ 401 public void getDataFromFeigin(){
402 -  
403 try{ 402 try{
404 -  
405 //初始化数据库 403 //初始化数据库
406 CUSTOM_RESPONSE test = custom_response_service.selectByPrimaryKey("111"); 404 CUSTOM_RESPONSE test = custom_response_service.selectByPrimaryKey("111");
407 log.info("1-开始执行获取任务,获取账号为:{}",USER_NAME); 405 log.info("1-开始执行获取任务,获取账号为:{}",USER_NAME);
@@ -429,8 +427,11 @@ public class MessageBusProcessor { @@ -429,8 +427,11 @@ public class MessageBusProcessor {
429 log.info("@[一]@消息为舱单回执"); 427 log.info("@[一]@消息为舱单回执");
430 CUSTOM_RESPONSE custom_response_nmms2 = new CUSTOM_RESPONSE(); 428 CUSTOM_RESPONSE custom_response_nmms2 = new CUSTOM_RESPONSE();
431 //判断回执类型 429 //判断回执类型
432 - JSONObject Manifest = body.getJSONObject("Manifest");  
433 - JSONObject head = Manifest.getJSONObject("Head"); 430 + JSONObject manifest = body.getJSONObject("Manifest");
  431 + JSONObject head = manifest.getJSONObject("Head");
  432 +
  433 + if(head != null){
  434 + log.info("@[.]开始回执报头解析");
434 String messageType = head.getString("MessageType"); 435 String messageType = head.getString("MessageType");
435 String messageID = head.getString("MessageID"); 436 String messageID = head.getString("MessageID");
436 String sendTime = head.getString("SendTime"); 437 String sendTime = head.getString("SendTime");
@@ -438,35 +439,43 @@ public class MessageBusProcessor { @@ -438,35 +439,43 @@ public class MessageBusProcessor {
438 String receiverID = head.getString("ReceiverID"); 439 String receiverID = head.getString("ReceiverID");
439 Integer version = head.getInteger("Version"); 440 Integer version = head.getInteger("Version");
440 Integer functionCode = head.getInteger("FunctionCode"); 441 Integer functionCode = head.getInteger("FunctionCode");
  442 + log.info("@[MessageType:{}]回执报头解析完毕",messageType);
441 443
442 -  
443 - if("MT2201".equals(messageType) || "MT9999".equals(messageType)){ 444 + if("MT2201".equals(messageType) || "MT9999".equals(messageType) || "MT3201".equals(messageType)){
444 log.info("@[二]@开始解析:{}",messageType); 445 log.info("@[二]@开始解析:{}",messageType);
  446 + messageType = "MT2201";
445 // 航班信息 447 // 航班信息
446 - JSONObject response = Manifest.getJSONObject("Response"); 448 + JSONObject response = manifest.getJSONObject("Response");
  449 + if (response!=null){
447 JSONObject borderTransportMeans = response.getJSONObject("BorderTransportMeans"); 450 JSONObject borderTransportMeans = response.getJSONObject("BorderTransportMeans");
448 - 451 + if (borderTransportMeans!=null ){
449 String flightNo = "UNKONW"; 452 String flightNo = "UNKONW";
450 String flightDate = "20101010"; 453 String flightDate = "20101010";
451 454
452 String journeyid = borderTransportMeans.getString("JourneyID"); 455 String journeyid = borderTransportMeans.getString("JourneyID");
453 456
454 -  
455 -  
456 -  
457 //运单信息 457 //运单信息
458 JSONObject consignment = response.getJSONObject("Consignment"); 458 JSONObject consignment = response.getJSONObject("Consignment");
459 - 459 + if (consignment!=null){
460 JSONObject responseType = consignment.getJSONObject("ResponseType"); 460 JSONObject responseType = consignment.getJSONObject("ResponseType");
461 JSONObject transportContractDocument = consignment.getJSONObject("TransportContractDocument"); 461 JSONObject transportContractDocument = consignment.getJSONObject("TransportContractDocument");
462 JSONObject associatedTransportDocument = consignment.getJSONObject("AssociatedTransportDocument"); 462 JSONObject associatedTransportDocument = consignment.getJSONObject("AssociatedTransportDocument");
463 463
  464 + Integer responseCode = 3;
  465 + String responseText = "回执报文未有信息";
  466 +
  467 + if (responseType!=null){
464 //回执代码 468 //回执代码
465 - Integer responseCode = responseType.getIntValue("Code"); 469 + responseCode = responseType.getIntValue("Code");
466 //回执内容 470 //回执内容
467 - String responseText = responseType.getString("Text"); 471 + responseText = responseType.getString("Text");
  472 +
  473 + }
  474 + String waybillMaster = "00000000000";
  475 + if (transportContractDocument!=null){
  476 + waybillMaster = transportContractDocument.getString("ID");
  477 + }
468 478
469 - Long waybillMaster = transportContractDocument.getLong("ID");  
470 479
471 String waybillSecond=""; 480 String waybillSecond="";
472 481
@@ -515,15 +524,31 @@ public class MessageBusProcessor { @@ -515,15 +524,31 @@ public class MessageBusProcessor {
515 int ii = custom_response_service.secondAnalysisReception(custom_response_nmms2); 524 int ii = custom_response_service.secondAnalysisReception(custom_response_nmms2);
516 525
517 log.info("@[四]@回执解析完毕[{}]\n@@^PARSE SUCCESS^@@",ii); 526 log.info("@[四]@回执解析完毕[{}]\n@@^PARSE SUCCESS^@@",ii);
518 - 527 + }else {
  528 + log.info("@[四零零]@回执报文没有运单节点,解析失败.");
519 } 529 }
  530 + }else {
  531 + log.info("@[四零三]缺少航班信息节点,解析失败");
520 } 532 }
  533 +
  534 +
  535 + }else {
  536 + log.info("@[四零二]缺少回执内容节点,解析失败");
521 } 537 }
  538 +
522 } 539 }
  540 + }else {
  541 + log.info("@[四零一]@缺少Manifest或Head节点");
523 } 542 }
524 543
525 544
  545 +
  546 + }
  547 + }
  548 + }
  549 + }
526 }catch (Exception e){ 550 }catch (Exception e){
  551 + log.info("!!!回执解析异常:{}!!!",e.toString());
527 log.error("!!!处理消息出错:{}!!!",e.toString()); 552 log.error("!!!处理消息出错:{}!!!",e.toString());
528 e.printStackTrace(); 553 e.printStackTrace();
529 } 554 }