作者 朱兆平

增加直接从mq获取报文并解析的处理方案

... ... @@ -49,10 +49,11 @@ spring:
type: com.alibaba.druid.pool.DruidDataSource
#oracle
driver-class-name: oracle.jdbc.OracleDriver
url: jdbc:oracle:thin:@10.50.3.69:1521:CGODB
# url: jdbc:oracle:thin:@192.168.1.253:1522:ORCLL
# url: jdbc:oracle:thin:@10.50.3.69:1521:CGODB
url: jdbc:oracle:thin:@192.168.1.253:1522:ORCLL
username: CGONMS
password: 1q2w3e4r
# password: 1q2w3e4r
password: vmvnv1v2
#spring datasource mysql,注意编码配置,缺少数据库编码配置容易引起中文入库乱码
# url: jdbc:mysql://127.0.0.1:3307/statistics?useUnicode=true&characterEncoding=utf8
# username: root
... ... @@ -113,7 +114,7 @@ pagehelper:
#debug配置,debug或者为true的时候,logback才会记录和写入日志文件
trace: false
debug: false
debug: true
logging:
file:
... ... @@ -151,4 +152,36 @@ devops:
singlewindow-tcs-recept: D:\TCSSingleWindow\recive
tianbo-tcs-recept: D:\Data\Receive
cfps-subscribe-dir: D:\系统部署\imf_Warehouse_reader\xmlFromImf
#10079
\ No newline at end of file
mq:
# 从新舱单服务器读取申报业务报文发送到业务报文发送队列 {nmmsXml-mq};从回执队列{tcs-mq}读取报文保存到本地服务器回执报文接收目录
# 从tcs服务器读取海关回执报文发送到回执报文发送队列 {tcs-mq};从新舱单报文申报队列{nmmsXml-mq}读取报文保存到本地服务器申报报文发送目录
dir:
#报文读取目录,读取本地文件发送到MQ [目录结尾要带/]
sendXml-dir: /Users/mrz/Downloads/rdp_temp/logs/回执报文样例/
#保存从mq取到的回执报文到本地回执目录 [目录结尾要带/]
saveXml-dir: /Users/mrz/Downloads/rdp_temp/logs/transmit/
queue:
#发送mq队列名称
send-to-mq: send
#读取mq队列名称
read-from-mq: response.agent
exchange:
#TCS回执发送到此交换,交换再把回执消息同步到其他配置的回执订阅队列
#交换名称
name: cus.rcv.broadcast
#交换类型
type: fanout
#routing key名称,此处为空,所有绑定交换的队列都被广播
routing-key:
connection:
ip: 218.28.199.134
port: 8004
vHost: NMMS
username: tianbo
password: vmvnv1v2VV
# ip: 192.168.1.63
# port: 5672
# vHost: NMMS
# username: mrz
# password: vmvnv1v2
\ No newline at end of file
... ...
... ... @@ -23,7 +23,6 @@ import java.time.Duration;
@EnableDiscoveryClient
@EnableTransactionManagement
@MapperScan("com.tianbo.analysis.dao")
@ComponentScan({"com.tianbo.analysis"})
public class BootApplication {
public static void main(String[] args) {
... ...
package com.tianbo.analysis.handle;
import com.tianbo.analysis.model.*;
import com.tianbo.analysis.tools.AWBTools;
import com.tianbo.util.Date.DateUtil;
import com.tianbo.util.XML.XMLXPath;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileExistsException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.xml.sax.SAXParseException;
import javax.annotation.PostConstruct;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.UnsupportedEncodingException;
import java.util.UUID;
/**
* @author mrz
* @e
* 海关新舱单回执解析xml文件
*/
@Data
@Component
@Slf4j
public class CustomResponseHandleThread implements Runnable{
//解析出错转移目录
@Value("${custom.errBakDir}")
private String errBakDir;
@Value("${custom.receptBakDir}")
private String receptBakDir;
private String responseMessage;
private static CustomResponseHandleThread customXmlHandle;
/**
* 单一窗口回执的技术回执
*/
public final static String DONT_NEED = "<Code>0</Code><Text>接收成功</Text>";
//海关新舱单回执报头
public final static String MessageID = "//Manifest/Head/MessageID";
public final static String FunctionCode = "//Manifest/Head/FunctionCode";
public final static String MessageType = "//Manifest/Head/MessageType";
public final static String SenderID = "//Manifest/Head/SenderID";
public final static String ReceiverID = "//Manifest/Head/ReceiverID";
public final static String SendTime = "//Manifest/Head/SendTime";
public final static String Version = "//Manifest/Head/Version";
/**
* 海关普通业务新舱单回执报体
*/
public final static String JourneyID = "//Manifest/Response/BorderTransportMeans/JourneyID";
public final static String WaybillMaster = "//Manifest/Response/Consignment/TransportContractDocument/ID";
public final static String WaybillSecond = "//Manifest/Response/Consignment/AssociatedTransportDocument/ID";
public final static String ResponseCode = "//ResponseType/Code";
public final static String ResponseText = "//ResponseType/Text";
/**
* 海关国际转运业务回执报体
*/
public final static String ImportJourneyID = "//Manifest/Response/ImportInformation/BorderTransportMeans/JourneyID";
public final static String ImportWaybillMaster = "//Manifest/Response/ImportInformation/Consignment/TransportContractDocument/ID";
public final static String ExportJourneyID = "//Manifest/Response/ExportInformation/BorderTransportMeans/JourneyID";
public final static String ExportWaybillMaster = "//Manifest/Response/ExportInformation/Consignment/TransportContractDocument/ID";
// public final static String TransResponseCode = "//Manifest/Response/ResponseType/Code";
// public final static String TransResponseText = "//Manifest/Response/ResponseType/Text";
public final static String TransResponseCode = "//ResponseType/Code";
public final static String TransResponseText = "//ResponseType/Text";
//通过@PostConstruct实现初始化bean之前进行的操作,解决service调用空指针问题
@PostConstruct
public void init() {
customXmlHandle = this;
customXmlHandle.errBakDir = this.errBakDir;
customXmlHandle.receptBakDir = this.receptBakDir;
// 初使化时将已静态化的testService实例化
}
@Override
public void run() {
log.info("线程开始");
String today = DateUtil.getTodayBy_yyyyMMdd();
String errDirByDay = customXmlHandle.errBakDir + "/" + today;
String successDir = customXmlHandle.receptBakDir + "/" + today;
if(responseMessage.contains(DONT_NEED)){
log.warn("{}报文为技术回执,剪切到错误备份目录,不解析",responseMessage);
errBak(responseMessage,errDirByDay);
log.info("报文为技术回执备份成功..开始解析下一文件,当前线程即将结束");
}
else
{
try{
int i =handelXmlDocument();
//操作成功,则转移剪切解析文件到备份目录,否则转移到error目录备份
if(i>0){
errBak(responseMessage,successDir);
log.info("报文解析入库成功");
}else {
errBak(responseMessage,errDirByDay);
}
}catch (Exception e){
log.error("报文备份异常",e);
errBak(responseMessage,errDirByDay);
e.printStackTrace();
}
}
log.info("线程结束");
}
/**
*
*
* @return
* @throws DocumentException
* @throws UnsupportedEncodingException
* @throws SAXParseException
* @throws FileNotFoundException
*/
private int handelXmlDocument() throws DocumentException,UnsupportedEncodingException,SAXParseException,FileNotFoundException {
int i = 0;
String secondSplit = "_";
SAXReader saxReader = new SAXReader();
Document document = saxReader.read(new ByteArrayInputStream(responseMessage.getBytes("UTF-8")));
Element contentRoot = document.getRootElement();
String flightNo = "UNKONW";
String flightDate = "20101010";
//开始解析
String msgType = XMLXPath.getSingleValueByPath(document,MessageType);
String journeyid = XMLXPath.getSingleValueByPath(document,JourneyID);
// XMLXPath.getSingleValueByPath(document, )
String awbA = XMLXPath.getSingleValueByPath(document,WaybillMaster);
//全格式的分单 如 17212345678_ADBD
String awbH = XMLXPath.getSingleValueByPath(document,WaybillSecond);
String resCode = XMLXPath.getSingleValueByPath(document,ResponseCode);
String resText = XMLXPath.getSingleValueByPath(document,ResponseText);
String messageId = XMLXPath.getSingleValueByPath(document, MessageID);
String sendTime = XMLXPath.getSingleValueByPath(document,SendTime);
String sendId = XMLXPath.getSingleValueByPath(document,SenderID);
String reciveId = XMLXPath.getSingleValueByPath(document,ReceiverID);
String version = XMLXPath.getSingleValueByPath(document,Version);
String functionCode = XMLXPath.getSingleValueByPath(document,FunctionCode);
CustomReception customReception = new CustomReception( msgType,
flightNo,
flightDate,
awbA,
awbH,
resCode,
resText,
messageId,
sendTime,
sendId,
reciveId,
version,
functionCode);
/**
* 如果回执中没有携带航班信息节点,说明是出错报文
* 到发送日志表根据messageid 找到相应的发送日志报文的航班及运单信息,再进行解析
*/
if(!StringUtils.isEmpty(journeyid)){
String[] flightList = journeyid.split("/");
if(flightList.length > 0){
flightNo = flightList[0];
flightDate = flightList[1];
customReception.setFlightNo(flightNo);
customReception.setFlightDate(flightDate);
}
}else {
CUSTOMSMESSAGE customsmessage = new CUSTOMSMESSAGE();
customReception = customsmessage.getWaybillInfoByCutomResponse(customReception);
}
switch (msgType){
case "MT9999":
if (customReception.getWayBillSecond()!=null && customReception.getWayBillSecond().contains(secondSplit)){
PREPARESECONDARY preparesecondary = new PREPARESECONDARY(customReception);
int pre_i = preparesecondary.secondAnalysisReception();
if (pre_i>0){
i=1;
}else {
ARRIVEDSECONDARY arrivedsecondary = new ARRIVEDSECONDARY(customReception);
int arr_i=arrivedsecondary.secondAnalysisReception();
if (arr_i>0){
i= 1;
}else {
Originmanifestsecondary originmanifestsecondary = new Originmanifestsecondary(customReception);
int org_i=originmanifestsecondary.secondAnalysisReception();
if (org_i>0){
i=1;
}
}
}
}else{
ORIGINMANIFESTMASTER originmanifestmaster = new ORIGINMANIFESTMASTER(customReception);
PREPAREMASTER preparemaster= new PREPAREMASTER(customReception);
ARRIVEDMASTER arrivedmaster9999 = new ARRIVEDMASTER(customReception);
if(originmanifestmaster.masterAnalysisReception()>0){
i=1;
}else if(preparemaster.masterAnalysisReception()>0){
i=1;
}else {
i=arrivedmaster9999.masterAnalysisReception();
}
}
AgentXmlHandle agentXmlHandle=new AgentXmlHandle();
agentXmlHandle.Http_resolver(customReception);
break;
case "MT3201":
if (customReception.getWayBillSecond()!=null && customReception.getWayBillSecond().contains(secondSplit)){
ARRIVEDSECONDARY arrivedsecondary = new ARRIVEDSECONDARY(customReception);
i=arrivedsecondary.secondAnalysisReception();
}else {
ARRIVEDMASTER arrivedmaster = new ARRIVEDMASTER(customReception);
i=arrivedmaster.masterAnalysisReception();
}
break;
case "MT5202":
if (customReception.getWayBillSecond()!=null && customReception.getWayBillSecond().contains(secondSplit)){
TALLYSECONDARY tallysecondary = new TALLYSECONDARY(customReception);
i=tallysecondary.secondAnalysisReception();
}else {
TALLYMASTER tallymaster= new TALLYMASTER(customReception);
i = tallymaster.masterAnalysisReception();
}
break;
case "MT5201":
if (customReception.getWayBillSecond()!=null && customReception.getWayBillSecond().contains(secondSplit)){
TALLYSECONDARY tallysecondary = new TALLYSECONDARY(customReception);
i=tallysecondary.secondAnalysisReception();
}else {
TALLYMASTER tallymaster= new TALLYMASTER(customReception);
i = tallymaster.masterAnalysisReception();
}
break;
case "MT4201":
DEPARTURESLOADING departuresloading = new DEPARTURESLOADING(customReception);
i=departuresloading.masterAnalysisReception();
break;
case "MT1201":
if (customReception.getWayBillSecond()!=null && customReception.getWayBillSecond().contains(secondSplit)){
Originmanifestsecondary originmanifestsecondary = new Originmanifestsecondary(customReception);
i=originmanifestsecondary.secondAnalysisReception();
}else {
ORIGINMANIFESTMASTER originmanifestmaster1201 = new ORIGINMANIFESTMASTER(customReception);
i = originmanifestmaster1201.masterAnalysisReception();
}
break;
case "MT2201":
if (customReception.getWayBillSecond()!=null && customReception.getWayBillSecond().contains(secondSplit)){
PREPARESECONDARY preparesecondary = new PREPARESECONDARY(customReception);
i = preparesecondary.secondAnalysisReception();
}else {
PREPAREMASTER preparemaster1201= new PREPAREMASTER(customReception);
i = preparemaster1201.masterAnalysisReception();
}
break;
case "MT6202":
INPORTALLOCATE inportallocate = new INPORTALLOCATE(customReception);
i = inportallocate.masterAnalysisReception();
break;
case "MT3202":
ALLOCATEARRIVAL allocatearrival = new ALLOCATEARRIVAL(customReception);
i= allocatearrival.masterAnalysisReception();
break;
case "MT8205":
i = transXmlHandel(document,customReception);
break;
case "MT8202":
MANIFESTLOAD manifestload = new MANIFESTLOAD(customReception);
i= manifestload.secondAnalysisReception();
break;
case "MT8203":
MANIFESTLOSTCHANGE manifestlostchange = new MANIFESTLOSTCHANGE(customReception);
i= manifestlostchange.secondAnalysisReception();
break;
default:
break;
}
updateCustomMessage(customReception);
return i;
}
/**
* 国际转运回执处理,特殊业务
* @param document
* @param customReception
* @return
*/
private int transXmlHandel(Document document,CustomReception customReception){
String importJourneyID = XMLXPath.getSingleValueByPath(document,ImportJourneyID);
String importFlightNo = "UNKONW";
String importFlightDate = "20101010";
if(!StringUtils.isEmpty(importJourneyID)){
importFlightNo = AWBTools.splitFlightAndDate(importJourneyID)[0];
importFlightDate = AWBTools.splitFlightAndDate(importJourneyID)[1];
}
String importWaybillMaster= XMLXPath.getSingleValueByPath(document,ImportWaybillMaster);
String transResponseCode=XMLXPath.getSingleValueByPath(document,TransResponseCode);
String transResponseText=XMLXPath.getSingleValueByPath(document,TransResponseText);
CustomReception transCustomReception = new CustomReception( customReception.getMessageType(),
importFlightNo,
importFlightDate,
importWaybillMaster,
transResponseCode,
transResponseText,
customReception.getMessageID(),
customReception.getSendTime(),
customReception.getSenderID(),
customReception.getReceiverID(),
customReception.getVersion(),
customReception.getFunctionCode());
INTERNATIONALTRANSIT internationaltransit = new INTERNATIONALTRANSIT(transCustomReception);
int i=internationaltransit.masterAnalysisReception();
return i;
}
private int updateCustomMessage(CustomReception customReception){
CUSTOMSMESSAGE customsmessage = new CUSTOMSMESSAGE(customReception);
return customsmessage.updateMessageByMessageid();
}
private void errBak(String responseText,String dir){
try {
File errfile = new File(dir + "/" + UUID.randomUUID().toString()+".xml");
FileUtils.writeStringToFile(errfile,responseText,"UTF-8");
log.info("数据库未找到与回执适配的信息备份报文");
}catch (FileExistsException e){
e.printStackTrace();
log.error("备份文件失败");
}catch (Exception e){
e.printStackTrace();
}
}
}
... ...
... ... @@ -470,7 +470,7 @@ public class ORIGINMANIFESTMASTER extends WlptBaseModel {
public int masterAnalysisReception() {
//更新主单回执
int i = originmanifestmasterMapper.updateRECEIPTION(this);
//获取单autoid
//获取单autoid
List<ORIGINMANIFESTMASTER> originmanifestmasterList = originmanifestmasterMapper.selectAutoIdByAwb(this);
if(!originmanifestmasterList.isEmpty()){
ORIGINMANIFESTMASTER originMaster = originmanifestmasterList.get(0);
... ...
package com.tianbo.analysis.model;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.tianbo.analysis.service.rabbit.RabbitGetMessage;
import com.tianbo.util.RabitMq.ConnectionUtil;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
@Slf4j
public class RabbitMQ {
public static String status = "not runing";
private String queueName;
private String exchangeName;
private String routingKey;
private String mqIp;
private int mqPort;
private String mqVhost;
private String mqUsername;
private String mqPassword;
public String getQueueName() {
return queueName;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
public String getExchangeName() {
return exchangeName;
}
public void setExchangeName(String exchangeName) {
this.exchangeName = exchangeName;
}
public String getRoutingKey() {
return routingKey;
}
public void setRoutingKey(String routingKey) {
this.routingKey = routingKey;
}
public String getMqIp() {
return mqIp;
}
public void setMqIp(String mqIp) {
this.mqIp = mqIp;
}
public int getMqPort() {
return mqPort;
}
public void setMqPort(int mqPort) {
this.mqPort = mqPort;
}
public String getMqVhost() {
return mqVhost;
}
public void setMqVhost(String mqVhost) {
this.mqVhost = mqVhost;
}
public String getMqUsername() {
return mqUsername;
}
public void setMqUsername(String mqUsername) {
this.mqUsername = mqUsername;
}
public String getMqPassword() {
return mqPassword;
}
public void setMqPassword(String mqPassword) {
this.mqPassword = mqPassword;
}
public RabbitMQ(String mqIp, int mqPort, String mqVhost, String mqUsername, String mqPassword,String queueName) {
this.mqIp = mqIp;
this.mqPort = mqPort;
this.mqVhost = mqVhost;
this.mqUsername = mqUsername;
this.mqPassword = mqPassword;
this.queueName = queueName;
}
public void getResponseFromMq() {
try {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection(
mqIp,
mqPort,
mqVhost,
mqUsername,
mqPassword);
// 从连接中创建通道
Channel channel = connection.createChannel();
// 声明(创建)队列
// channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true);
// channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName,exchangeName,routingKey);
channel.basicQos(10);
channel.basicConsume(queueName, false, new RabbitGetMessage(channel));
status = "runing";
}catch (IOException e){
status = "not runing";
e.printStackTrace();
log.error("MQ服务器连接失败,连接不上服务器:{},异常退出",e.toString());
}catch (TimeoutException e){
status = "not runing";
e.printStackTrace();
log.error("MQ服务器连接失败,连接超时:{},异常退出",e.toString());
}
}
}
... ...
package com.tianbo.analysis.service.rabbit;
import com.rabbitmq.client.*;
import com.tianbo.analysis.handle.CustomResponseHandleThread;
import com.tianbo.analysis.task.XMLThreadPoolFactory;
import com.tianbo.util.RabitMq.MqResponse;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
@Slf4j
public class RabbitGetMessage extends DefaultConsumer {
private Channel ackChannel;
/**
* 初始化线程池
*/
private static ThreadPoolExecutor threadPool = XMLThreadPoolFactory.instance();
public RabbitGetMessage(Channel channel) {
super(channel);
this.ackChannel= channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
log.info("消费状态:consumerTag->{},envelope->{},properties->{},body->{}",consumerTag,envelope,properties,message);
ackChannel.basicAck(envelope.getDeliveryTag(),false);
//过滤报文头部
// message = message.replace("<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>","");
//todo:多线程处理消息
handleMessage(message);
}
private void handleMessage(String message){
CustomResponseHandleThread customResponseHandleThread = new CustomResponseHandleThread();
customResponseHandleThread.setResponseMessage(message);
try{
threadPool.execute(customResponseHandleThread);
}catch (RejectedExecutionException e){
e.printStackTrace();
log.error("线程池已满");
}catch (Exception e){
e.printStackTrace();
log.error("线程池异常");
}
}
}
... ...
package com.tianbo.analysis.task;
import com.tianbo.analysis.model.RabbitMQ;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MqGetMessageTask {
@Value("${mq.connection.ip}")
private String mqIp;
@Value("${mq.connection.port}")
private int mqPort;
@Value("${mq.connection.vHost}")
private String mqVhost;
@Value("${mq.connection.username}")
private String mqUsername;
@Value("${mq.connection.password}")
private String mqPassword;
@Value("${mq.queue.read-from-mq}")
private String queueName;
@Value("${mq.exchange.name}")
private String exchangeName;
@Value("${mq.exchange.routing-key}")
private String routingKey;
private final static String MQ_RUNING_STATE= "runing";
@Scheduled(fixedRate = 5000)
public void startTask(){
RabbitMQ rabbit = new RabbitMQ(mqIp,mqPort,mqVhost,mqUsername,mqPassword,queueName);
rabbit.setExchangeName(exchangeName);
rabbit.setRoutingKey(routingKey);
if (!MQ_RUNING_STATE.equals(RabbitMQ.status)){
rabbit.getResponseFromMq();
}
log.info("解析程序执行中。。。");
}
}
... ...
... ... @@ -38,7 +38,7 @@ public class TaskAnalysis {
*/
private final static int theadamount = 64;
@Scheduled(fixedRate = 5000)
// @Scheduled(fixedRate = 5000)
public void startTask(){
final SimpleDateFormat sdf = new SimpleDateFormat(
"yyyy-MM-dd HH:mm:ss");
... ...
... ... @@ -11,9 +11,9 @@ public class XMLThreadPoolFactory {
public static ThreadPoolExecutor instance(){
if (threadPool==null){
XMLThreadFactory xmlThreadFactory = new XMLThreadFactory("xml");
threadPool = new ThreadPoolExecutor(64, 128,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024),
threadPool = new ThreadPoolExecutor(10, 128,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
xmlThreadFactory,
new ThreadPoolExecutor.AbortPolicy());
}
... ...