作者 朱兆平

舱单收发MQ节点

  1 +package com.tianbo.analysis.task;
  2 +
  3 +import com.tianbo.analysis.handle.CustomXmlHandleThread;
  4 +import com.tianbo.analysis.rabbitmq.ReadMessage;
  5 +import com.tianbo.util.Date.DateUtil;
  6 +import com.tianbo.util.IO.FileTool;
  7 +import lombok.extern.slf4j.Slf4j;
  8 +import org.springframework.beans.factory.annotation.Value;
  9 +import org.springframework.scheduling.annotation.Scheduled;
  10 +import org.springframework.stereotype.Component;
  11 +
  12 +import java.io.File;
  13 +import java.text.SimpleDateFormat;
  14 +import java.util.Date;
  15 +import java.util.List;
  16 +import java.util.concurrent.CountDownLatch;
  17 +import java.util.concurrent.ThreadPoolExecutor;
  18 +
  19 +/**
  20 + * 新舱单从mq服务器的tcs回执队列中获取回执报文并落地保存
  21 + * 回执解析定时任务
  22 + * 从mq获取报文 落地保存
  23 + */
  24 +@Slf4j
  25 +@Component
  26 +public class NmmsMqGetTask {
  27 +
  28 + //报文保存目录
  29 + @Value("${mq.nmms.saveXml-dir}")
  30 + private String receptDir;
  31 +
  32 + @Value("${mq.connection.ip}")
  33 + private String mqIp;
  34 +
  35 + @Value("${mq.connection.port}")
  36 + private int mqPort;
  37 +
  38 + @Value("${mq.connection.vHost}")
  39 + private String mqVhost;
  40 +
  41 + @Value("${mq.connection.username}")
  42 + private String mqUsername;
  43 +
  44 + @Value("${mq.connection.password}")
  45 + private String mqPassword;
  46 +
  47 + @Value("${mq.queue.tcs-mq}")
  48 + private String queueName;
  49 +
  50 + // @Scheduled(fixedRate = 5000)
  51 + public void startTask(){
  52 +
  53 +
  54 + final SimpleDateFormat sdf = new SimpleDateFormat(
  55 + "yyyy-MM-dd HH:mm:ss");
  56 +
  57 + final String startTime = sdf.format(new Date());
  58 +
  59 + //设置转发文件夹
  60 + String today = DateUtil.getTodayBy_yyyyMMdd();
  61 + //回执目录
  62 + String readDir = receptDir;
  63 +
  64 + //初始化线程池
  65 + ThreadPoolExecutor threadPool = XMLThreadPoolFactory.instance();
  66 +
  67 + try {
  68 + //从mq获取回执
  69 + ReadMessage readMessage = new ReadMessage();
  70 + readMessage.setMqIp(mqIp);
  71 + readMessage.setMqPort(mqPort);
  72 + readMessage.setMqVhost(mqVhost);
  73 + readMessage.setMqUsername(mqUsername);
  74 + readMessage.setMqPassword(mqPassword);
  75 + readMessage.setQueueName(queueName);
  76 + if(!"runing".equals(ReadMessage.status)){
  77 + readMessage.getResponseFromMq(readDir);
  78 + }
  79 +
  80 + }catch (Exception e){
  81 + e.printStackTrace();
  82 + log.error("获取目录文件出错",e);
  83 + }
  84 +
  85 + log.info("解析任务结束{}",sdf.format(new Date()));
  86 +
  87 + }
  88 +
  89 +}
  1 +package com.tianbo.analysis.task;
  2 +
  3 +import com.tianbo.analysis.handle.CustomXmlHandleThread;
  4 +import com.tianbo.analysis.handle.SendXml2MqThread;
  5 +import com.tianbo.util.Date.DateUtil;
  6 +import com.tianbo.util.IO.FileTool;
  7 +import lombok.extern.slf4j.Slf4j;
  8 +import org.springframework.beans.factory.annotation.Value;
  9 +import org.springframework.scheduling.annotation.Scheduled;
  10 +import org.springframework.stereotype.Component;
  11 +
  12 +import java.io.File;
  13 +import java.text.SimpleDateFormat;
  14 +import java.util.Date;
  15 +import java.util.List;
  16 +import java.util.concurrent.CountDownLatch;
  17 +import java.util.concurrent.ThreadPoolExecutor;
  18 +
  19 +/**
  20 + * 61回执解析定时任务
  21 + * 部署到61服务器 读取报文发送目录的XML,发送到nmms的新增send报文队列
  22 + */
  23 +@Slf4j
  24 +@Component
  25 +public class NmmsMqSendTask {
  26 +
  27 + //回执读取目录
  28 + @Value("${mq.nmms.sendXml-dir}")
  29 + private String sendXmldir;
  30 +
  31 + @Value("${mq.connection.ip}")
  32 + private String mqIp;
  33 +
  34 + @Value("${mq.connection.port}")
  35 + private int mqPort;
  36 +
  37 + @Value("${mq.connection.vHost}")
  38 + private String mqVhost;
  39 +
  40 + @Value("${mq.connection.username}")
  41 + private String mqUsername;
  42 +
  43 + @Value("${mq.connection.password}")
  44 + private String mqPassword;
  45 +
  46 + @Value("${mq.queue.nmms-mq}")
  47 + private String queueName;
  48 +
  49 + /**
  50 + * 线程数量
  51 + */
  52 + private final static int theadamount = 10;
  53 +
  54 + // @Scheduled(fixedRate = 5000)
  55 + public void startTask(){
  56 +
  57 + final SimpleDateFormat sdf = new SimpleDateFormat(
  58 + "yyyy-MM-dd HH:mm:ss");
  59 +
  60 + final String startTime = sdf.format(new Date());
  61 +
  62 + //发送报文目录
  63 + String readDir = sendXmldir;
  64 +
  65 + //初始化线程池
  66 + ThreadPoolExecutor threadPool = XMLThreadPoolFactory.instance();
  67 +
  68 + try {
  69 + File fileDirectory = new File(readDir);
  70 + List<File> files = FileTool.readDirectoryFiles(fileDirectory);
  71 + //文件数量大于50个,每次只解析前50个
  72 + if (files!=null && !files.isEmpty() && files.size()>theadamount){
  73 + CountDownLatch latch = new CountDownLatch(theadamount);
  74 + log.trace("发送舱单报文任务开始{},剩余处理文件数量:{}",startTime,files.size());
  75 + for (int i=0;i<theadamount;i++){
  76 + threadJbob(files.get(i),latch,"transToCfps",threadPool);
  77 + }
  78 + latch.await();
  79 + }
  80 + //文件数量小于50个,全部一次解析完
  81 + else if (files!=null && !files.isEmpty() && files.size()<theadamount){
  82 + CountDownLatch latch = new CountDownLatch(files.size());
  83 + log.trace("发送舱单报文任务开始{},剩余处理文件数量文件数量:{}",startTime,files.size());
  84 + for (int i=0;i<files.size();i++){
  85 + threadJbob(files.get(i),latch,"transToCfps",threadPool);
  86 + }
  87 + latch.await();
  88 + }
  89 +
  90 + }catch (Exception e){
  91 + e.printStackTrace();
  92 + log.error("获取目录文件出错",e);
  93 + }
  94 +
  95 + log.info("发送舱单报文任务结束{}",sdf.format(new Date()));
  96 +
  97 + }
  98 +
  99 + private void threadJbob(File file,CountDownLatch latch,String transToCfps,ThreadPoolExecutor threadPool){
  100 + try{
  101 + SendXml2MqThread sendXml2MqThread = new SendXml2MqThread();
  102 + sendXml2MqThread.setXmlfile(file);
  103 + sendXml2MqThread.setLatch(latch);
  104 + sendXml2MqThread.setMqIp(mqIp);
  105 + sendXml2MqThread.setMqPort(mqPort);
  106 + sendXml2MqThread.setMqVhost(mqVhost);
  107 + sendXml2MqThread.setMqUsername(mqUsername);
  108 + sendXml2MqThread.setMqPassword(mqPassword);
  109 + sendXml2MqThread.setQueueName(queueName);
  110 + threadPool.execute(sendXml2MqThread);
  111 + }catch (Exception e){
  112 + log.error("线程解析出错{}",e);
  113 + }
  114 +
  115 + }
  116 +
  117 +}