作者 朱兆平

给九州发送数据,双CLIENT,同时发送货运数据 和 通关数据

@@ -52,8 +52,18 @@ public class Main { @@ -52,8 +52,18 @@ public class Main {
52 52
53 //数据仓库给九州的发送程序 53 //数据仓库给九州的发送程序
54 Timer timer = new Timer(); 54 Timer timer = new Timer();
55 - IMFNoDelaySender CangKu2Jiuzhou = new IMFNoDelaySender(client,"data/fid_index.txt"); 55 +
  56 + //从配置文件读取sql语句,发送货运数据
  57 + String sql_select=ConfigUtils.SQl;
  58 + String sqlmax = ConfigUtils.SQlMax;
  59 + IMFNoDelaySender CangKu2Jiuzhou = new IMFNoDelaySender(client,"data/fid_index.txt",sql_select,sqlmax);
56 CangKu2Jiuzhou.start(); 60 CangKu2Jiuzhou.start();
  61 +
  62 + //发送通关数据
  63 + String sql_select1=ConfigUtils.SQl1;
  64 + String sqlmax1 = ConfigUtils.SQlMax1;
  65 + IMFNoDelaySender CangKu2Jiuzhou_tg = new IMFNoDelaySender(client,"data/fid_index.txt",sql_select1,sqlmax1);
  66 + CangKu2Jiuzhou_tg.start();
57 // timer.schedule(CangKu2Jiuzhou, 10000L, 1200000L); 67 // timer.schedule(CangKu2Jiuzhou, 10000L, 1200000L);
58 68
59 } 69 }
@@ -28,10 +28,14 @@ public class IMFNoDelaySender extends Thread{ @@ -28,10 +28,14 @@ public class IMFNoDelaySender extends Thread{
28 private Dao dao = (DaoImpl)SystemBean.getBean("dao"); 28 private Dao dao = (DaoImpl)SystemBean.getBean("dao");
29 private IMFClient client; 29 private IMFClient client;
30 private String indexFileName; //记录发送数据的节点 30 private String indexFileName; //记录发送数据的节点
  31 + private String sql;
  32 + private String sqlMax;
31 33
32 - public IMFNoDelaySender(IMFClient client, String fileName) { 34 + public IMFNoDelaySender(IMFClient client, String fileName,String sql,String sqlMax) {
33 this.indexFileName = fileName; 35 this.indexFileName = fileName;
34 this.client = client; 36 this.client = client;
  37 + this.sql=sql;
  38 + this.sqlMax=sqlMax;
35 } 39 }
36 40
37 public void run() { 41 public void run() {
@@ -46,12 +50,16 @@ public class IMFNoDelaySender extends Thread{ @@ -46,12 +50,16 @@ public class IMFNoDelaySender extends Thread{
46 //初始化读取 50 //初始化读取
47 logger.info("job start:"); 51 logger.info("job start:");
48 int lastFID = Utils.readFIDIndex(this.indexFileName); 52 int lastFID = Utils.readFIDIndex(this.indexFileName);
49 - int maxFID = this.dao.getMaxFID(); 53 + int maxFID = this.dao.getMaxFID(sqlMax);
50 logger.info("lastFID=" + lastFID + " maxFID=" + maxFID); 54 logger.info("lastFID=" + lastFID + " maxFID=" + maxFID);
51 int is_ok = 0; 55 int is_ok = 0;
52 56
53 if (maxFID > lastFID) { 57 if (maxFID > lastFID) {
54 - List<MessageBak> list = this.dao.getRecordByFID(lastFID); //查询大于[FID]350的报文 58 + //这里修改从FID_INDEX文件读取的FID ,改为取搜索结果的rownum,此处FID代表ROWNUM
  59 + String sqlformat = String.format(sql, lastFID, lastFID, ConfigUtils.RECORD_COUNT);
  60 + logger.info(sqlformat);
  61 + List<MessageBak> list = this.dao.getRecordByFID(lastFID,sqlformat); //查询大于[FID]350的报文
  62 + //如果两个发送者,要用到这里
55 Map<Integer, XMLHeader> map = ConfigUtils.XML_HEADER_MAP; 63 Map<Integer, XMLHeader> map = ConfigUtils.XML_HEADER_MAP;
56 Iterator var7 = list.iterator(); 64 Iterator var7 = list.iterator();
57 65
@@ -65,7 +73,7 @@ public class IMFNoDelaySender extends Thread{ @@ -65,7 +73,7 @@ public class IMFNoDelaySender extends Thread{
65 if (m.getContent() != null) { 73 if (m.getContent() != null) {
66 int times = 0; 74 int times = 0;
67 //配置文件里面有几个sender 这里循环几次 75 //配置文件里面有几个sender 这里循环几次
68 - for(Iterator var10 = map.keySet().iterator(); var10.hasNext(); Thread.sleep((long)ConfigUtils.SEND_MESSAGE_INTERVAL)) { 76 + for(Iterator var10 = map.keySet().iterator(); var10.hasNext(); times++) {
69 Integer key = (Integer)var10.next(); 77 Integer key = (Integer)var10.next();
70 XMLHeader header = (XMLHeader)map.get(key); 78 XMLHeader header = (XMLHeader)map.get(key);
71 logger.info("key=" + key + " value=" + ((XMLHeader)map.get(key)).toString()); 79 logger.info("key=" + key + " value=" + ((XMLHeader)map.get(key)).toString());
@@ -15,11 +15,11 @@ public interface Dao { @@ -15,11 +15,11 @@ public interface Dao {
15 15
16 void delete(int var1); 16 void delete(int var1);
17 17
18 - List<MessageBak> getRecordByFID(int var1); 18 + List<MessageBak> getRecordByFID(int var1,String var2);
19 19
20 int getRecordCount(); 20 int getRecordCount();
21 21
22 - int getMaxFID(); 22 + int getMaxFID(String var1);
23 23
24 void update(int var1, int var2); 24 void update(int var1, int var2);
25 } 25 }
@@ -113,17 +113,13 @@ public class DaoImpl implements Dao { @@ -113,17 +113,13 @@ public class DaoImpl implements Dao {
113 this.jdbcTemplate.update("delete from T_ETL_MESSAGE where fid=" + fid); 113 this.jdbcTemplate.update("delete from T_ETL_MESSAGE where fid=" + fid);
114 } 114 }
115 115
116 - public int getMaxFID() {  
117 - String sqlmax = "select max(fid) from MESSAGE_BAK WHERE (FID>%s and FID<%s+%s) AND (TYPE='CLR' OR TYPE='ES1' OR TYPE='IS1' OR STYP = 'BSTA' OR STYP = 'FZE_RCF' OR STYP = 'FSU_FOH' OR STYP = 'FSU_DEP' OR STYP = 'COST' OR STYP = 'ABME' OR STYP = 'FZE_DEP' OR STYP = 'FSU_RCF') ORDER BY FID\n";  
118 - sqlmax = ConfigUtils.SQlMax; 116 + public int getMaxFID(String sqlmax) {
  117 +// String sqlmax = "select max(fid) from MESSAGE_BAK WHERE (FID>%s and FID<%s+%s) AND (TYPE='CLR' OR TYPE='ES1' OR TYPE='IS1' OR STYP = 'BSTA' OR STYP = 'FZE_RCF' OR STYP = 'FSU_FOH' OR STYP = 'FSU_DEP' OR STYP = 'COST' OR STYP = 'ABME' OR STYP = 'FZE_DEP' OR STYP = 'FSU_RCF') ORDER BY FID\n";
119 return this.jdbcTemplate.queryForInt(sqlmax); 118 return this.jdbcTemplate.queryForInt(sqlmax);
120 } 119 }
121 120
122 - public List<MessageBak> getRecordByFID(int fid) {  
123 - //从配置文件读取sql语句  
124 - String sql_select=ConfigUtils.SQl;  
125 - String sql = String.format(sql_select, fid, fid, ConfigUtils.RECORD_COUNT); //这里修改从FID_INDEX文件读取的FID ,改为取搜索结果的rownum,此处FID代表ROWNUM  
126 - logger.info(sql); 121 + public List<MessageBak> getRecordByFID(int fid,String sql) {
  122 +
127 final LobHandler lobHandler = new DefaultLobHandler(); 123 final LobHandler lobHandler = new DefaultLobHandler();
128 final ArrayList xmlList = new ArrayList(); 124 final ArrayList xmlList = new ArrayList();
129 125
@@ -32,6 +32,8 @@ public class ConfigUtils { @@ -32,6 +32,8 @@ public class ConfigUtils {
32 public static String SNDR = ""; 32 public static String SNDR = "";
33 public static String SQl=""; 33 public static String SQl="";
34 public static String SQlMax=""; 34 public static String SQlMax="";
  35 + public static String SQl1="";
  36 + public static String SQlMax1="";
35 public static Map<String, String> XTYPE_MAP = new HashMap(); 37 public static Map<String, String> XTYPE_MAP = new HashMap();
36 38
37 public ConfigUtils() { 39 public ConfigUtils() {
@@ -60,6 +62,8 @@ public class ConfigUtils { @@ -60,6 +62,8 @@ public class ConfigUtils {
60 IMF_PASSWORD = config.getProperty("imf_password").trim(); 62 IMF_PASSWORD = config.getProperty("imf_password").trim();
61 SQl = config.getProperty("sql"); 63 SQl = config.getProperty("sql");
62 SQlMax = config.getProperty("sqlmax"); 64 SQlMax = config.getProperty("sqlmax");
  65 + SQl1 = config.getProperty("sql1");
  66 + SQlMax1 = config.getProperty("sqlmax1");
63 String interval = config.getProperty("interval").trim(); 67 String interval = config.getProperty("interval").trim();
64 String record_count = config.getProperty("record_count").trim(); 68 String record_count = config.getProperty("record_count").trim();
65 logger.info(String.format("role=%s", ROLE)); 69 logger.info(String.format("role=%s", ROLE));