NmmsMqSendTask.java
4.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package com.tianbo.analysis.task;
import com.tianbo.analysis.handle.SendXml2MqThread;
import com.tianbo.util.Date.DateUtil;
import com.tianbo.util.IO.FileTool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.io.File;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 61回执解析定时任务
* 部署到61服务器 读取报文发送目录的XML,发送到nmms的新增send报文队列
*/
@Slf4j
@Component
public class NmmsMqSendTask {
//发送报文读取目录
@Value("${mq.dir.sendXml-dir}")
private String sendXmldir;
@Value("${mq.connection.ip}")
private String mqIp;
@Value("${mq.connection.port}")
private int mqPort;
@Value("${mq.connection.vHost}")
private String mqVhost;
@Value("${mq.connection.username}")
private String mqUsername;
@Value("${mq.connection.password}")
private String mqPassword;
@Value("${mq.queue.send-to-mq}")
private String queueName;
@Value("${mq.exchange.name}")
private String exchangeName;
@Value("${mq.exchange.type}")
private String exchangeType;
@Value("${mq.exchange.routing-key}")
private String routingName;
/**
* 线程数量
*/
private final static int theadamount = 10;
@Scheduled(fixedRate = 5000)
public void startTask(){
final SimpleDateFormat sdf = new SimpleDateFormat(
"yyyy-MM-dd HH:mm:ss");
final String startTime = sdf.format(new Date());
//发送报文目录
String readDir = sendXmldir;
//初始化线程池
ThreadPoolExecutor threadPool = XMLThreadPoolFactory.instance();
try {
File fileDirectory = new File(readDir);
List<File> files = FileTool.readDirectoryFiles(fileDirectory);
//文件数量大于50个,每次只解析前50个
if (files!=null && !files.isEmpty() && files.size()>theadamount){
CountDownLatch latch = new CountDownLatch(theadamount);
log.trace("发送舱单报文任务开始{},剩余处理文件数量:{}",startTime,files.size());
for (int i=0;i<theadamount;i++){
threadJbob(files.get(i),latch,"transToCfps",threadPool);
}
latch.await();
}
//文件数量小于50个,全部一次解析完
else if (files!=null && !files.isEmpty() && files.size()<theadamount){
CountDownLatch latch = new CountDownLatch(files.size());
log.trace("发送舱单报文任务开始{},剩余处理文件数量文件数量:{}",startTime,files.size());
for (int i=0;i<files.size();i++){
threadJbob(files.get(i),latch,"transToCfps",threadPool);
}
latch.await();
}
}catch (Exception e){
e.printStackTrace();
log.error("获取目录文件出错",e);
}
log.info("发送舱单报文任务结束{}",sdf.format(new Date()));
}
private void threadJbob(File file,CountDownLatch latch,String transToCfps,ThreadPoolExecutor threadPool){
try{
SendXml2MqThread sendXml2MqThread = new SendXml2MqThread();
sendXml2MqThread.setXmlfile(file);
sendXml2MqThread.setLatch(latch);
sendXml2MqThread.setMqIp(mqIp);
sendXml2MqThread.setMqPort(mqPort);
sendXml2MqThread.setMqVhost(mqVhost);
sendXml2MqThread.setMqUsername(mqUsername);
sendXml2MqThread.setMqPassword(mqPassword);
sendXml2MqThread.setQueueName(queueName);
sendXml2MqThread.setExchangeName(exchangeName);
sendXml2MqThread.setExchangeType(exchangeType);
sendXml2MqThread.setRoutingName(routingName);
threadPool.execute(sendXml2MqThread);
}catch (Exception e){
log.error("线程解析出错{}",e);
}
}
}