作者 朱兆平

增加直接从mq获取报文并解析的处理方案

... ... @@ -2,7 +2,7 @@
web:
upload-path: upload/
server:
port: 10002
port: 10003
servlet:
context-path: ${SERVER_CONTEXTPATH:}
spring:
... ... @@ -22,7 +22,7 @@ spring:
static-locations: classpath:/META-INF/resources/,classpath:/static,classpath:/resources/,classpath:/public/,file:${web.upload-path}
application:
name: WLTP-NMMS-CUSTOMMESSAGEPROCESS
name: WLTP-NMMS-MQ-CUSTOMMESSAGEPROCESS
jackson:
serialization:
... ... @@ -37,12 +37,20 @@ spring:
#eureka主机名,会在控制页面中显示
#DEV环境关闭注册。
features:
enabled: false
enabled: true
discovery:
enabled: false
enabled: true
service-registry:
auto-registration:
enabled: false
enabled: true
#feign的配置,连接超时及读取超时配置
feign:
client:
config:
default:
connectTimeout: 6000
readTimeout: 60000
loggerLevel: basic
eureka:
instance:
... ... @@ -51,15 +59,12 @@ eureka:
prefer-ip-address: true
instance-id: ${spring.cloud.client.ip-address}:${server.port}
hostname: ${spring.cloud.client.ip-address}
health-check-url-path: /actuator/health
lease-renewal-interval-in-seconds: 15
lease-expiration-duration-in-seconds: 45
metadata-map:
startup: ${random.int}
client:
#eureka注册中心服务器地址
service-url:
defaultZone: http://10.50.3.82:19527/eureka/
defaultZone: http://192.168.1.53:12345/eureka/
registry-fetch-interval-seconds: 30
#debug配置,debug或者为true的时候,logback才会记录和写入日志文件
... ... @@ -112,11 +117,11 @@ mq:
#发送mq队列名称
send-to-mq: send
#读取mq队列名称
read-from-mq: receive
read-from-mq: response.agent
exchange:
#TCS回执发送到此交换,交换再把回执消息同步到其他配置的回执订阅队列
#交换名称
name: air.rcv.broadcast
name: cus.rcv.broadcast
#交换类型
type: fanout
#routing key名称,此处为空,所有绑定交换的队列都被广播
... ... @@ -125,7 +130,7 @@ mq:
connection:
ip: 218.28.199.134
port: 8004
vHost: AIRTRANS
vHost: NMMS
username: tianbo
password: vmvnv1v2VV
# ip: 192.168.1.63
... ...
... ... @@ -11,9 +11,9 @@
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.tianbo</groupId>
<artifactId>customs</artifactId>
<artifactId>customs-with-mq</artifactId>
<version>1.0-SNAPSHOT</version>
<name>customMessageProcess</name>
<name>customMessageWithMQ</name>
<description>process customs xml files with RabbitMQ</description>
<properties>
<druid.version>1.1.9</druid.version>
... ... @@ -32,6 +32,10 @@
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<exclusions>
... ... @@ -97,6 +101,12 @@
<artifactId>util</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.tianbo</groupId>
<artifactId>util</artifactId>
<version>1.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<!--util依赖-->
<!--&lt;!&ndash;IMF需要的包&ndash;&gt;-->
... ...
... ... @@ -12,6 +12,7 @@ import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfigurat
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.transaction.annotation.EnableTransactionManagement;
... ... @@ -25,6 +26,7 @@ import java.time.Duration;
HibernateJpaAutoConfiguration.class})
@EnableScheduling
@EnableDiscoveryClient
@EnableFeignClients
@EnableTransactionManagement
//@MapperScan("com.tianbo.analysis.dao")
public class MQBootApplication {
... ...
package com.tianbo.analysis.handle;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Envelope;
import com.tianbo.analysis.service.CustomAnalysisService;
import com.tianbo.util.Date.DateUtil;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileExistsException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.xml.sax.InputSource;
import javax.annotation.PostConstruct;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import java.io.File;
import java.io.IOException;
import java.io.StringReader;
import java.util.UUID;
/**
* 报文分发到舱单回执解析服务线程
* 分发前判定报文是否为正常合规的XML报文
* 合规的过滤掉XML的头部信息(为了能将原报文内容转发到IMF),
* 并备份一份到bakupDir目录,并转发一份到新舱单转发回执到IMF发送端监听目录下
* 不合规的备份到errBakDir目录
*/
@Component
@Data
@Slf4j
public class CustomHandleThead implements Runnable{
private String msg;
private Channel ackChannel;
private Envelope envelope;
//备份目录
@Value("${custom.receptBakDir}")
private String bakupDir;
@Value("${custom.errBakDir}")
private String errBakDir;
//回执转发目录
@Value("${custom.transmitDir}")
private String transmitDir;
private static CustomHandleThead customHandleThead;
@Autowired
CustomAnalysisService customAnalysisService;
public CustomHandleThead(){
}
public CustomHandleThead(String msg) {
this.msg = msg;
}
public CustomHandleThead(String msg, Channel ackChannel,Envelope envelope) {
this.msg = msg;
this.ackChannel = ackChannel;
this.envelope = envelope;
}
@PostConstruct
public void init() {
customHandleThead = this;
customHandleThead.customAnalysisService = this.customAnalysisService;
customHandleThead.bakupDir = this.bakupDir;
customHandleThead.errBakDir = this.errBakDir;
customHandleThead.transmitDir = this.transmitDir;
// 初使化时将已静态化的testService实例化
}
@Override
public void run() {
try {
if (StringUtils.isNotBlank(msg) && isXmlDocument(msg)){
//过滤报文头部
msg = msg.replace("<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>","");
boolean result = customHandleThead.customAnalysisService.analysis(msg);
if (result){
ack();
}else {
ack();
}
}else {
//错误报文直接消费 不处理;并备份
ack();
}
}catch (Exception e){
nack();
e.printStackTrace();
}
}
private void ack(){
try {
ackChannel.basicAck(envelope.getDeliveryTag(),false);
log.warn("tag:{}已消费",envelope.getDeliveryTag());
}catch (IOException e){
e.printStackTrace();
}
}
private void nack(){
try {
ackChannel.basicNack(envelope.getDeliveryTag(), false, true);
}catch (IOException e){
e.printStackTrace();
}
}
private static boolean isXmlDocument(String rtnMsg) {
boolean flag = true;
try {
DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance();
DocumentBuilder builder = documentBuilderFactory.newDocumentBuilder();
builder.parse(new InputSource(new StringReader(rtnMsg)));
} catch (Exception e) {
flag = false;
}
return flag;
}
public static void bakFile(String content,String filePath){
try {
String fileName = customHandleThead.bakupDir + UUID.randomUUID().toString()+".xml";
log.info("-----------{}报文保存成功----------",fileName);
File fileToDirectory = new File(fileName);
FileUtils.writeStringToFile(fileToDirectory,content,"UTF-8");
}catch (IOException e){
}
}
}
... ...
package com.tianbo.analysis.handle;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.tianbo.analysis.task.XMLThreadPoolFactory;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
@Slf4j
public class RabbitGetMessage extends DefaultConsumer {
private Channel ackChannel;
/**
* 初始化线程池
*/
private static ThreadPoolExecutor threadPool = XMLThreadPoolFactory.instance();
public RabbitGetMessage(Channel channel) {
super(channel);
this.ackChannel= channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
// log.info("消费状态:consumerTag->{},envelope->{},properties->{},body->{}",consumerTag,envelope,properties,message);
// ackChannel.basicAck(envelope.getDeliveryTag(),false);
//todo:多线程处理消息
handleMessage(message,ackChannel,envelope);
}
private void handleMessage(String message,Channel channel,Envelope envelope){
CustomHandleThead customResponseHandleThread = new CustomHandleThead(message,channel,envelope);
try{
threadPool.execute(customResponseHandleThread);
}catch (RejectedExecutionException e){
e.printStackTrace();
log.error("线程池已满");
}catch (Exception e){
e.printStackTrace();
log.error("线程池异常");
}
}
}
... ...
package com.tianbo.analysis.model;
import lombok.Data;
@Data
public class CustomReception {
//回执报头
private String messageID;
private String functionCode;
private String messageType;
private String senderID;
private String receiverID;
private String sendTime;
private String version;
//一般业务回执
private String flightNo;
private String flightDate;
private String wayBillMaster;
private String wayBillSecond;
private String responseCode;
private String responseText;
//国际转运回执
private String importFlightDate;
private String importFlightNo;
private String importWaybillMaster;
private String exportFlightDate;
private String exportFlightNo;
private String exportWaybillMaster;
public CustomReception(String messageType,
String flightNo,
String flightDate,
String wayBillMaster,
String wayBillSecond,
String responseCode,
String responseText,
String messageID,
String sendTime,
String senderID,
String receiverID,
String version,
String functionCode){
this.messageType = messageType;
this.flightNo = flightNo;
this.flightDate = flightDate;
this.wayBillMaster = wayBillMaster;
this.wayBillSecond = wayBillSecond;
this.responseCode = responseCode;
this.responseText = responseText;
this.messageID = messageID;
this.sendTime = sendTime;
this.senderID =senderID;
this.receiverID = receiverID;
this.version = version;
this.functionCode = functionCode;
}
public CustomReception(String messageType,
String importFlightNo,
String importFlightDate,
String importWaybillMaster,
String transResponseCode,
String transResponseText,
String messageID,
String sendTime,
String senderID,
String receiverID,
String version,
String functionCode){
this.messageType = messageType;
this.importFlightNo = importFlightNo;
this.importFlightDate = importFlightDate;
this.importWaybillMaster = importWaybillMaster;
this.responseCode = transResponseCode;
this.responseText = transResponseText;
this.messageID = messageID;
this.sendTime = sendTime;
this.senderID =senderID;
this.receiverID = receiverID;
this.version = version;
this.functionCode = functionCode;
}
}
package com.tianbo.analysis.model;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.tianbo.analysis.handle.RabbitGetMessage;
import com.tianbo.util.RabitMq.ConnectionUtil;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
@Slf4j
public class RabbitMQ {
public static String status = "not runing";
private String queueName;
private String exchangeName;
private String routingKey;
private String mqIp;
private int mqPort;
private String mqVhost;
private String mqUsername;
private String mqPassword;
public String getQueueName() {
return queueName;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
public String getExchangeName() {
return exchangeName;
}
public void setExchangeName(String exchangeName) {
this.exchangeName = exchangeName;
}
public String getRoutingKey() {
return routingKey;
}
public void setRoutingKey(String routingKey) {
this.routingKey = routingKey;
}
public String getMqIp() {
return mqIp;
}
public void setMqIp(String mqIp) {
this.mqIp = mqIp;
}
public int getMqPort() {
return mqPort;
}
public void setMqPort(int mqPort) {
this.mqPort = mqPort;
}
public String getMqVhost() {
return mqVhost;
}
public void setMqVhost(String mqVhost) {
this.mqVhost = mqVhost;
}
public String getMqUsername() {
return mqUsername;
}
public void setMqUsername(String mqUsername) {
this.mqUsername = mqUsername;
}
public String getMqPassword() {
return mqPassword;
}
public void setMqPassword(String mqPassword) {
this.mqPassword = mqPassword;
}
public RabbitMQ(String mqIp, int mqPort, String mqVhost, String mqUsername, String mqPassword, String queueName) {
this.mqIp = mqIp;
this.mqPort = mqPort;
this.mqVhost = mqVhost;
this.mqUsername = mqUsername;
this.mqPassword = mqPassword;
this.queueName = queueName;
}
public void getResponseFromMq() {
try {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection(
mqIp,
mqPort,
mqVhost,
mqUsername,
mqPassword);
// 从连接中创建通道
Channel channel = connection.createChannel();
// 声明(创建)队列
// channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true);
// channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName,exchangeName,routingKey);
channel.basicQos(15);
channel.basicConsume(queueName, false, new RabbitGetMessage(channel));
status = "runing";
}catch (IOException e){
status = "not runing";
e.printStackTrace();
log.error("MQ服务器连接失败,连接不上服务器:{},异常退出",e.toString());
}catch (TimeoutException e){
status = "not runing";
e.printStackTrace();
log.error("MQ服务器连接失败,连接超时:{},异常退出",e.toString());
}catch (Exception e){
e.printStackTrace();
status = "not runing";
}
}
}
... ...
package com.tianbo.analysis.rabbitmq;
import com.rabbitmq.client.*;
import com.tianbo.analysis.bean.SpringBeanUtitl;
import com.tianbo.analysis.handle.CustomHandleThead;
import com.tianbo.analysis.service.CustomAnalysisService;
import com.tianbo.analysis.task.XMLThreadPoolFactory;
import com.tianbo.util.RabitMq.MqResponse;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.regex.*;
/**
... ... @@ -22,12 +31,18 @@ public class GetResponse extends DefaultConsumer {
private static GetResponse getResponse;
/**
* 初始化线程池
*/
private static ThreadPoolExecutor threadPool = XMLThreadPoolFactory.instance();
public GetResponse(Channel channel) {
super(channel);
}
public GetResponse(Channel channel,String dir) {
super(channel);
this.receptDir = dir;
}
/**
* 处理回来的信息
... ... @@ -46,7 +61,9 @@ public class GetResponse extends DefaultConsumer {
// System.err.println("body: " + new String(body));
this.mqResponse = new MqResponse(consumerTag,envelope,properties,new String(body, StandardCharsets.UTF_8));
log.info("-----------获取到报文----------\n{}",mqResponse.getContent());
writeToReadDir(mqResponse.getContent());
handleMessage(mqResponse.getContent());
// writeToReadDir(mqResponse.getContent());
//写入回执目录
}
... ... @@ -71,4 +88,18 @@ public class GetResponse extends DefaultConsumer {
}
private void handleMessage(String message){
CustomHandleThead customResponseHandleThread = new CustomHandleThead();
customResponseHandleThread.setMsg(message);
try{
threadPool.execute(customResponseHandleThread);
}catch (RejectedExecutionException e){
e.printStackTrace();
log.error("线程池已满");
}catch (Exception e){
e.printStackTrace();
log.error("线程池异常");
}
}
}
... ...
... ... @@ -46,8 +46,8 @@ public class ReadMessage {
String exchangeName = "amq.topic";
String routingKey = "consumer.#";
// 声明(创建)队列
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true);
channel.queueDeclare(queueName, true, false, false, null);
// channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true);
// channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName,exchangeName,routingKey);
status = "runing";
channel.basicConsume(queueName, true, new GetResponse(channel,dir));
... ...
package com.tianbo.analysis.service;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
@FeignClient(name = "wltp-nmms-custom-reciption-resolver")
public interface CustomAnalysisService {
@PostMapping("/cust/analysis")
boolean analysis(@RequestParam(value = "xml",required = true) String xml);
}
... ...
package com.tianbo.analysis.task;
import com.tianbo.analysis.model.RabbitMQ;
import com.tianbo.analysis.rabbitmq.ReadMessage;
import com.tianbo.util.Date.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 新舱单从mq服务器的tcs回执队列中获取回执报文并落地保存
* 回执解析定时任务
* 从mq获取报文 落地保存
*/
@Slf4j
@Component
public class CustomMqGetTask {
//报文保存目录
@Value("${mq.dir.saveXml-dir}")
private String receptDir;
@Value("${mq.connection.ip}")
private String mqIp;
@Value("${mq.connection.port}")
private int mqPort;
@Value("${mq.connection.vHost}")
private String mqVhost;
@Value("${mq.connection.username}")
private String mqUsername;
@Value("${mq.connection.password}")
private String mqPassword;
@Value("${mq.queue.read-from-mq}")
private String queueName;
@Value("${mq.exchange.name}")
private String exchangeName;
@Value("${mq.exchange.routing-key}")
private String routingKey;
@Scheduled(fixedRate = 5000)
public void startTask(){
try {
//从mq获取回执
RabbitMQ readMessage = new RabbitMQ(mqIp,mqPort,mqVhost,mqUsername,mqPassword,queueName);
readMessage.setExchangeName(exchangeName);
readMessage.setRoutingKey(routingKey);
if(!"runing".equals(RabbitMQ.status)){
log.info("开始连接MQ服务器");
readMessage.getResponseFromMq();
log.info("MQ通道建立成功");
}
}catch (Exception e){
e.printStackTrace();
log.error("程序出错",e);
}
// log.info("获取消息结束");
}
}
... ...
... ... @@ -43,7 +43,7 @@ public class NmmsMqGetTask {
@Value("${mq.queue.read-from-mq}")
private String queueName;
@Scheduled(fixedRate = 5000)
// @Scheduled(fixedRate = 5000)
public void startTask(){
... ...
... ... @@ -59,7 +59,7 @@ public class NmmsMqSendTask {
*/
private final static int theadamount = 10;
@Scheduled(fixedRate = 5000)
// @Scheduled(fixedRate = 5000)
public void startTask(){
final SimpleDateFormat sdf = new SimpleDateFormat(
... ...
... ... @@ -11,9 +11,9 @@ public class XMLThreadPoolFactory {
public static ThreadPoolExecutor instance(){
if (threadPool==null){
XMLThreadFactory xmlThreadFactory = new XMLThreadFactory("xml");
threadPool = new ThreadPoolExecutor(10, 100,
threadPool = new ThreadPoolExecutor(15, 240,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024),
new LinkedBlockingQueue<Runnable>(),
xmlThreadFactory,
new ThreadPoolExecutor.AbortPolicy());
}
... ...