作者 朱兆平

rabbitMQ收发服务客户端

... ... @@ -103,35 +103,29 @@ devops:
mq:
# 从新舱单服务器读取申报业务报文发送到业务报文发送队列 {nmmsXml-mq};从回执队列{tcs-mq}读取报文保存到本地服务器回执报文接收目录
# 从tcs服务器读取海关回执报文发送到回执报文发送队列 {tcs-mq};从新舱单报文申报队列{nmmsXml-mq}读取报文保存到本地服务器申报报文发送目录
nmms:
#发送新舱单业务报文目录 [目录结尾要带/]
dir:
#报文读取目录,读取本地文件发送到MQ [目录结尾要带/]
sendXml-dir: /Users/mrz/Downloads/rdp_temp/logs/回执报文样例/
#保存从mq取到的回执报文到本地回执目录 [目录结尾要带/]
saveXml-dir: /Users/mrz/Downloads/rdp_temp/logs/transmit/
tcs:
#发送回执报文目录
sendXml-dir: /Users/mrz/Downloads/rdp_temp/logs/回执报文样例/
#保存从mq取到的新舱单业务报文到本地回执目录
saveNmmsXml-dir: /Users/mrz/Downloads/rdp_temp/logs/transmit/
queue:
#新舱单业务报文mq队列名称
nmms-mq: send
#读取回执报文mq队列名称
tcs-mq: response.agent
#发送mq队列名称
send-to-mq: send
#读取mq队列名称
read-from-mq: receive
exchange:
#TCS回执发送到此交换,交换再把回执消息同步到其他配置的回执订阅队列
tcs-ex:
#交换名称
name: cus.rcv.broadcast
#交换类型
type: fanout
#routing key名称,此处为空,所有绑定交换的队列都被广播
routing-key:
#交换名称
name: air.rcv.broadcast
#交换类型
type: fanout
#routing key名称,此处为空,所有绑定交换的队列都被广播
routing-key:
connection:
ip: 218.28.199.134
port: 8004
vHost: NMMS
vHost: AIRTRANS
username: tianbo
password: vmvnv1v2VV
# ip: 192.168.1.63
... ...
... ... @@ -4,7 +4,6 @@
*/
package com.tianbo.analysis;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
... ... @@ -14,7 +13,6 @@ import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.web.client.RestTemplate;
... ... @@ -29,10 +27,10 @@ import java.time.Duration;
@EnableDiscoveryClient
@EnableTransactionManagement
//@MapperScan("com.tianbo.analysis.dao")
public class BootApplication {
public class MQBootApplication {
public static void main(String[] args) {
SpringApplication.run(BootApplication.class, args);
SpringApplication.run(MQBootApplication.class, args);
}
@Bean
... ...
... ... @@ -63,25 +63,25 @@ public class SendXml2MqThread implements Runnable{
public void run() {
String filename = xmlfile.getName();
log.info("线程:{}开始",filename);
try{
//解析前先转发
int i = handelXmlDocument();
//发送成功删除,发送失败不管保留报文
if(i==1){
FileUtils.forceDelete(xmlfile);
if(xmlfile.canRead()){
try{
//解析前先转发
int i = handelXmlDocument();
//发送成功删除,发送失败不管保留报文
if(i==1){
FileUtils.forceDelete(xmlfile);
}
}catch (IOException ioe){
log.error("文件不存在",ioe);
ioe.printStackTrace();
} catch (Exception e){
log.error("错误的解析文件剪切失败,目标目录已存在同名文件",e);
e.printStackTrace();
}
}catch (IOException ioe){
log.error("文件不存在",ioe);
ioe.printStackTrace();
} catch (Exception e){
log.error("错误的解析文件剪切失败,目标目录已存在同名文件",e);
e.printStackTrace();
}
log.info("线程:{}结束",xmlfile.getName());
latch.countDown();
log.info("剩余线程数量{}",latch.getCount());
... ...
... ... @@ -61,9 +61,9 @@ public class GetResponse extends DefaultConsumer {
Matcher matcher = r.matcher(content);
String fileName = this.receptDir + UUID.randomUUID().toString()+".xml";
// 新舱单部署要去掉下面更改名称代码
if (matcher.find()){
fileName = this.receptDir+ matcher.group(1)+".xml";
}
// if (matcher.find()){
// fileName = this.receptDir+ matcher.group(1)+".xml";
// }
log.info("-----------{}报文保存成功----------",fileName);
File fileToDirectory = new File(fileName);
FileUtils.writeStringToFile(fileToDirectory,content,"UTF-8");
... ...
... ... @@ -22,7 +22,7 @@ import java.util.concurrent.ThreadPoolExecutor;
public class NmmsMqGetTask {
//报文保存目录
@Value("${mq.nmms.saveXml-dir}")
@Value("${mq.dir.saveXml-dir}")
private String receptDir;
@Value("${mq.connection.ip}")
... ... @@ -40,10 +40,10 @@ public class NmmsMqGetTask {
@Value("${mq.connection.password}")
private String mqPassword;
@Value("${mq.queue.tcs-mq}")
@Value("${mq.queue.read-from-mq}")
private String queueName;
@Scheduled(fixedRate = 5000)
@Scheduled(fixedRate = 5000)
public void startTask(){
... ...
... ... @@ -23,8 +23,8 @@ import java.util.concurrent.ThreadPoolExecutor;
@Component
public class NmmsMqSendTask {
//回执读取目录
@Value("${mq.nmms.sendXml-dir}")
//发送报文读取目录
@Value("${mq.dir.sendXml-dir}")
private String sendXmldir;
@Value("${mq.connection.ip}")
... ... @@ -42,15 +42,24 @@ public class NmmsMqSendTask {
@Value("${mq.connection.password}")
private String mqPassword;
@Value("${mq.queue.nmms-mq}")
@Value("${mq.queue.send-to-mq}")
private String queueName;
@Value("${mq.exchange.name}")
private String exchangeName;
@Value("${mq.exchange.type}")
private String exchangeType;
@Value("${mq.exchange.routing-key}")
private String routingName;
/**
* 线程数量
*/
private final static int theadamount = 10;
// @Scheduled(fixedRate = 5000)
@Scheduled(fixedRate = 5000)
public void startTask(){
final SimpleDateFormat sdf = new SimpleDateFormat(
... ... @@ -106,6 +115,9 @@ public class NmmsMqSendTask {
sendXml2MqThread.setMqUsername(mqUsername);
sendXml2MqThread.setMqPassword(mqPassword);
sendXml2MqThread.setQueueName(queueName);
sendXml2MqThread.setExchangeName(exchangeName);
sendXml2MqThread.setExchangeType(exchangeType);
sendXml2MqThread.setRoutingName(routingName);
threadPool.execute(sendXml2MqThread);
}catch (Exception e){
log.error("线程解析出错{}",e);
... ...
... ... @@ -19,7 +19,7 @@ import java.util.concurrent.ThreadPoolExecutor;
public class TCSMqGetTask {
//报文保存目录
@Value("${mq.tcs.saveNmmsXml-dir}")
@Value("${mq.dir.saveXml-dir}")
private String receptDir;
@Value("${mq.connection.ip}")
... ... @@ -37,8 +37,7 @@ public class TCSMqGetTask {
@Value("${mq.connection.password}")
private String mqPassword;
//这里配置新舱单的队列名称
@Value("${mq.queue.nmms-mq}")
@Value("${mq.queue.read-from-mq}")
private String queueName;
// @Scheduled(fixedRate = 5000)
... ...
... ... @@ -23,7 +23,7 @@ import java.util.concurrent.ThreadPoolExecutor;
public class TCSMqSendTask {
//回执读取目录
@Value("${mq.tcs.sendXml-dir}")
@Value("${mq.dir.sendXml-dir}")
private String sendXmldir;
@Value("${mq.connection.ip}")
... ... @@ -41,16 +41,16 @@ public class TCSMqSendTask {
@Value("${mq.connection.password}")
private String mqPassword;
@Value("${mq.queue.tcs-mq}")
@Value("${mq.queue.send-to-mq}")
private String queueName;
@Value("${mq.exchange.tcs-ex.name}")
@Value("${mq.exchange.name}")
private String exchangeName;
@Value("${mq.exchange.tcs-ex.type}")
@Value("${mq.exchange.type}")
private String exchangeType;
@Value("${mq.exchange.tcs-ex.routing-key}")
@Value("${mq.exchange.routing-key}")
private String routingName;
/**
* 线程数量
... ...