作者 朱兆平

宽松模式多线程优化

@@ -2,9 +2,13 @@ package com.sy.IMF; @@ -2,9 +2,13 @@ package com.sy.IMF;
2 2
3 import com.caac.imf.api.IMFClient; 3 import com.caac.imf.api.IMFClient;
4 import com.sy.bwAnalysis.AnalysisRoute; 4 import com.sy.bwAnalysis.AnalysisRoute;
  5 +import com.sy.utils.XMLThreadPoolFactory;
5 import org.apache.commons.lang.StringUtils; 6 import org.apache.commons.lang.StringUtils;
6 import org.apache.log4j.Logger; 7 import org.apache.log4j.Logger;
7 8
  9 +import java.util.concurrent.ThreadPoolExecutor;
  10 +import java.util.concurrent.TimeUnit;
  11 +
8 public class KAKO_Reader extends Thread{ 12 public class KAKO_Reader extends Thread{
9 protected static final Logger logger = Logger.getLogger(KAKO_Reader.class); 13 protected static final Logger logger = Logger.getLogger(KAKO_Reader.class);
10 private IMFClient client; 14 private IMFClient client;
@@ -22,13 +26,23 @@ public class KAKO_Reader extends Thread{ @@ -22,13 +26,23 @@ public class KAKO_Reader extends Thread{
22 logger.info("********读取线程状态true**********"); 26 logger.info("********读取线程状态true**********");
23 logger.info("********Client-INFO= "+client+"********"); 27 logger.info("********Client-INFO= "+client+"********");
24 Thread t =Thread.currentThread(); 28 Thread t =Thread.currentThread();
  29 + ThreadPoolExecutor threadPoolEs = XMLThreadPoolFactory.instance("kakou");
25 while(true) { 30 while(true) {
26 if (IMF_Tesk.LOGIN_OK){ 31 if (IMF_Tesk.LOGIN_OK){
27 String message = this.client.getMSG(); 32 String message = this.client.getMSG();
28 logger.info(t.toString()+"读取线程已获取到消息"); 33 logger.info(t.toString()+"读取线程已获取到消息");
29 if (message != null && StringUtils.isNotEmpty(message)) { 34 if (message != null && StringUtils.isNotEmpty(message)) {
30 // logger.info(message); 35 // logger.info(message);
31 - AnalysisRoute.analysis(message); 36 +
  37 + AnalysisRoute analysisRoute=new AnalysisRoute();
  38 + analysisRoute.setMessage(message);
  39 + threadPoolEs.execute(analysisRoute);
  40 + logger.info("[THREADPOOL-INFO]-当前运行线程总数量: " + threadPoolEs.getActiveCount());
  41 + logger.info("[THREADPOOL-INFO]-线程队列数量: " + threadPoolEs.getQueue().size());
  42 + logger.info("[THREADPOOL-INFO]-完成的线程总数量: " + threadPoolEs.getCompletedTaskCount());
  43 + logger.info("[THREADPOOL-INFO]-空闲线程释放时间(秒): " + threadPoolEs.getKeepAliveTime(TimeUnit.SECONDS));
  44 +
  45 + //AnalysisRoute.analysis(message);
32 // if(message.indexOf("<TYPE>CARM</TYPE>") > 0){ 46 // if(message.indexOf("<TYPE>CARM</TYPE>") > 0){
33 // this.client.sendMSG(); 47 // this.client.sendMSG();
34 // } 48 // }
@@ -41,7 +55,7 @@ public class KAKO_Reader extends Thread{ @@ -41,7 +55,7 @@ public class KAKO_Reader extends Thread{
41 } 55 }
42 56
43 try { 57 try {
44 - Thread.sleep(500L); 58 + Thread.sleep(200L);
45 } catch (InterruptedException var3) { 59 } catch (InterruptedException var3) {
46 logger.info("********读取线程循环获取消息异常---->"+var3.toString()); 60 logger.info("********读取线程循环获取消息异常---->"+var3.toString());
47 var3.printStackTrace(); 61 var3.printStackTrace();
@@ -15,9 +15,8 @@ import javax.annotation.PostConstruct; @@ -15,9 +15,8 @@ import javax.annotation.PostConstruct;
15 import java.util.Date; 15 import java.util.Date;
16 import java.util.List; 16 import java.util.List;
17 17
18 -  
19 @Component 18 @Component
20 -public class AnalysisRoute { 19 +public class AnalysisRoute implements Runnable{
21 20
22 @Autowired 21 @Autowired
23 private aironeExStockService exStockService; 22 private aironeExStockService exStockService;
@@ -50,14 +49,15 @@ public class AnalysisRoute { @@ -50,14 +49,15 @@ public class AnalysisRoute {
50 49
51 private static AnalysisRoute route; 50 private static AnalysisRoute route;
52 51
53 - private static MessageAnalysis analysis = new MessageAnalysis();  
54 - private static CLRAnalysis clrAnalysis = new CLRAnalysis();  
55 - private static ExStockAnalysis exStockAnalysis = new ExStockAnalysis();  
56 - private static ImStockAnalysis imStockAnalysis = new ImStockAnalysis();  
57 - private static GATAnaluysis gatAnaluysis = new GATAnaluysis();  
58 - private static GatherInfoAnalysis gatherInfoAnalysis = new GatherInfoAnalysis();  
59 - private static CommandInfoAnalysis commandInfoAnalysis = new CommandInfoAnalysis();  
60 - private static ResMessageAnalysis resMessageAnalysis = new ResMessageAnalysis(); 52 + public String getMessage() {
  53 + return message;
  54 + }
  55 +
  56 + public void setMessage(String message) {
  57 + this.message = message;
  58 + }
  59 +
  60 + private String message;
61 61
62 @PostConstruct 62 @PostConstruct
63 public void init(){ 63 public void init(){
@@ -77,7 +77,16 @@ public class AnalysisRoute { @@ -77,7 +77,16 @@ public class AnalysisRoute {
77 * 解析从IMF接收过来的报文 77 * 解析从IMF接收过来的报文
78 * @param string 报文内容 78 * @param string 报文内容
79 */ 79 */
80 - public static void analysis(String string) { 80 + public void analysis(String string) {
  81 + MessageAnalysis analysis = new MessageAnalysis();
  82 + CLRAnalysis clrAnalysis = new CLRAnalysis();
  83 + ExStockAnalysis exStockAnalysis = new ExStockAnalysis();
  84 + ImStockAnalysis imStockAnalysis = new ImStockAnalysis();
  85 + GATAnaluysis gatAnaluysis = new GATAnaluysis();
  86 + GatherInfoAnalysis gatherInfoAnalysis = new GatherInfoAnalysis();
  87 + CommandInfoAnalysis commandInfoAnalysis = new CommandInfoAnalysis();
  88 + ResMessageAnalysis resMessageAnalysis = new ResMessageAnalysis();
  89 +
81 PropertyConfigurator.configure("config/log4j.properties"); 90 PropertyConfigurator.configure("config/log4j.properties");
82 string = string.replace("Msg","MSG"); 91 string = string.replace("Msg","MSG");
83 Message message = analysis.readTicketsXml(string);; 92 Message message = analysis.readTicketsXml(string);;
@@ -199,6 +208,10 @@ public class AnalysisRoute { @@ -199,6 +208,10 @@ public class AnalysisRoute {
199 } 208 }
200 209
201 210
  211 + @Override
  212 + public void run() {
  213 + analysis(message);
  214 + }
202 } 215 }
203 216
204 217
@@ -7,7 +7,6 @@ import com.thoughtworks.xstream.io.xml.XmlFriendlyNameCoder; @@ -7,7 +7,6 @@ import com.thoughtworks.xstream.io.xml.XmlFriendlyNameCoder;
7 7
8 public class MessageAnalysis { 8 public class MessageAnalysis {
9 9
10 - public static Message message = null;  
11 10
12 public Message readTicketsXml(String str) { 11 public Message readTicketsXml(String str) {
13 return getMessage(str); 12 return getMessage(str);
@@ -20,8 +19,7 @@ public class MessageAnalysis { @@ -20,8 +19,7 @@ public class MessageAnalysis {
20 XStream.setupDefaultSecurity(xstream); 19 XStream.setupDefaultSecurity(xstream);
21 //对xstream对象设置默认的安全防护时,允许设置类 20 //对xstream对象设置默认的安全防护时,允许设置类
22 xstream.allowTypes(new Class[]{Message.class}); 21 xstream.allowTypes(new Class[]{Message.class});
23 - message = (Message) xstream.fromXML(str);  
24 - return message; 22 + return (Message) xstream.fromXML(str);
25 } 23 }
26 24
27 25
  1 +package com.sy.utils;
  2 +
  3 +
  4 +import java.util.ArrayList;
  5 +import java.util.Date;
  6 +import java.util.Iterator;
  7 +import java.util.List;
  8 +import java.util.concurrent.ThreadFactory;
  9 +
  10 +public class XMLThreadFactory implements ThreadFactory {
  11 +
  12 + private int counter;
  13 + private String name;
  14 + private List<String> stats;
  15 +
  16 + public XMLThreadFactory(String name)
  17 + {
  18 + counter = 1;
  19 + this.name = name;
  20 + stats = new ArrayList<String>();
  21 + }
  22 +
  23 + @Override
  24 + public Thread newThread(Runnable runnable)
  25 + {
  26 + Thread t = new Thread(runnable, name + "-Thread_" + counter);
  27 + counter++;
  28 + stats.add(String.format("Created thread %d with name %s on %s \n", t.getId(), t.getName(), new Date()));
  29 + //log.info("Created thread id: {} with name {} on {} \n", t.getId(), t.getName(), new Date());
  30 + //log.info(getStats());
  31 + return t;
  32 + }
  33 +
  34 + public String getStats()
  35 + {
  36 + StringBuffer buffer = new StringBuffer();
  37 + Iterator<String> it = stats.iterator();
  38 + while (it.hasNext())
  39 + {
  40 + buffer.append(it.next());
  41 + }
  42 + return buffer.toString();
  43 + }
  44 +
  45 +}
  1 +package com.sy.utils;
  2 +
  3 +import java.util.concurrent.LinkedBlockingQueue;
  4 +import java.util.concurrent.ThreadPoolExecutor;
  5 +import java.util.concurrent.TimeUnit;
  6 +
  7 +public class XMLThreadPoolFactory {
  8 +
  9 + private static ThreadPoolExecutor threadPool;
  10 +
  11 + public static ThreadPoolExecutor instance(String busstype){
  12 + if (threadPool==null){
  13 + XMLThreadFactory xmlThreadFactory = new XMLThreadFactory(busstype);
  14 + threadPool = new ThreadPoolExecutor(50, 200,
  15 + 1L, TimeUnit.SECONDS,
  16 + new LinkedBlockingQueue<Runnable>(4000),
  17 + xmlThreadFactory,
  18 + new ThreadPoolExecutor.AbortPolicy());
  19 +
  20 + }
  21 + return threadPool;
  22 + }
  23 +}
@@ -2,21 +2,26 @@ package com.sy; @@ -2,21 +2,26 @@ package com.sy;
2 2
3 import com.sy.bwAnalysis.AnalysisRoute; 3 import com.sy.bwAnalysis.AnalysisRoute;
4 import com.sy.logic.LiftBar; 4 import com.sy.logic.LiftBar;
  5 +import com.sy.utils.XMLThreadPoolFactory;
5 import org.junit.Test; 6 import org.junit.Test;
6 import org.junit.runner.RunWith; 7 import org.junit.runner.RunWith;
7 import org.springframework.boot.test.context.SpringBootTest; 8 import org.springframework.boot.test.context.SpringBootTest;
8 import org.springframework.test.context.junit4.SpringRunner; 9 import org.springframework.test.context.junit4.SpringRunner;
9 10
  11 +import java.util.concurrent.ThreadPoolExecutor;
  12 +
10 @RunWith(SpringRunner.class) 13 @RunWith(SpringRunner.class)
11 @SpringBootTest 14 @SpringBootTest
12 public class AnalysisImfApplicationTests { 15 public class AnalysisImfApplicationTests {
13 16
14 private static String message = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" + 17 private static String message = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
15 "<MSG><META><SNDR>KAO</SNDR><RCVR></RCVR><SEQN>20220117192051</SEQN><DDTM>20220117192051</DDTM><TYPE>KAKO</TYPE><STYP>CARM</STYP></META><COMMAND_INFO AREA_ID=\"4604499001\" CHNL_NO=\"4604444412\" I_E_TYPE=\"E\" SEQ_NO=\"20220117192014000017\"><CHECK_RESULT>00000000100000000000</CHECK_RESULT><OP_HINT>直接放行</OP_HINT><SEAL><ESEAL_ID>CNSM2816998419</ESEAL_ID><SEAL_KEY>1234567890</SEAL_KEY><OPEN_TIMES/><ESEAL_IC_NO/></SEAL><SZ_MSG/></COMMAND_INFO></MSG>"; 18 "<MSG><META><SNDR>KAO</SNDR><RCVR></RCVR><SEQN>20220117192051</SEQN><DDTM>20220117192051</DDTM><TYPE>KAKO</TYPE><STYP>CARM</STYP></META><COMMAND_INFO AREA_ID=\"4604499001\" CHNL_NO=\"4604444412\" I_E_TYPE=\"E\" SEQ_NO=\"20220117192014000017\"><CHECK_RESULT>00000000100000000000</CHECK_RESULT><OP_HINT>直接放行</OP_HINT><SEAL><ESEAL_ID>CNSM2816998419</ESEAL_ID><SEAL_KEY>1234567890</SEAL_KEY><OPEN_TIMES/><ESEAL_IC_NO/></SEAL><SZ_MSG/></COMMAND_INFO></MSG>";
16 - 19 + private static ThreadPoolExecutor threadPoolEs = XMLThreadPoolFactory.instance("kakou");
  20 + private AnalysisRoute analysisRoute=new AnalysisRoute();
17 @Test 21 @Test
18 public void contextLoads() { 22 public void contextLoads() {
19 - AnalysisRoute.analysis(message); 23 + analysisRoute.setMessage(message);
  24 + threadPoolEs.execute(analysisRoute);
20 25
21 LiftBar.sendData("1","aaa",true); 26 LiftBar.sendData("1","aaa",true);
22 27
@@ -2,18 +2,26 @@ package com.sy; @@ -2,18 +2,26 @@ package com.sy;
2 2
3 import com.sy.bwAnalysis.AnalysisRoute; 3 import com.sy.bwAnalysis.AnalysisRoute;
4 import com.sy.logic.LiftBar; 4 import com.sy.logic.LiftBar;
  5 +import com.sy.utils.XMLThreadPoolFactory;
5 import org.apache.log4j.Logger; 6 import org.apache.log4j.Logger;
6 import org.junit.Test; 7 import org.junit.Test;
7 import org.junit.runner.RunWith; 8 import org.junit.runner.RunWith;
8 import org.springframework.boot.test.context.SpringBootTest; 9 import org.springframework.boot.test.context.SpringBootTest;
9 import org.springframework.test.context.junit4.SpringRunner; 10 import org.springframework.test.context.junit4.SpringRunner;
10 11
  12 +import java.util.concurrent.ThreadPoolExecutor;
  13 +import java.util.concurrent.TimeUnit;
  14 +
11 @RunWith(SpringRunner.class) 15 @RunWith(SpringRunner.class)
12 @SpringBootTest 16 @SpringBootTest
13 public class BeihuoTests { 17 public class BeihuoTests {
14 18
  19 +
15 private static final Logger logger = Logger.getLogger(LiftBar.class); 20 private static final Logger logger = Logger.getLogger(LiftBar.class);
16 21
  22 + private static ThreadPoolExecutor threadPoolEs = XMLThreadPoolFactory.instance("kakou");
  23 + private AnalysisRoute analysisRoute=new AnalysisRoute();
  24 +
17 //车牌号 25 //车牌号
18 private String voNo = "豫A61CR7"; 26 private String voNo = "豫A61CR7";
19 //备案重量 27 //备案重量
@@ -98,7 +106,9 @@ public class BeihuoTests { @@ -98,7 +106,9 @@ public class BeihuoTests {
98 .replace("${IETYPE}",ie) 106 .replace("${IETYPE}",ie)
99 .replace("${barcode}",barcode); 107 .replace("${barcode}",barcode);
100 //进港提货测试 108 //进港提货测试
101 - AnalysisRoute.analysis(IMPORT_XML); 109 +
  110 + analysisRoute.setMessage(IMPORT_XML);
  111 +
102 } 112 }
103 113
104 /** 114 /**
@@ -129,7 +139,14 @@ public class BeihuoTests { @@ -129,7 +139,14 @@ public class BeihuoTests {
129 .replace("${IETYPE}",ie) 139 .replace("${IETYPE}",ie)
130 .replace("${barcode}",barcode); 140 .replace("${barcode}",barcode);
131 //进港提货测试 141 //进港提货测试
132 - AnalysisRoute.analysis(IMPORT_XML); 142 + analysisRoute.setMessage(IMPORT_XML);
  143 + threadPoolEs.execute(analysisRoute);
  144 +
  145 + System.out.println("[THREAD-INFI]-线程运行线程总数量 = " + threadPoolEs.getActiveCount());
  146 + System.out.println("[THREAD-INFI]-线程队列数量 = " + threadPoolEs.getQueue().size());
  147 + System.out.println("threadPoolEs.getCompletedTaskCount() = " + threadPoolEs.getCompletedTaskCount());
  148 + System.out.println("threadPoolEs.getKeepAliveTime(TimeUnit.SECONDS) = " + threadPoolEs.getKeepAliveTime(TimeUnit.SECONDS));
  149 + System.out.println("threadPoolEs.getTaskCount() = " + threadPoolEs.getTaskCount());
133 } 150 }
134 151
135 152