作者 朱兆平

update:所有回执进行解析入库到新舱单2.0回执库

FROM java:8u111
VOLUME /tmp
ADD target/*.jar app.jar
EXPOSE 11113
# Ubuntu 时区 同步主机与docker容器时间
RUN cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
RUN echo "Asia/Shanghai" > /etc/timezone && dpkg-reconfigure -f noninteractive tzdata
ENTRYPOINT ["java","-Xms128m", "-Xmx256m","-jar","/app.jar"]
... ...
... ... @@ -18,7 +18,7 @@ spring:
static-locations: classpath:/META-INF/resources/,classpath:/static,classpath:/resources/,file:${web.upload-path}
application:
name: message-bus-trans-message
name: wlpt2-cdhz-parse
redis:
# host: 127.0.0.1
host: 8.131.245.248
... ... @@ -138,7 +138,8 @@ logging:
name: system.log
config: config/logback-dev.xml
#转移到logback配置文件中
#level:
level:
com.tianbo.messagebus.myinterface: DEBUG
#org.apache.tomcat: info
#com.tianbo.warehouse.dao: DEBUG
#org.springframework.security: trace
... ... @@ -165,8 +166,9 @@ message-bus:
#报文接收地址
get-url: https://www.zzecargo.com/api/kafka-server-consumer/kafka/receive
auth:
username: NMMS
password: vmvnv1v2
username: yangyucheng
password: ZZdsly123!
consumer: NMMS
#心跳间隔时间默认10秒,单位毫秒
heartbit-interval: 10000
info:
... ...
... ... @@ -199,6 +199,7 @@
<springProfile name="dev">
<logger name="com.tianbo.messagebus.service.MessageBusProcessor" level="ALL" />
<logger name="com.tianbo.messagebus.dao" level="DEBUG" />
<logger name="com.tianbo.messagebus.myinterface" level="DEBUG" additivity="false" />
<root level="INFO">
<appender-ref ref="CONSOLE" />
<appender-ref ref="DEBUG_FILE" />
... ...
package com.tianbo.messagebus.bean;
import com.tianbo.messagebus.model.Auth;
import feign.RequestInterceptor;
import feign.RequestTemplate;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.http.HttpHeaders;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;
@Configuration
@Slf4j
public class DslyFeignClientConfiguration {
@Bean
public RequestInterceptor requestInterceptor() {
return new RequestInterceptor() {
@Override
public void apply(RequestTemplate template) {
// 替换 "yourBearerTokenValue" 为实际的 Bearer token 值
String token = "Bearer eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiI2MCIsIm5hbWUiOiLlubPlj7DnrqHnkIblkZgiLCJpZCI6NjAsImlhdCI6MTcwMjUyNzI1NywiYWNjb3VudCI6Inlhbmd5dWNoZW5nIn0.MJGCuVMSkjKHUrt35XDM1srDVSXLzZsMVXTGACeAiMw";
template.header("Authorization", token);
log.info("已设置Authorization的值为:{}",token);
log.info("request:{}",template.toString());
template.headers().forEach((name, values) -> {
System.out.println(name + ": " + values);
});
}
};
}
}
... ...
package com.tianbo.messagebus.bean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
public class ParseThreadConfig {
@Bean("PARSE-POOL")
public TaskExecutor executor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心线程数
executor.setCorePoolSize(24);
//配置最大线程数
executor.setMaxPoolSize(128);
//配置队列大小
executor.setQueueCapacity(2000);
//线程的名称前缀
executor.setThreadNamePrefix("PARSE-");
//线程活跃时间(秒)
executor.setKeepAliveSeconds(60);
//等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(false);
// 设置拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//执行初始化
executor.initialize();
return executor;
}
}
... ...
... ... @@ -11,6 +11,11 @@ public class ResultJson<T> implements Serializable{
// 描述
private String msg = "";
// 描述
private String message = "";
private Boolean success;
private String error;
// 返回对象
private T data;
... ... @@ -86,11 +91,32 @@ public class ResultJson<T> implements Serializable{
this.jwtToken = jwtToken;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public Boolean getSuccess() {
return success;
}
public void setSuccess(Boolean success) {
this.success = success;
}
@Override
public String toString() {
return "ResultJson{" +
"code='" + code + '\'' +
", msg='" + msg + '\'' +
", message='" + message + '\'' +
", success=" + success +
", error='" + error + '\'' +
", data=" + data +
", jwtToken='" + jwtToken + '\'' +
'}';
}
}
... ...
package com.tianbo.messagebus.model;
public class Auth {
/**
* 存储登录后的token
*/
public static String TOKEN = "";
/**
* 登录成功状态
*/
public static Boolean LOGIN_STATUS=false;
/**
* 失败重发请求次数
*/
public static final int RETRY_TIMES= 10;
}
... ...
package com.tianbo.messagebus.myinterface;
import com.tianbo.messagebus.bean.DslyFeignClientConfiguration;
import com.tianbo.messagebus.controller.response.ResultJson;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.*;
import java.util.List;
@FeignClient(name = "kafka-server-consumer",
fallbackFactory = KafkaReciveFallbackFactory.class )
@FeignClient(
name = "kafka-server-consumer",
url = "https://www.zzecargo.com",
fallbackFactory = KafkaReciveFallbackFactory.class,
configuration = {DslyFeignClientConfiguration.class})
public interface KafkaReciveApi {
@ResponseBody
@RequestMapping(value = "/kafka/receive",method = RequestMethod.GET)
ResultJson<List<String>> recive(@RequestParam(value = "username",required = true) String username);
@ResponseBody
@RequestMapping(value = "/api/kafka-server-consumer/kafka/receive",method = RequestMethod.POST)
ResultJson<List<String>> getMessages(@RequestParam(value = "username",required = true) String username);
}
... ...
package com.tianbo.messagebus.myinterface;
import com.tianbo.messagebus.controller.response.ResultJson;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
@Slf4j
//@Service("myKafkaRecive")
public class KafkaReciveFallback implements KafkaReciveApi {
@Override
public ResultJson<List<String>> recive(String username) {
log.info("[FEGIN-ERR]获取消息失败");
return new ResultJson<>("400","获取消息失败",new ArrayList<>());
}
}
... ... @@ -21,6 +21,12 @@ public class KafkaReciveFallbackFactory implements FallbackFactory<KafkaReciveAp
log.info("[FEGIN-ERR]获取消息失败");
return new ResultJson<>("400","获取消息失败",new ArrayList<>());
}
@Override
public ResultJson<List<String>> getMessages(String username) {
log.info("[FEGIN-ERR]获取消息失败");
return new ResultJson<>("400","获取消息失败",new ArrayList<>());
}
};
}
}
... ...
... ... @@ -4,6 +4,7 @@ package com.tianbo.messagebus.service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.tianbo.messagebus.controller.response.ResultJson;
import com.tianbo.messagebus.model.Auth;
import com.tianbo.messagebus.model.CUSTOM_RESPONSE;
import com.tianbo.messagebus.model.CustomReception;
import com.tianbo.messagebus.model.HEADER;
... ... @@ -27,8 +28,10 @@ public class Custom_Response_Processor {
/**
* 账号名
*/
@Value("${message-bus.auth.username}")
private String USER_NAME;
@Value("${message-bus.auth.consumer}")
private String CONSUMER_NAME;
@Autowired
KafkaReciveApi kafkaReciveApi;
... ... @@ -41,13 +44,13 @@ public class Custom_Response_Processor {
* feigin从服务直接获取消息
*/
@Async
@Scheduled(fixedRate = 5000)
@Scheduled(fixedRate = 15000)
public void getDataFromFeigin(){
try{
//初始化数据库
CUSTOM_RESPONSE test = custom_response_service.selectByPrimaryKey("111");
log.info("1-开始执行获取任务,获取账号为:{}",USER_NAME);
if(!StringUtils.isEmpty(USER_NAME)){
log.info("1-开始执行获取任务,获取消息账号为:{}",CONSUMER_NAME);
if(!StringUtils.isEmpty(CONSUMER_NAME)){
analysis();
}
}catch (Exception e){
... ... @@ -62,18 +65,22 @@ public class Custom_Response_Processor {
* 通过总线消费服务获取回执数据
*/
public void analysis(){
try {
ResultJson<List<String>> listResultJson = kafkaReciveApi.recive(USER_NAME);
log.info("2-获取结果为:{}",listResultJson.toString());
if ("200".equals(listResultJson.getCode()) && listResultJson.getData()!=null && !listResultJson.getData().isEmpty()){
log.info("2-获取数量为:{}",listResultJson.getData().size());
responseResolve(listResultJson);
}else {
log.info("[CONSUMER-RESULT] - 未获取到消息,code:{},msg:{},data:{}",listResultJson.getCode(),listResultJson.getMsg(),listResultJson.getData());
if (!StringUtils.isEmpty(Auth.TOKEN)){
try {
ResultJson<List<String>> listResultJson = kafkaReciveApi.getMessages(CONSUMER_NAME);
log.info("2-获取结果为:{}",listResultJson.toString());
if ("200".equals(listResultJson.getCode()) && listResultJson.getData()!=null && !listResultJson.getData().isEmpty()){
log.info("2-获取数量为:{}",listResultJson.getData().size());
responseResolve(listResultJson);
}else {
log.info("[CONSUMER-RESULT] - 未获取到消息,code:{},msg:{},data:{}",listResultJson.getCode(),listResultJson.getMsg(),listResultJson.getData());
}
} catch (Exception e) {
log.error("[CONSUMER-ERR]!!!获取消息异常,开始获取下一条消息!!!->{}",e.toString());
e.printStackTrace();
}
} catch (Exception e) {
log.error("[CONSUMER-ERR]!!!获取消息异常,开始获取下一条消息!!!->{}",e.toString());
e.printStackTrace();
}else{
log.error("[CONSUMER-ERR]:未登录,无法获取消息");
}
}
... ... @@ -141,8 +148,10 @@ public class Custom_Response_Processor {
if("MT2201".equals(messageType) || "MT9999".equals(messageType) || "MT3201".equals(messageType)){
analysisBody(messageType,manifest,messageID,sendTime,senderID,receiverID,version,functionCode);
}
//所有回执类型都解析
analysisBody(messageType,manifest,messageID,sendTime,senderID,receiverID,version,functionCode);
}else {
log.info("[CDHZ]-@[四零一]@缺少Manifest或Head节点");
}
... ... @@ -159,7 +168,7 @@ public class Custom_Response_Processor {
){
log.info("@[二]@开始解析:{}",messageType);
CUSTOM_RESPONSE custom_response_nmms2 = new CUSTOM_RESPONSE();
messageType = "MT2201";
// messageType = "MT2201";
// 航班信息
JSONObject response = manifest.getJSONObject("Response");
if (response!=null){
... ...
package com.tianbo.messagebus.service;
/**
* @author mrz
*/
public interface LoginService {
/**
* 发起登录,存储token
*
* @return
*/
void login();
/**
* 定时心跳,维持在线状态,每10秒访问一次心跳接口
*/
void heartBit();
/**
* 开始心跳
*/
void startHeartBit();
}
... ...
package com.tianbo.messagebus.service.impl;
import com.tianbo.messagebus.service.LoginService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class LoginCommandRunner implements CommandLineRunner {
@Autowired
LoginService loginService;
@Override
public void run(String... args) throws Exception {
log.info("start loggin");
loginService.login();
// log.info("[HEART-BIT]-start heartBit");
// loginService.startHeartBit();
}
}
... ...
package com.tianbo.messagebus.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.tianbo.messagebus.model.Auth;
import com.tianbo.messagebus.service.LoginService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.*;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.StringUtils;
import org.springframework.web.client.RestTemplate;
import javax.annotation.Resource;
@Service
@Slf4j
public class LoginServiceImpl implements LoginService {
/** 账户登录地址
*/
@Value("${message-bus.url.login-url}")
private String LOGIN_URL;
/**
* 账号名
*/
@Value("${message-bus.auth.username}")
private String USER_NAME;
// private static final String USER_NAME = "HYYW";
/**
* 登陆密码
*/
@Value("${message-bus.auth.password}")
private String USER_PASS;
/**
* 心跳接口地址
*/
@Value("${message-bus.url.hearbit-url}")
private String HEARTBEAT_URL;
/**
* 心跳间隔时间 单位S
*/
@Value("${message-bus.heartbit-interval}")
private int HEARTBIT_INTERVAL;
/**
* HTTP请求框架
*/
@Resource
private RestTemplate restTemplate;
@Override
public void login() {
try {
/*
* 发起HTTP 登录请求
* 登录接口的请求头为application/x-www-form-urlencoded
*/
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
/*
* 请求参数
*/
MultiValueMap<String, String> params = new LinkedMultiValueMap<String, String>();
params.add("username", USER_NAME);
params.add("password", USER_PASS);
log.info("[LOGIN]-登录用户:[{}],登录密码:[{}]",USER_NAME,USER_PASS);
HttpEntity<MultiValueMap<String, String>> request = new HttpEntity<MultiValueMap<String, String>>(params, headers);
/*
* 提交HTTP访问,获取返回信息
*/
ResponseEntity<String> response = restTemplate.postForEntity(LOGIN_URL, request, String.class);
// 输出结果
log.info(response.getBody());
/*
校验是否登录成功
*/
if (response.getStatusCode().equals(HttpStatus.OK)) {
/**
* 从返回信息中确定是否登录成功,并取得token
* 返回格式
* {
"code":200,
"data":{
"account":"yangyucheng",
"token":"eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiI2MCIsIm5hbWUiOiLmnajnjonmiJDmtYvor5UiLCJpZCI6NjAsImlhdCI6MTYxNzM0ODM3MiwiYWNjb3VudCI6Inlhbmd5dWNoZW5nIn0.ElAs7BtV1tu6ApQXuPXzgXUgvja76bjEb-zxqhUON48"
},
"message":"success",
"success":true,
"time":20210402152612604
}
*/
JSONObject resJson = JSON.parseObject(response.getBody());
JSONObject resData = resJson.getJSONObject("data");
String resCode = resJson.getString("code");
/*
校验并获取登陆成功后返回的token
*/
String authToken = resData.getString("token");
if ("200".equals(resCode) && StringUtils.hasLength(authToken) && authToken.length() > 10) {
Auth.LOGIN_STATUS = true;
//设置请求头部Authorization为token, token的类型为Bearer
Auth.TOKEN = authToken;
log.info("登录成功:token=[{}]",Auth.TOKEN);
}else {
log.error("登录失败");
}
} else {
log.error("登录失败");
}
} catch (Exception e) {
log.error("登录失败->{}",e.toString());
}
}
@Override
public void heartBit() {
if (StringUtils.hasLength(Auth.TOKEN) && Auth.LOGIN_STATUS){
/*
* 发起HTTP 登录请求
* 登录接口的请求头为application/x-www-form-urlencoded
*/
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
/*
* 设置获取到的token到头部信息Authorization节点中
*/
headers.setBearerAuth(Auth.TOKEN);
/*
* 心跳接口无参数,访问接口即可
*/
MultiValueMap<String, String> params = new LinkedMultiValueMap<String, String>();
HttpEntity<MultiValueMap<String, String>> request = new HttpEntity<MultiValueMap<String, String>>(params, headers);
/*
* 提交HTTP访问,获取返回信息
*/
ResponseEntity<String> response = restTemplate.postForEntity(HEARTBEAT_URL, request, String.class);
// 输出结果
log.debug(response.getBody());
if (response.getStatusCode().equals(HttpStatus.OK)) {
log.debug("[HEART-BIT]-心跳成功");
} else {
log.error("[HEART-BIT-ERR]-心跳失败");
}
}else {
log.error("[HEART-BIT-ERR]-未登录,心跳失败");
}
}
@Scheduled(fixedRate = 9000)
@Override
// @Async("HEART-POOL")
public void startHeartBit() {
log.info("[LOGINED]-开始心跳");
heartBit();
}
}
... ...