package com.tianbo.analysis.task; import com.tianbo.analysis.handle.SendXml2MqThread; import com.tianbo.util.IO.FileTool; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.io.File; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadPoolExecutor; /** * TCS回执发送 * 部署到tcs服务器 读取回执目录的XML,发送到回执报文队列response */ @Slf4j @Component public class TCSMqSendTask { //回执读取目录 @Value("${mq.dir.sendXml-dir}") private String sendXmldir; @Value("${mq.connection.ip}") private String mqIp; @Value("${mq.connection.port}") private int mqPort; @Value("${mq.connection.vHost}") private String mqVhost; @Value("${mq.connection.username}") private String mqUsername; @Value("${mq.connection.password}") private String mqPassword; @Value("${mq.queue.send-to-mq}") private String queueName; @Value("${mq.exchange.name}") private String exchangeName; @Value("${mq.exchange.type}") private String exchangeType; @Value("${mq.exchange.routing-key}") private String routingName; /** * 线程数量 */ private final static int theadamount = 10; @Scheduled(fixedRate = 5000) public void startTask(){ final SimpleDateFormat sdf = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss"); final String startTime = sdf.format(new Date()); //发送报文目录 String readDir = sendXmldir; //初始化线程池 ThreadPoolExecutor threadPool = XMLThreadPoolFactory.instance(); try { File fileDirectory = new File(readDir); List<File> files = FileTool.readDirectoryFiles(fileDirectory); //文件数量大于50个,每次只解析前50个 if (files!=null && !files.isEmpty() && files.size()>theadamount){ CountDownLatch latch = new CountDownLatch(theadamount); log.trace("发送任务开始{},剩余处理文件数量:{}",startTime,files.size()); for (int i=0;i<theadamount;i++){ threadJbob(files.get(i),latch,"transToCfps",threadPool); } latch.await(); } //文件数量小于50个,全部一次解析完 else if (files!=null && !files.isEmpty() && files.size()<theadamount){ CountDownLatch latch = new CountDownLatch(files.size()); log.trace("发送任务开始{},剩余处理文件数量文件数量:{}",startTime,files.size()); for (int i=0;i<files.size();i++){ threadJbob(files.get(i),latch,"transToCfps",threadPool); } latch.await(); } }catch (Exception e){ e.printStackTrace(); log.error("获取目录文件出错",e); } log.info("发送任务结束{}",sdf.format(new Date())); } private void threadJbob(File file,CountDownLatch latch,String transToCfps,ThreadPoolExecutor threadPool){ try{ SendXml2MqThread sendXml2MqThread = new SendXml2MqThread(); sendXml2MqThread.setXmlfile(file); sendXml2MqThread.setLatch(latch); sendXml2MqThread.setMqIp(mqIp); sendXml2MqThread.setMqPort(mqPort); sendXml2MqThread.setMqVhost(mqVhost); sendXml2MqThread.setMqUsername(mqUsername); sendXml2MqThread.setMqPassword(mqPassword); sendXml2MqThread.setQueueName(queueName); sendXml2MqThread.setExchangeName(exchangeName); sendXml2MqThread.setExchangeType(exchangeType); sendXml2MqThread.setRoutingName(routingName); threadPool.execute(sendXml2MqThread); }catch (Exception e){ log.error("线程解析出错{}",e); } } }