作者 朱兆平

在eureka注册下的feign直接调用的fallback异常处理优化

@@ -62,7 +62,7 @@ spring: @@ -62,7 +62,7 @@ spring:
62 druid: 62 druid:
63 initial-size: 10 63 initial-size: 10
64 min-idle: 10 64 min-idle: 10
65 - max-active: 20 65 + max-active: 40
66 #获取连接等待超时时间 66 #获取连接等待超时时间
67 max-wait: 60000 67 max-wait: 60000
68 #一个连接在池中最小生存的时间 68 #一个连接在池中最小生存的时间
@@ -149,16 +149,23 @@ jwt: @@ -149,16 +149,23 @@ jwt:
149 message-bus: 149 message-bus:
150 url: 150 url:
151 #账户登录地址 151 #账户登录地址
152 - login-url: http://36.134.143.171:8083/api/zz-uaa/common/bus/login 152 + #login-url: http://36.134.143.171:8083/api/zz-uaa/common/bus/login
153 #心跳地址 153 #心跳地址
154 - hearbit-url: http://36.134.143.171:8083/api/cloud-user-center-heartbeat/heartbeat 154 + #hearbit-url: http://36.134.143.171:8083/api/cloud-user-center-heartbeat/heartbeat
155 #报文发送地址 155 #报文发送地址
156 - send-url: http://36.134.143.171:8083/api/kafka-server-producer/kafka/send 156 + #send-url: http://36.134.143.171:8083/api/kafka-server-producer/kafka/send
157 #报文接收地址 157 #报文接收地址
158 - get-url: http://36.134.143.171:8083/api/kafka-server-consumer/kafka/receive 158 + #get-url: http://36.134.143.171:8083/api/kafka-server-consumer/kafka/receive
  159 + login-url: https://www.zzecargo.com/api/zz-uaa/common/bus/login
  160 + #心跳地址
  161 + hearbit-url: https://www.zzecargo.com/api/cloud-user-center-heartbeat/heartbeat
  162 + #报文发送地址
  163 + send-url: https://www.zzecargo.com/api/kafka-server-producer/kafka/send
  164 + #报文接收地址
  165 + get-url: https://www.zzecargo.com/api/kafka-server-consumer/kafka/receive
159 auth: 166 auth:
160 - username: CETC  
161 - password: 111111 167 + username: NMMS
  168 + password: vmvnv1v2
162 #心跳间隔时间默认10秒,单位毫秒 169 #心跳间隔时间默认10秒,单位毫秒
163 heartbit-interval: 10000 170 heartbit-interval: 10000
164 info: 171 info:
@@ -170,6 +177,22 @@ feign: @@ -170,6 +177,22 @@ feign:
170 client: 177 client:
171 config: 178 config:
172 default: 179 default:
  180 + readTimeout: 30000
  181 + connectTimeout: 100000
173 logger-level: FULL 182 logger-level: FULL
  183 + httpclient:
  184 + connection-timeout: 6000
  185 +ribbon:
  186 + ###指的是建立连接所用的时间,适用于网络状况正常的情况下,两端连接所用的时间。
  187 + ReadTimeout: 500000
  188 + ###指的是建立连接后从服务器读取到可用资源所用的时间。
  189 + ConnectTimeout: 50000
  190 +hystrix:
  191 + command:
  192 + default:
  193 + execution:
  194 + isolation:
  195 + thread:
  196 + timeoutInMilliseconds: 10000 # 设置hystrix的超时时间为6000ms
174 197
175 198
@@ -6,8 +6,8 @@ @@ -6,8 +6,8 @@
6 <packaging>jar</packaging> 6 <packaging>jar</packaging>
7 <groupId>com.tianbo.messagebus</groupId> 7 <groupId>com.tianbo.messagebus</groupId>
8 <artifactId>cdhz-parse</artifactId> 8 <artifactId>cdhz-parse</artifactId>
9 - <version>1.3-parse</version>  
10 - <description>消息转发服务</description> 9 + <version>1.4-parse</version>
  10 + <description>回执解析服务</description>
11 11
12 <parent> 12 <parent>
13 <groupId>org.springframework.boot</groupId> 13 <groupId>org.springframework.boot</groupId>
1 package com.tianbo.messagebus.myinterface; 1 package com.tianbo.messagebus.myinterface;
2 2
3 import com.tianbo.messagebus.controller.response.ResultJson; 3 import com.tianbo.messagebus.controller.response.ResultJson;
4 -import feign.Headers;  
5 -import feign.Param;  
6 import org.springframework.cloud.openfeign.FeignClient; 4 import org.springframework.cloud.openfeign.FeignClient;
7 import org.springframework.web.bind.annotation.*; 5 import org.springframework.web.bind.annotation.*;
8 6
9 import java.util.List; 7 import java.util.List;
10 8
11 @FeignClient(name = "kafka-server-consumer", 9 @FeignClient(name = "kafka-server-consumer",
12 - fallback = KafkaReciveFallback.class ) 10 + fallbackFactory = KafkaReciveFallbackFactory.class )
13 public interface KafkaReciveApi { 11 public interface KafkaReciveApi {
14 12
15 @ResponseBody 13 @ResponseBody
@@ -10,7 +10,7 @@ import java.util.ArrayList; @@ -10,7 +10,7 @@ import java.util.ArrayList;
10 import java.util.List; 10 import java.util.List;
11 11
12 @Slf4j 12 @Slf4j
13 -@Service("myKafkaRecive") 13 +//@Service("myKafkaRecive")
14 public class KafkaReciveFallback implements KafkaReciveApi { 14 public class KafkaReciveFallback implements KafkaReciveApi {
15 15
16 @Override 16 @Override
  1 +package com.tianbo.messagebus.myinterface;
  2 +
  3 +import com.tianbo.messagebus.controller.response.ResultJson;
  4 +import feign.hystrix.FallbackFactory;
  5 +import lombok.extern.slf4j.Slf4j;
  6 +import org.springframework.stereotype.Component;
  7 +
  8 +import java.util.ArrayList;
  9 +import java.util.List;
  10 +
  11 +@Component
  12 +@Slf4j
  13 +public class KafkaReciveFallbackFactory implements FallbackFactory<KafkaReciveApi> {
  14 +
  15 + @Override
  16 + public KafkaReciveApi create(Throwable cause) {
  17 + log.error("获取消费者消息接口调用出错",cause);
  18 + return new KafkaReciveApi() {
  19 + @Override
  20 + public ResultJson<List<String>> recive(String username) {
  21 + log.info("[FEGIN-ERR]获取消息失败");
  22 + return new ResultJson<>("400","获取消息失败",new ArrayList<>());
  23 + }
  24 + };
  25 + }
  26 +}
@@ -64,8 +64,9 @@ public class Custom_Response_Processor { @@ -64,8 +64,9 @@ public class Custom_Response_Processor {
64 public void analysis(){ 64 public void analysis(){
65 try { 65 try {
66 ResultJson<List<String>> listResultJson = kafkaReciveApi.recive(USER_NAME); 66 ResultJson<List<String>> listResultJson = kafkaReciveApi.recive(USER_NAME);
67 - log.info("2-获取结果为:{},数量为:{}",listResultJson.toString(),listResultJson.getData().size()); 67 + log.info("2-获取结果为:{}",listResultJson.toString());
68 if ("200".equals(listResultJson.getCode()) && listResultJson.getData()!=null && !listResultJson.getData().isEmpty()){ 68 if ("200".equals(listResultJson.getCode()) && listResultJson.getData()!=null && !listResultJson.getData().isEmpty()){
  69 + log.info("2-获取数量为:{}",listResultJson.getData().size());
69 responseResolve(listResultJson); 70 responseResolve(listResultJson);
70 }else { 71 }else {
71 log.info("[CONSUMER-RESULT] - 未获取到消息,code:{},msg:{},data:{}",listResultJson.getCode(),listResultJson.getMsg(),listResultJson.getData()); 72 log.info("[CONSUMER-RESULT] - 未获取到消息,code:{},msg:{},data:{}",listResultJson.getCode(),listResultJson.getMsg(),listResultJson.getData());
@@ -86,7 +87,7 @@ public class Custom_Response_Processor { @@ -86,7 +87,7 @@ public class Custom_Response_Processor {
86 for (int i = 0; i <dataList.size() ; i++) { 87 for (int i = 0; i <dataList.size() ; i++) {
87 String msg = dataList.get(i); 88 String msg = dataList.get(i);
88 try{ 89 try{
89 - log.info("3.1-开始解析数据-[{}]",msg); 90 + log.info("[LOOP-START]-3.1-开始解析数据-[{}]",msg);
90 analysisMessage(msg,i); 91 analysisMessage(msg,i);
91 }catch (Exception e){ 92 }catch (Exception e){
92 log.error("[Resolve-ERR]!!!{}报文内容解析异常:{},开始处理下一条消息!!!",msg,e.toString()); 93 log.error("[Resolve-ERR]!!!{}报文内容解析异常:{},开始处理下一条消息!!!",msg,e.toString());
@@ -97,14 +98,14 @@ public class Custom_Response_Processor { @@ -97,14 +98,14 @@ public class Custom_Response_Processor {
97 } 98 }
98 99
99 public void analysisMessage(String msg,int i){ 100 public void analysisMessage(String msg,int i){
  101 + log.info("[MSG]-4-循环处理消息[{}]--->{}<---",i,msg);
100 if (!StringUtils.isEmpty(msg)){ 102 if (!StringUtils.isEmpty(msg)){
101 - log.info("4-循环处理消息[{}]--->{}<---",i,msg);  
102 JSONObject rootJson = JSON.parseObject(msg); 103 JSONObject rootJson = JSON.parseObject(msg);
103 JSONObject msgJson = rootJson.getJSONObject("MSG"); 104 JSONObject msgJson = rootJson.getJSONObject("MSG");
104 //报头 105 //报头
105 HEADER msgHeader = msgJson.getObject("HEADER",HEADER.class); 106 HEADER msgHeader = msgJson.getObject("HEADER",HEADER.class);
106 107
107 - log.info("4.1-消息序号[SEQN]:{},[DDTM]:{}",msgHeader.getSEQNO(),msgHeader.getDDTM()); 108 + log.info("[MSG]-4.1-消息序号[SEQN]:{},[DDTM]:{}",msgHeader.getSEQNO(),msgHeader.getDDTM());
108 109
109 //回执实体 110 //回执实体
110 JSONObject body = msgJson.getJSONObject("BODY"); 111 JSONObject body = msgJson.getJSONObject("BODY");
@@ -122,13 +123,13 @@ public class Custom_Response_Processor { @@ -122,13 +123,13 @@ public class Custom_Response_Processor {
122 } 123 }
123 124
124 public void analysisHZ(JSONObject body){ 125 public void analysisHZ(JSONObject body){
125 - log.info("@[一]@消息为舱单回执"); 126 + log.info("[CDHZ]-@[一]@消息为舱单回执");
126 //判断回执类型 127 //判断回执类型
127 JSONObject manifest = body.getJSONObject("Manifest"); 128 JSONObject manifest = body.getJSONObject("Manifest");
128 JSONObject head = manifest.getJSONObject("Head"); 129 JSONObject head = manifest.getJSONObject("Head");
129 130
130 if(head != null){ 131 if(head != null){
131 - log.info("@[.]开始回执报头解析"); 132 + log.info("[CDHZ]-@[.]开始回执报头解析");
132 String messageType = head.getString("MessageType"); 133 String messageType = head.getString("MessageType");
133 String messageID = head.getString("MessageID"); 134 String messageID = head.getString("MessageID");
134 String sendTime = head.getString("SendTime"); 135 String sendTime = head.getString("SendTime");
@@ -136,14 +137,14 @@ public class Custom_Response_Processor { @@ -136,14 +137,14 @@ public class Custom_Response_Processor {
136 String receiverID = head.getString("ReceiverID"); 137 String receiverID = head.getString("ReceiverID");
137 Integer version = head.getInteger("Version"); 138 Integer version = head.getInteger("Version");
138 Integer functionCode = head.getInteger("FunctionCode"); 139 Integer functionCode = head.getInteger("FunctionCode");
139 - log.info("@[MessageType:{}]回执报头解析完毕",messageType); 140 + log.info("[CDHZ]-@[MessageType:{}]回执报头解析完毕",messageType);
140 141
141 if("MT2201".equals(messageType) || "MT9999".equals(messageType) || "MT3201".equals(messageType)){ 142 if("MT2201".equals(messageType) || "MT9999".equals(messageType) || "MT3201".equals(messageType)){
142 143
143 analysisBody(messageType,manifest,messageID,sendTime,senderID,receiverID,version,functionCode); 144 analysisBody(messageType,manifest,messageID,sendTime,senderID,receiverID,version,functionCode);
144 } 145 }
145 }else { 146 }else {
146 - log.info("@[四零一]@缺少Manifest或Head节点"); 147 + log.info("[CDHZ]-@[四零一]@缺少Manifest或Head节点");
147 } 148 }
148 } 149 }
149 150