作者 朱兆平

优化多线程全部业务ok

package com.tianbo.analysis.bean;
import com.netflix.discovery.converters.Auto;
import com.tianbo.analysis.dao.*;
import com.tianbo.analysis.service.CustomMessageService;
import com.tianbo.analysis.service.SendLogService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
... ... @@ -48,6 +50,12 @@ public class WlptBaseModel {
@Autowired
public TALLYSECONDARYMapper tallysecondaryMapper;
@Autowired
public CUSTOMSMESSAGEMapper customsmessageMapper;
@Autowired
public CustomMessageService customMessageService;
public WlptBaseModel() {
//处理多线程时 springboot 无法注入bean的问题
if (allocatearrivalMapper==null){
... ... @@ -89,6 +97,13 @@ public class WlptBaseModel {
if(tallysecondaryMapper==null){
tallysecondaryMapper = SpringBeanUtitl.getBean(TALLYSECONDARYMapper.class);
}
if(customsmessageMapper==null){
customsmessageMapper = SpringBeanUtitl.getBean(CUSTOMSMESSAGEMapper.class);
}
if(customMessageService==null){
customMessageService = SpringBeanUtitl.getBean(CustomMessageService.class);
}
}
public static void init(){
... ...
package com.tianbo.analysis.dao;
import com.tianbo.analysis.model.CUSTOMSMESSAGE;
import com.tianbo.analysis.model.CUSTOMSMESSAGEWithBLOBs;
import java.util.List;
public interface CUSTOMSMESSAGEMapper {
int insert(CUSTOMSMESSAGEWithBLOBs record);
int insertSelective(CUSTOMSMESSAGEWithBLOBs record);
CUSTOMSMESSAGEWithBLOBs selectMessage(String messageId);
List<CUSTOMSMESSAGEWithBLOBs> selectMessage(String messageId);
int updateMessage(CUSTOMSMESSAGEWithBLOBs record);
int updateMessageByMessageid(CUSTOMSMESSAGE record);
}
\ No newline at end of file
... ...
package com.tianbo.analysis.handle;
import com.tianbo.analysis.model.*;
import com.tianbo.analysis.service.CustomMessageService;
import com.tianbo.analysis.tools.AWBTools;
import com.tianbo.util.Date.DateUtil;
import com.tianbo.util.XML.XMLXPath;
... ... @@ -43,6 +44,7 @@ public class CustomXmlHandleThread implements Runnable{
private String errBakDir;
private File xmlfile;
private static CustomXmlHandleThread customXmlHandle;
//海关新舱单回执报头
public final static String MessageID = "//Manifest/Head/MessageID";
... ... @@ -60,8 +62,8 @@ public class CustomXmlHandleThread implements Runnable{
public final static String JourneyID = "//Manifest/Response/BorderTransportMeans/JourneyID";
public final static String WaybillMaster = "//Manifest/Response/Consignment/TransportContractDocument/ID";
public final static String WaybillSecond = "//Manifest/Response/Consignment/AssociatedTransportDocument/ID";
public final static String ResponseCode = "//Manifest/Response/Consignment/ResponseType/Code";
public final static String ResponseText = "//Manifest/Response/Consignment/ResponseType/Text";
public final static String ResponseCode = "//ResponseType/Code";
public final static String ResponseText = "//ResponseType/Text";
/**
* 海关国际转运业务回执报体
... ... @@ -70,8 +72,10 @@ public class CustomXmlHandleThread implements Runnable{
public final static String ImportWaybillMaster = "//Manifest/Response/ImportInformation/Consignment/TransportContractDocument/ID";
public final static String ExportJourneyID = "//Manifest/Response/ExportInformation/BorderTransportMeans/JourneyID";
public final static String ExportWaybillMaster = "//Manifest/Response/ExportInformation/Consignment/TransportContractDocument/ID";
public final static String TransResponseCode = "//Manifest/Response/ResponseType/Code";
public final static String TransResponseText = "//Manifest/Response/ResponseType/Text";
// public final static String TransResponseCode = "//Manifest/Response/ResponseType/Code";
// public final static String TransResponseText = "//Manifest/Response/ResponseType/Text";
public final static String TransResponseCode = "//ResponseType/Code";
public final static String TransResponseText = "//ResponseType/Text";
//通过@PostConstruct实现初始化bean之前进行的操作,解决service调用空指针问题
... ... @@ -96,7 +100,6 @@ public class CustomXmlHandleThread implements Runnable{
//操作成功,则转移剪切解析文件到备份目录,否则转移到error目录备份
if(i>0){
File bakupDirectory = new File(backdireByDay);
//复制一份到转发目录
//解析成功备份一份到备份目录
FileUtils.moveFileToDirectory(xmlfile,bakupDirectory,true);
}else {
... ... @@ -113,7 +116,6 @@ public class CustomXmlHandleThread implements Runnable{
log.info("线程:{}结束",xmlfile.getName());
latch.countDown();
log.info("剩余线程数量{}",latch.getCount());
}
/**
*
... ... @@ -132,24 +134,14 @@ public class CustomXmlHandleThread implements Runnable{
Document document = saxReader.read(xmlfile);
Element contentRoot = document.getRootElement();
String flightNo = "";
String flightDate = "";
String flightNo = "UNKONW";
String flightDate = "20101010";
//开始解析
String msgType = XMLXPath.getSingleValueByPath(document,MessageType);
String journeyid = XMLXPath.getSingleValueByPath(document,JourneyID);
// XMLXPath.getSingleValueByPath(document, )
if(!StringUtils.isEmpty(journeyid)){
String[] flightList = journeyid.split("/");
if(flightList.length > 0){
flightNo = flightList[0];
flightDate = flightList[1];
}
}
String awbA = XMLXPath.getSingleValueByPath(document,WaybillMaster);
//全格式的分单 如 17212345678_ADBD
... ... @@ -163,6 +155,8 @@ public class CustomXmlHandleThread implements Runnable{
String version = XMLXPath.getSingleValueByPath(document,Version);
String functionCode = XMLXPath.getSingleValueByPath(document,FunctionCode);
CustomReception customReception = new CustomReception( msgType,
flightNo,
flightDate,
... ... @@ -176,6 +170,25 @@ public class CustomXmlHandleThread implements Runnable{
reciveId,
version,
functionCode);
/**
* 如果回执中没有携带航班信息节点,说明是出错报文
* 到发送日志表根据messageid 找到相应的发送日志报文的航班及运单信息,再进行解析
*/
if(!StringUtils.isEmpty(journeyid)){
String[] flightList = journeyid.split("/");
if(flightList.length > 0){
flightNo = flightList[0];
flightDate = flightList[1];
customReception.setFlightNo(flightNo);
customReception.setFlightDate(flightDate);
}
}else {
CUSTOMSMESSAGE customsmessage = new CUSTOMSMESSAGE();
customReception = customsmessage.getWaybillInfoByCutomResponse(customReception);
}
switch (msgType){
case "MT9999":
ORIGINMANIFESTMASTER originmanifestmaster = new ORIGINMANIFESTMASTER(customReception);
... ... @@ -254,6 +267,7 @@ public class CustomXmlHandleThread implements Runnable{
}
updateCustomMessage(customReception);
return i;
}
... ... @@ -268,7 +282,7 @@ public class CustomXmlHandleThread implements Runnable{
String importJourneyID = XMLXPath.getSingleValueByPath(document,ImportJourneyID);
String importFlightNo = "UNKONW";
String importFlightDate = "00000000";
String importFlightDate = "20101010";
if(!StringUtils.isEmpty(importJourneyID)){
importFlightNo = AWBTools.splitFlightAndDate(importJourneyID)[0];
importFlightDate = AWBTools.splitFlightAndDate(importJourneyID)[1];
... ... @@ -298,6 +312,11 @@ public class CustomXmlHandleThread implements Runnable{
}
private int updateCustomMessage(CustomReception customReception){
CUSTOMSMESSAGE customsmessage = new CUSTOMSMESSAGE(customReception);
return customsmessage.updateMessageByMessageid();
}
private void errBak(File file){
try {
String today = DateUtil.getTodayBy_yyyyMMdd();
... ...
package com.tianbo.analysis.model;
import com.tianbo.analysis.bean.WlptBaseModel;
import com.tianbo.util.Date.DateUtil;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.math.BigDecimal;
import java.util.Date;
public class CUSTOMSMESSAGE {
@Data
@Slf4j
public class CUSTOMSMESSAGE extends WlptBaseModel {
private String autoid;
private String messageid;
... ... @@ -183,4 +190,31 @@ public class CUSTOMSMESSAGE {
public void setCreatetime(Date createtime) {
this.createtime = createtime;
}
public CUSTOMSMESSAGE(){
}
public CUSTOMSMESSAGE(CustomReception customReception) {
this.messageid = customReception.getMessageID();
this.messagetype = customReception.getMessageType();
this.receivetime = new Date();
this.flightno = customReception.getFlightNo();
this.flightdate = DateUtil.formatByyyyyMMdd(customReception.getFlightDate());
this.waybillnomaster = customReception.getWayBillMaster();
this.waybillnosecondary = customReception.getWayBillSecond();
this.responsecode = customReception.getResponseCode();
this.responsetext = customReception.getResponseText();
this.messagestatus = "3";
}
public CustomReception getWaybillInfoByCutomResponse(CustomReception customReception){
return customMessageService.getWaybillInfoByCutomResponse(customReception);
}
public int updateMessageByMessageid(){
return customsmessageMapper.updateMessageByMessageid(this);
}
}
\ No newline at end of file
... ...
package com.tianbo.analysis.model;
import com.tianbo.util.Date.DateUtil;
import java.util.Date;
public class CUSTOMSMESSAGEWithBLOBs extends CUSTOMSMESSAGE {
private String messagecontent;
... ... @@ -20,4 +24,5 @@ public class CUSTOMSMESSAGEWithBLOBs extends CUSTOMSMESSAGE {
public void setReceivecontent(String receivecontent) {
this.receivecontent = receivecontent == null ? null : receivecontent.trim();
}
}
\ No newline at end of file
... ...
package com.tianbo.analysis.service;
import com.tianbo.analysis.model.CUSTOMSMESSAGE;
import com.tianbo.analysis.model.CustomReception;
public interface CustomMessageService {
/**
* 根据回执的messageid 取得已发送舱单的相关信息
* @param customReception 主要传送发送时的messageid,即回执中的messagesid
* @return
*/
CustomReception getWaybillInfoByCutomResponse(CustomReception customReception);
}
... ...
package com.tianbo.analysis.service.imp;
import com.tianbo.analysis.dao.CUSTOMSMESSAGEMapper;
import com.tianbo.analysis.model.CUSTOMSMESSAGE;
import com.tianbo.analysis.model.CUSTOMSMESSAGEWithBLOBs;
import com.tianbo.analysis.model.CustomReception;
import com.tianbo.analysis.service.CustomMessageService;
import com.tianbo.util.Date.DateUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
public class CustomMessageServiceImp implements CustomMessageService{
@Autowired
public CUSTOMSMESSAGEMapper customsmessageMapper;
@Override
public CustomReception getWaybillInfoByCutomResponse(CustomReception customReception){
List<CUSTOMSMESSAGEWithBLOBs> customsmessages = customsmessageMapper.selectMessage(customReception.getMessageID());
if(!customsmessages.isEmpty()){
CUSTOMSMESSAGE customsmessage = customsmessages.get(0);
customReception.setFlightDate(DateUtil.dateToString(customsmessage.getFlightdate(),"yyyyMMdd"));
customReception.setFlightNo(customsmessage.getFlightno());
customReception.setWayBillMaster(customsmessage.getWaybillnomaster());
customReception.setWayBillSecond(customsmessage.getWaybillnosecondary());
}
return customReception;
}
}
... ...
... ... @@ -57,7 +57,9 @@ public class ShareServiceImp {
// 发送日志 插入
CoustomAnalysisServiceImp coustomAnalysisServiceImp = new CoustomAnalysisServiceImp();
CUSTOMSMESSAGEWithBLOBs cus = customsmessageMapper.selectMessage(customReception.getMessageID());
// CUSTOMSMESSAGEWithBLOBs cus = customsmessageMapper.selectMessage(customReception.getMessageID());
CUSTOMSMESSAGEWithBLOBs cus = new CUSTOMSMESSAGEWithBLOBs();
switch (type){
case "MT9999":
... ... @@ -313,8 +315,8 @@ public class ShareServiceImp {
// messageId 更新
public int updateMessage(CUSTOMSMESSAGEWithBLOBs cus){
return customsmessageMapper.updateMessage(cus);
return 0;
// return customsmessageMapper.updateMessage(cus);
}
... ... @@ -325,7 +327,8 @@ public class ShareServiceImp {
cus.setMessageid(cr.getMessageID());
cus.setResponsetext(cr.getResponseText());
return customsmessageMapper.updateMessage(cus);
// return customsmessageMapper.updateMessage(cus);
return 0;
}
}
... ...
... ... @@ -31,6 +31,11 @@ public class TaskAnalysis {
@Value("${custom.transmitDir}")
private String transmitDir;
/**
* 线程数量
*/
private final static int theadamount = 50;
@Scheduled(fixedRate = 5000)
public void startTask(){
final SimpleDateFormat sdf = new SimpleDateFormat(
... ... @@ -51,16 +56,16 @@ public class TaskAnalysis {
File fileDirectory = new File(readDir);
List<File> files = FileTool.readDirectoryFiles(fileDirectory);
//文件数量大于50个,每次只解析前50个
if (files!=null && !files.isEmpty() && files.size()>10){
CountDownLatch latch = new CountDownLatch(10);
log.trace("解析任务开始{},文件数量:{}",startTime,10);
for (int i=0;i<10;i++){
if (files!=null && !files.isEmpty() && files.size()>theadamount){
CountDownLatch latch = new CountDownLatch(theadamount);
log.trace("解析任务开始{},文件数量:{}",startTime,theadamount);
for (int i=0;i<theadamount;i++){
threadJbob(files.get(i),latch,transToCfps,threadPool);
}
latch.await();
}
//文件数量小于50个,全部一次解析完
if (files!=null && !files.isEmpty() && files.size()<10){
else if (files!=null && !files.isEmpty() && files.size()<theadamount){
CountDownLatch latch = new CountDownLatch(files.size());
log.info("解析任务开始{},文件数量:{}",startTime,files.size());
for (int i=0;i<files.size();i++){
... ...
... ... @@ -11,7 +11,7 @@ public class XMLThreadPoolFactory {
public static ThreadPoolExecutor instance(){
if (threadPool==null){
XMLThreadFactory xmlThreadFactory = new XMLThreadFactory("xml");
threadPool = new ThreadPoolExecutor(5, 100,
threadPool = new ThreadPoolExecutor(10, 100,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024),
xmlThreadFactory,
... ...
... ... @@ -173,13 +173,17 @@
</insert>
<select id="selectMessage" parameterType="string" resultType="com.tianbo.analysis.model.CUSTOMSMESSAGEWithBLOBs">
select * from CUSTOMSMESSAGE where MESSAGEID = #{value}
select * from CUSTOMSMESSAGE where MESSAGEID = #{value,jdbcType=VARCHAR}
</select>
<update id="updateMessage" parameterType="com.tianbo.analysis.model.CUSTOMSMESSAGEWithBLOBs">
<update id="updateMessageByMessageid" parameterType="com.tianbo.analysis.model.CUSTOMSMESSAGE">
UPDATE CUSTOMSMESSAGE
SET RESPONSETEXT = #{responsetext, jdbcType=VARCHAR}
SET
RESPONSETEXT = #{responsetext, jdbcType=VARCHAR},
MESSAGESTATUS = #{messagestatus,jdbcType=VARCHAR},
RESPONSECODE = #{responsecode,jdbcType=VARCHAR},
RECEIVETIME = #{receivetime,jdbcType=TIMESTAMP }
WHERE
MESSAGEID = #{messageid, jdbcType=VARCHAR}
</update>
... ...