// Source: src/main/java/com/tianbo/analysis/task/TCSMqSendTask.java (4.1 KB)
// Author: 朱兆平
package com.tianbo.analysis.task;

import com.tianbo.analysis.handle.SendXml2MqThread;
import com.tianbo.util.IO.FileTool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.io.File;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * TCS回执发送
 * 部署到tcs服务器 读取回执目录的XML,发送到回执报文队列response
 */
@Slf4j
@Component
public class TCSMqSendTask {

    //回执读取目录
26
    @Value("${mq.dir.sendXml-dir}")
朱兆平 authored
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
    private  String sendXmldir;

    @Value("${mq.connection.ip}")
    private  String mqIp;

    @Value("${mq.connection.port}")
    private  int mqPort;

    @Value("${mq.connection.vHost}")
    private  String mqVhost;

    @Value("${mq.connection.username}")
    private  String mqUsername;

    @Value("${mq.connection.password}")
    private  String mqPassword;
44
    @Value("${mq.queue.send-to-mq}")
朱兆平 authored
45 46
    private String queueName;
47
    @Value("${mq.exchange.name}")
朱兆平 authored
48 49
    private String exchangeName;
50
    @Value("${mq.exchange.type}")
朱兆平 authored
51 52
    private String exchangeType;
53
    @Value("${mq.exchange.routing-key}")
朱兆平 authored
54 55 56 57 58 59
    private String routingName;
    /**
     * 线程数量
     */
    private final static int theadamount = 10;
60
    @Scheduled(fixedRate = 5000)
朱兆平 authored
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
    public void startTask(){

        final SimpleDateFormat sdf = new SimpleDateFormat(
                "yyyy-MM-dd HH:mm:ss");

        final  String startTime = sdf.format(new Date());

        //发送报文目录
        String readDir = sendXmldir;

        //初始化线程池
        ThreadPoolExecutor threadPool = XMLThreadPoolFactory.instance();

        try {
            File fileDirectory = new File(readDir);
            List<File> files = FileTool.readDirectoryFiles(fileDirectory);
            //文件数量大于50个,每次只解析前50个
            if (files!=null && !files.isEmpty() && files.size()>theadamount){
                CountDownLatch latch = new CountDownLatch(theadamount);
                log.trace("发送任务开始{},剩余处理文件数量:{}",startTime,files.size());
                for (int i=0;i<theadamount;i++){
                    threadJbob(files.get(i),latch,"transToCfps",threadPool);
                }
                latch.await();
            }
            //文件数量小于50个,全部一次解析完
            else if (files!=null && !files.isEmpty() && files.size()<theadamount){
                CountDownLatch latch = new CountDownLatch(files.size());
                log.trace("发送任务开始{},剩余处理文件数量文件数量:{}",startTime,files.size());
                for (int i=0;i<files.size();i++){
                    threadJbob(files.get(i),latch,"transToCfps",threadPool);
                }
                latch.await();
            }

        }catch (Exception e){
            e.printStackTrace();
            log.error("获取目录文件出错",e);
        }

        log.info("发送任务结束{}",sdf.format(new Date()));

    }

    private void threadJbob(File file,CountDownLatch latch,String transToCfps,ThreadPoolExecutor threadPool){
        try{
            SendXml2MqThread sendXml2MqThread = new SendXml2MqThread();
            sendXml2MqThread.setXmlfile(file);
            sendXml2MqThread.setLatch(latch);
            sendXml2MqThread.setMqIp(mqIp);
            sendXml2MqThread.setMqPort(mqPort);
            sendXml2MqThread.setMqVhost(mqVhost);
            sendXml2MqThread.setMqUsername(mqUsername);
            sendXml2MqThread.setMqPassword(mqPassword);
            sendXml2MqThread.setQueueName(queueName);
            sendXml2MqThread.setExchangeName(exchangeName);
            sendXml2MqThread.setExchangeType(exchangeType);
            sendXml2MqThread.setRoutingName(routingName);
            threadPool.execute(sendXml2MqThread);
        }catch (Exception e){
            log.error("线程解析出错{}",e);
        }

    }

}