package com.tianbo.analysis.task;

import com.tianbo.util.Date.DateUtil;
import com.tianbo.util.IO.FileTool;
import com.tianbo.analysis.handle.CustomXmlHandleThread;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;

/**
 * Scheduled task that parses receipt files.
 *
 * <p>On every run it lists the files in the receipt directory, copies each selected
 * file into a per-day sub-directory of the transmit directory, and hands the file to a
 * {@link CustomXmlHandleThread} executed on the shared thread pool. The run blocks
 * until every file of the batch has been accounted for on a {@link CountDownLatch}.
 */
@Slf4j
@Component
public class TaskAnalysis {

    /** Directory the receipt files are read from. */
    @Value("${custom.receptDirectory}")
    private String receptDir;

    /** Base directory receipt files are copied (forwarded) to. */
    @Value("${custom.transmitDir}")
    private String transmitDir;

    /** Maximum number of files handed to the thread pool in a single run. */
    private static final int MAX_FILES_PER_RUN = 50;

    /**
     * Processes one batch of receipt files; triggered at a fixed rate of 5000 ms.
     *
     * <p>At most {@link #MAX_FILES_PER_RUN} files are processed per run; any remaining
     * files are left in the directory for a later run. The method waits until the
     * whole batch has completed (or failed to be submitted) before returning.
     */
    @Scheduled(fixedRate = 5000)
    public void startTask() {
        final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        final String startTime = sdf.format(new Date());

        // Forwarding folder: one sub-directory per day under the transmit directory.
        // NOTE(review): the date format is whatever DateUtil.getTodayBy_yyyyMMdd() returns.
        String today = DateUtil.getTodayBy_yyyyMMdd();
        String transToCfps = transmitDir + "/" + today;

        // Thread pool used to run the XML handlers.
        ThreadPoolExecutor threadPool = XMLThreadPoolFactory.instance();

        try {
            List<File> files = FileTool.readDirectoryFiles(new File(receptDir));

            if (files != null && !files.isEmpty()) {
                // Cap the batch; this also covers the case of exactly
                // MAX_FILES_PER_RUN files, which was previously skipped entirely.
                int batchSize = Math.min(files.size(), MAX_FILES_PER_RUN);
                CountDownLatch latch = new CountDownLatch(batchSize);

                if (files.size() > MAX_FILES_PER_RUN) {
                    // More files than one batch: only the first batchSize are parsed now.
                    log.trace("解析任务开始{},剩余处理文件数量:{}", startTime, files.size());
                } else {
                    // Everything fits into a single batch.
                    log.info("解析任务开始{},剩余处理文件数量文件数量:{}", startTime, files.size());
                }

                for (int i = 0; i < batchSize; i++) {
                    threadJob(files.get(i), latch, transToCfps, threadPool);
                }
                latch.await();
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the scheduler thread can observe it.
            Thread.currentThread().interrupt();
            log.error("获取目录文件出错", e);
        } catch (Exception e) {
            log.error("获取目录文件出错", e);
        }

        log.info("解析任务结束{}", sdf.format(new Date()));
    }

    /**
     * Copies one receipt file to the forwarding directory and submits it for parsing.
     *
     * <p>If the copy or the submission fails, the latch is counted down here so the
     * caller's {@code await()} cannot block forever on a handler that never ran.
     * NOTE(review): this assumes {@link CustomXmlHandleThread} counts the latch down
     * itself once it has been executed — confirm against that class.
     *
     * @param file        receipt file to forward and parse
     * @param latch       latch tracking completion of the current batch
     * @param transToCfps directory the file is copied to before parsing
     * @param threadPool  executor that runs the XML handler
     */
    private void threadJob(File file, CountDownLatch latch, String transToCfps, ThreadPoolExecutor threadPool) {
        boolean submitted = false;
        try {
            FileUtils.copyFileToDirectory(file, new File(transToCfps));

            CustomXmlHandleThread handler = new CustomXmlHandleThread();
            handler.setXmlfile(file);
            handler.setLatch(latch);
            threadPool.execute(handler);
            submitted = true;
        } catch (IOException e) {
            log.error("备份文件{}出错,错误代码:{}", file, e);
        } catch (Exception e) {
            log.error("线程解析出错{}", e);
        } finally {
            if (!submitted) {
                // No handler will ever run for this file, so release its latch slot.
                latch.countDown();
            }
        }
    }
}