SendXml2MqThread.java
3.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package com.tianbo.analysis.handle;
import com.tianbo.analysis.model.*;
import com.tianbo.analysis.tools.AWBTools;
import com.tianbo.util.Date.DateUtil;
import com.tianbo.util.RabitMq.MQSendMsg;
import com.tianbo.util.RabitMq.exchange.ExSendMsg;
import com.tianbo.util.XML.XMLXPath;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileExistsException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.xml.sax.InputSource;
import org.xml.sax.SAXParseException;
import javax.annotation.PostConstruct;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import java.io.*;
import java.util.concurrent.CountDownLatch;
/**
* @author mrz
* @e
* 海关新舱单回执解析xml文件
*/
@Data
@Component
@Slf4j
public class SendXml2MqThread implements Runnable{
private CountDownLatch latch;
private File xmlfile;
private String mqIp;
private int mqPort;
private String mqVhost;
private String mqUsername;
private String mqPassword;
private String queueName;
private String exchangeName;
private String exchangeType;
private String routingName;
@Override
public void run() {
String filename = xmlfile.getName();
log.info("线程:{}开始",filename);
if(xmlfile.canRead()){
try{
//解析前先转发
int i = handelXmlDocument();
//发送成功删除,发送失败不管保留报文
if(i==1){
FileUtils.forceDelete(xmlfile);
}
}catch (IOException ioe){
log.error("文件不存在",ioe);
ioe.printStackTrace();
} catch (Exception e){
log.error("错误的解析文件剪切失败,目标目录已存在同名文件",e);
e.printStackTrace();
}
}
log.info("线程:{}结束",xmlfile.getName());
latch.countDown();
log.info("剩余线程数量{}",latch.getCount());
}
/**
*
*
* @return
* @throws DocumentException
* @throws UnsupportedEncodingException
* @throws SAXParseException
* @throws FileNotFoundException
*/
private int handelXmlDocument() throws DocumentException,UnsupportedEncodingException,SAXParseException,FileNotFoundException, IOException {
int i = 0;
String content = FileUtils.readFileToString(xmlfile,"UTF-8");
//校验报文格式是否是XML
if (isXmlDocument(content)){
/**
* 发送消息到交换上
*/
boolean success = ExSendMsg.sendMsg(
exchangeName,
exchangeType,
routingName,
queueName,
content,
mqIp,
mqPort,
mqVhost,
mqUsername,
mqPassword
);
return success ? 1 : 0;
}else {
log.info("[{}]报文格式未通过XML校验,报文删除",content);
return 1;
}
}
/**
* 校验字符窜是否是XML格式
* @param rtnMsg
* @return
*/
private static boolean isXmlDocument(String rtnMsg){
boolean flag = true;
try {
DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance();
DocumentBuilder builder = documentBuilderFactory.newDocumentBuilder();
builder.parse( new InputSource( new StringReader( rtnMsg )));
} catch (Exception e) {
flag = false;
}
return flag;
}
}