diff --git a/config/application.yml b/config/application.yml index ebf4002..e1c4516 100755 --- a/config/application.yml +++ b/config/application.yml @@ -62,7 +62,7 @@ spring: druid: initial-size: 10 min-idle: 10 - max-active: 20 + max-active: 40 #获取连接等待超时时间 max-wait: 60000 #一个连接在池中最小生存的时间 @@ -149,16 +149,23 @@ jwt: message-bus: url: #账户登录地址 - login-url: http://36.134.143.171:8083/api/zz-uaa/common/bus/login + #login-url: http://36.134.143.171:8083/api/zz-uaa/common/bus/login #心跳地址 - hearbit-url: http://36.134.143.171:8083/api/cloud-user-center-heartbeat/heartbeat + #hearbit-url: http://36.134.143.171:8083/api/cloud-user-center-heartbeat/heartbeat #报文发送地址 - send-url: http://36.134.143.171:8083/api/kafka-server-producer/kafka/send + #send-url: http://36.134.143.171:8083/api/kafka-server-producer/kafka/send #报文接收地址 - get-url: http://36.134.143.171:8083/api/kafka-server-consumer/kafka/receive + #get-url: http://36.134.143.171:8083/api/kafka-server-consumer/kafka/receive + login-url: https://www.zzecargo.com/api/zz-uaa/common/bus/login + #心跳地址 + hearbit-url: https://www.zzecargo.com/api/cloud-user-center-heartbeat/heartbeat + #报文发送地址 + send-url: https://www.zzecargo.com/api/kafka-server-producer/kafka/send + #报文接收地址 + get-url: https://www.zzecargo.com/api/kafka-server-consumer/kafka/receive auth: - username: CETC - password: 111111 + username: NMMS + password: vmvnv1v2 #心跳间隔时间默认10秒,单位毫秒 heartbit-interval: 10000 info: @@ -170,6 +177,22 @@ feign: client: config: default: + readTimeout: 30000 + connectTimeout: 100000 logger-level: FULL + httpclient: + connection-timeout: 6000 +ribbon: + ###指的是建立连接所用的时间,适用于网络状况正常的情况下,两端连接所用的时间。 + ReadTimeout: 500000 + ###指的是建立连接后从服务器读取到可用资源所用的时间。 + ConnectTimeout: 50000 +hystrix: + command: + default: + execution: + isolation: + thread: + timeoutInMilliseconds: 10000 # 设置hystrix的超时时间为6000ms diff --git a/pom.xml b/pom.xml index c7c8aeb..7aacfa3 100644 --- a/pom.xml +++ b/pom.xml @@ -6,8 +6,8 @@ <packaging>jar</packaging> <groupId>com.tianbo.messagebus</groupId> <artifactId>cdhz-parse</artifactId> - <version>1.3-parse</version> - <description>消息转发服务</description> + <version>1.4-parse</version> + <description>回执解析服务</description> <parent> <groupId>org.springframework.boot</groupId> diff --git a/src/main/java/com/tianbo/messagebus/myinterface/KafkaReciveApi.java b/src/main/java/com/tianbo/messagebus/myinterface/KafkaReciveApi.java index 2e1a5bd..c56444d 100644 --- a/src/main/java/com/tianbo/messagebus/myinterface/KafkaReciveApi.java +++ b/src/main/java/com/tianbo/messagebus/myinterface/KafkaReciveApi.java @@ -1,15 +1,13 @@ package com.tianbo.messagebus.myinterface; import com.tianbo.messagebus.controller.response.ResultJson; -import feign.Headers; -import feign.Param; import org.springframework.cloud.openfeign.FeignClient; import org.springframework.web.bind.annotation.*; import java.util.List; @FeignClient(name = "kafka-server-consumer", - fallback = KafkaReciveFallback.class ) + fallbackFactory = KafkaReciveFallbackFactory.class ) public interface KafkaReciveApi { @ResponseBody diff --git a/src/main/java/com/tianbo/messagebus/myinterface/KafkaReciveFallback.java b/src/main/java/com/tianbo/messagebus/myinterface/KafkaReciveFallback.java index c83bd9e..6cb95ee 100644 --- a/src/main/java/com/tianbo/messagebus/myinterface/KafkaReciveFallback.java +++ b/src/main/java/com/tianbo/messagebus/myinterface/KafkaReciveFallback.java @@ -10,7 +10,7 @@ import java.util.ArrayList; import java.util.List; @Slf4j -@Service("myKafkaRecive") +//@Service("myKafkaRecive") public class KafkaReciveFallback implements KafkaReciveApi { @Override diff --git a/src/main/java/com/tianbo/messagebus/myinterface/KafkaReciveFallbackFactory.java b/src/main/java/com/tianbo/messagebus/myinterface/KafkaReciveFallbackFactory.java new file mode 100644 index 0000000..a4d372d --- /dev/null +++ b/src/main/java/com/tianbo/messagebus/myinterface/KafkaReciveFallbackFactory.java @@ -0,0 +1,26 @@ +package com.tianbo.messagebus.myinterface; + +import com.tianbo.messagebus.controller.response.ResultJson; +import feign.hystrix.FallbackFactory; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; + +@Component +@Slf4j +public class KafkaReciveFallbackFactory implements FallbackFactory<KafkaReciveApi> { + + @Override + public KafkaReciveApi create(Throwable cause) { + log.error("获取消费者消息接口调用出错",cause); + return new KafkaReciveApi() { + @Override + public ResultJson<List<String>> recive(String username) { + log.info("[FEGIN-ERR]获取消息失败"); + return new ResultJson<>("400","获取消息失败",new ArrayList<>()); + } + }; + } +} diff --git a/src/main/java/com/tianbo/messagebus/service/Custom_Response_Processor.java b/src/main/java/com/tianbo/messagebus/service/Custom_Response_Processor.java index 1b9d6a9..0bf0dac 100644 --- a/src/main/java/com/tianbo/messagebus/service/Custom_Response_Processor.java +++ b/src/main/java/com/tianbo/messagebus/service/Custom_Response_Processor.java @@ -64,8 +64,9 @@ public class Custom_Response_Processor { public void analysis(){ try { ResultJson<List<String>> listResultJson = kafkaReciveApi.recive(USER_NAME); - log.info("2-获取结果为:{},数量为:{}",listResultJson.toString(),listResultJson.getData().size()); + log.info("2-获取结果为:{}",listResultJson.toString()); if ("200".equals(listResultJson.getCode()) && listResultJson.getData()!=null && !listResultJson.getData().isEmpty()){ + log.info("2-获取数量为:{}",listResultJson.getData().size()); responseResolve(listResultJson); }else { log.info("[CONSUMER-RESULT] - 未获取到消息,code:{},msg:{},data:{}",listResultJson.getCode(),listResultJson.getMsg(),listResultJson.getData()); @@ -86,7 +87,7 @@ public class Custom_Response_Processor { for (int i = 0; i <dataList.size() ; i++) { String msg = dataList.get(i); try{ - log.info("3.1-开始解析数据-[{}]",msg); + log.info("[LOOP-START]-3.1-开始解析数据-[{}]",msg); analysisMessage(msg,i); }catch (Exception e){ log.error("[Resolve-ERR]!!!{}报文内容解析异常:{},开始处理下一条消息!!!",msg,e.toString()); @@ -97,14 +98,14 @@ public class Custom_Response_Processor { } public void analysisMessage(String msg,int i){ + log.info("[MSG]-4-循环处理消息[{}]--->{}<---",i,msg); if (!StringUtils.isEmpty(msg)){ - log.info("4-循环处理消息[{}]--->{}<---",i,msg); JSONObject rootJson = JSON.parseObject(msg); JSONObject msgJson = rootJson.getJSONObject("MSG"); //报头 HEADER msgHeader = msgJson.getObject("HEADER",HEADER.class); - log.info("4.1-消息序号[SEQN]:{},[DDTM]:{}",msgHeader.getSEQNO(),msgHeader.getDDTM()); + log.info("[MSG]-4.1-消息序号[SEQN]:{},[DDTM]:{}",msgHeader.getSEQNO(),msgHeader.getDDTM()); //回执实体 JSONObject body = msgJson.getJSONObject("BODY"); @@ -122,13 +123,13 @@ public class Custom_Response_Processor { } public void analysisHZ(JSONObject body){ - log.info("@[一]@消息为舱单回执"); + log.info("[CDHZ]-@[一]@消息为舱单回执"); //判断回执类型 JSONObject manifest = body.getJSONObject("Manifest"); JSONObject head = manifest.getJSONObject("Head"); if(head != null){ - log.info("@[.]开始回执报头解析"); + log.info("[CDHZ]-@[.]开始回执报头解析"); String messageType = head.getString("MessageType"); String messageID = head.getString("MessageID"); String sendTime = head.getString("SendTime"); @@ -136,14 +137,14 @@ public class Custom_Response_Processor { String receiverID = head.getString("ReceiverID"); Integer version = head.getInteger("Version"); Integer functionCode = head.getInteger("FunctionCode"); - log.info("@[MessageType:{}]回执报头解析完毕",messageType); + log.info("[CDHZ]-@[MessageType:{}]回执报头解析完毕",messageType); if("MT2201".equals(messageType) || "MT9999".equals(messageType) || "MT3201".equals(messageType)){ analysisBody(messageType,manifest,messageID,sendTime,senderID,receiverID,version,functionCode); } }else { - log.info("@[四零一]@缺少Manifest或Head节点"); + log.info("[CDHZ]-@[四零一]@缺少Manifest或Head节点"); } }