|
|
package com.tianbo.messagebus.service;
|
|
|
|
|
|
import com.alibaba.fastjson.JSON;
|
|
|
import com.alibaba.fastjson.JSONArray;
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
import org.springframework.http.*;
|
|
|
import org.springframework.scheduling.annotation.Async;
|
|
|
import org.springframework.scheduling.annotation.EnableAsync;
|
|
|
import org.springframework.scheduling.annotation.Scheduled;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
import org.springframework.util.LinkedMultiValueMap;
|
|
|
import org.springframework.util.MultiValueMap;
|
|
|
import org.springframework.util.StringUtils;
|
|
|
import org.springframework.web.client.RestTemplate;
|
|
|
|
|
|
import javax.annotation.Resource;
|
|
|
import java.io.Serializable;
|
|
|
|
|
|
@Service
|
|
|
@EnableAsync
|
|
|
@Slf4j
|
|
|
public class MessageBusProcessor {
|
|
|
|
|
|
/**http://10.161.4.20:8083/
|
|
|
* 账户登录地址
|
|
|
*/
|
|
|
@Value("${message-bus.url.login-url}")
|
|
|
private String LOGIN_URL;
|
|
|
/**
|
|
|
* 账号名
|
|
|
*/
|
|
|
@Value("${message-bus.auth.username}")
|
|
|
private String USER_NAME;
|
|
|
// private static final String USER_NAME = "HYYW";
|
|
|
/**
|
|
|
* 登陆密码
|
|
|
*/
|
|
|
@Value("${message-bus.auth.password}")
|
|
|
private String USER_PASS;
|
|
|
// private static final String USER_PASS = "ZZecargo123";
|
|
|
/**
|
|
|
* 心跳接口地址
|
|
|
*/
|
|
|
@Value("${message-bus.url.hearbit-url}")
|
|
|
private String HEARTBEAT_URL;
|
|
|
/**
|
|
|
* 心跳间隔时间 单位S
|
|
|
*/
|
|
|
@Value("${message-bus.heartbit-interval}")
|
|
|
private int HEARTBIT_INTERVAL;
|
|
|
/**
|
|
|
* 发送报文地址
|
|
|
*/
|
|
|
@Value("${message-bus.url.send-url}")
|
|
|
private String SEND_MSG_URL;
|
|
|
/**
|
|
|
* 接收报文地址
|
|
|
*/
|
|
|
@Value("${message-bus.url.get-url}")
|
|
|
private String GET_MSG_URL;
|
|
|
/**
|
|
|
* 存储登录后的token
|
|
|
*/
|
|
|
private static String TOKEN = "";
|
|
|
/**
|
|
|
* 登录成功状态
|
|
|
*/
|
|
|
private static Boolean LOGIN_STATUS=false;
|
|
|
|
|
|
|
|
|
/**
|
|
|
* HTTP请求框架
|
|
|
*/
|
|
|
@Resource
|
|
|
private RestTemplate restTemplate;
|
|
|
|
|
|
/**
|
|
|
* 发起登录,存储token
|
|
|
*
|
|
|
* @return
|
|
|
*/
|
|
|
public Boolean login() {
|
|
|
try {
|
|
|
/*
|
|
|
* 发起HTTP 登录请求
|
|
|
* 登录接口的请求头为application/x-www-form-urlencoded
|
|
|
*/
|
|
|
HttpHeaders headers = new HttpHeaders();
|
|
|
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
|
|
|
|
|
|
/*
|
|
|
* 请求参数
|
|
|
*/
|
|
|
MultiValueMap<String, String> params = new LinkedMultiValueMap<String, String>();
|
|
|
params.add("username", USER_NAME);
|
|
|
params.add("password", USER_PASS);
|
|
|
|
|
|
HttpEntity<MultiValueMap<String, String>> request = new HttpEntity<MultiValueMap<String, String>>(params, headers);
|
|
|
|
|
|
/*
|
|
|
* 提交HTTP访问,获取返回信息
|
|
|
*/
|
|
|
ResponseEntity<String> response = restTemplate.postForEntity(LOGIN_URL, request, String.class);
|
|
|
// 输出结果
|
|
|
log.info(response.getBody());
|
|
|
|
|
|
|
|
|
/*
|
|
|
校验是否登录成功
|
|
|
*/
|
|
|
if (response.getStatusCode().equals(HttpStatus.OK)) {
|
|
|
/**
|
|
|
* 从返回信息中确定是否登录成功,并取得token
|
|
|
* 返回格式
|
|
|
* {
|
|
|
"code":200,
|
|
|
"data":{
|
|
|
"account":"yangyucheng",
|
|
|
"token":"eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiI2MCIsIm5hbWUiOiLmnajnjonmiJDmtYvor5UiLCJpZCI6NjAsImlhdCI6MTYxNzM0ODM3MiwiYWNjb3VudCI6Inlhbmd5dWNoZW5nIn0.ElAs7BtV1tu6ApQXuPXzgXUgvja76bjEb-zxqhUON48"
|
|
|
},
|
|
|
"message":"success",
|
|
|
"success":true,
|
|
|
"time":20210402152612604
|
|
|
}
|
|
|
*/
|
|
|
JSONObject resJson = JSON.parseObject(response.getBody());
|
|
|
JSONObject resData = resJson.getJSONObject("data");
|
|
|
String resCode = resJson.getString("code");
|
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
校验并获取登陆成功后返回的token
|
|
|
*/
|
|
|
String authToken = resData.getString("token");
|
|
|
if ("200".equals(resCode) && !StringUtils.isEmpty(authToken) && authToken.length() > 10) {
|
|
|
LOGIN_STATUS = true;
|
|
|
|
|
|
//设置请求头部Authorization为token, token的类型为Bearer
|
|
|
TOKEN = authToken;
|
|
|
/*
|
|
|
登录成功开始心跳
|
|
|
*/
|
|
|
// startHeartBit();
|
|
|
log.info("登录成功");
|
|
|
return true;
|
|
|
}else {
|
|
|
log.error("登录失败");
|
|
|
return false;
|
|
|
}
|
|
|
} else {
|
|
|
log.error("登录失败");
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
} catch (Exception e) {
|
|
|
log.error("登录失败->{}",e.toString());
|
|
|
return false;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 定时心跳,维持在线状态,每10秒访问一次心跳接口
|
|
|
*/
|
|
|
public void startHeartBit() {
|
|
|
try {
|
|
|
while (true) {
|
|
|
heartBit();
|
|
|
Thread.sleep(HEARTBIT_INTERVAL);
|
|
|
}
|
|
|
} catch (InterruptedException e) {
|
|
|
e.printStackTrace();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Scheduled(fixedRate = 10000)
|
|
|
public void heartBit() {
|
|
|
if (!StringUtils.isEmpty(TOKEN) && LOGIN_STATUS){
|
|
|
/*
|
|
|
* 发起HTTP 登录请求
|
|
|
* 登录接口的请求头为application/x-www-form-urlencoded
|
|
|
*/
|
|
|
HttpHeaders headers = new HttpHeaders();
|
|
|
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
|
|
|
/*
|
|
|
* 设置获取到的token到头部信息Authorization节点中
|
|
|
*/
|
|
|
headers.setBearerAuth(TOKEN);
|
|
|
|
|
|
/*
|
|
|
* 心跳接口无参数,访问接口即可
|
|
|
*/
|
|
|
MultiValueMap<String, String> params = new LinkedMultiValueMap<String, String>();
|
|
|
|
|
|
HttpEntity<MultiValueMap<String, String>> request = new HttpEntity<MultiValueMap<String, String>>(params, headers);
|
|
|
|
|
|
/*
|
|
|
* 提交HTTP访问,获取返回信息
|
|
|
*/
|
|
|
ResponseEntity<String> response = restTemplate.postForEntity(HEARTBEAT_URL, request, String.class);
|
|
|
// 输出结果
|
|
|
System.out.println(response.getBody());
|
|
|
if (response.getStatusCode().equals(HttpStatus.OK)) {
|
|
|
log.info("心跳成功");
|
|
|
} else {
|
|
|
log.error("心跳失败");
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 发送消息
|
|
|
*
|
|
|
* @return
|
|
|
*/
|
|
|
public Boolean sendMsg(MSG msg) {
|
|
|
if (LOGIN_STATUS) {
|
|
|
/*
|
|
|
* 发起HTTP 登录请求
|
|
|
* 登录接口的请求头为application/json
|
|
|
*/
|
|
|
HttpHeaders headers = new HttpHeaders();
|
|
|
headers.setContentType(MediaType.APPLICATION_JSON);
|
|
|
|
|
|
MSGS msgs = new MSGS();
|
|
|
msg.getHEADER().setSNDR("HYYW");
|
|
|
|
|
|
msgs.setMSG(msg);
|
|
|
|
|
|
|
|
|
/*
|
|
|
设置要发送的实体类并将实体转换成Json字符串,这里以MAP类型举例
|
|
|
*/
|
|
|
// Map<String, String> dataModel = new HashMap<>();
|
|
|
// dataModel.put("flightNo", "CV987");
|
|
|
// dataModel.put("flightDate", "MAY01");
|
|
|
// dataModel.put("waybillNo", "172-12345678");
|
|
|
// dataModel.put("weight", "20.01");
|
|
|
// dataModel.put("piece", "2");
|
|
|
// msg.getMSG().setBODY(JSON.toJSONString(dataModel));
|
|
|
|
|
|
/*
|
|
|
* 设置获取到的token到头部信息Authorization节点中
|
|
|
*/
|
|
|
headers.setBearerAuth(TOKEN);
|
|
|
|
|
|
/*
|
|
|
* 发起消息接口访问,发送消息
|
|
|
*/
|
|
|
|
|
|
HttpEntity<MSGS> request = new HttpEntity<MSGS>(msgs, headers);
|
|
|
ResponseEntity<String> response = restTemplate.postForEntity(SEND_MSG_URL, request, String.class);
|
|
|
|
|
|
JSONObject resJson = JSON.parseObject(response.getBody());
|
|
|
String code = resJson.getString("code");
|
|
|
|
|
|
System.out.println(response.getBody());
|
|
|
|
|
|
if (response.getStatusCode().equals(HttpStatus.OK) && "200".equals(code)) {
|
|
|
log.info("消息发送成功");
|
|
|
return true;
|
|
|
} else {
|
|
|
log.error("消息发送失败");
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
}
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
|
|
|
/**
|
|
|
* 收取消息
|
|
|
*
|
|
|
* @return
|
|
|
*/
|
|
|
@Async
|
|
|
@Scheduled(fixedRate = 300)
|
|
|
public JSONArray getMsg() {
|
|
|
if(!LOGIN_STATUS){
|
|
|
login();
|
|
|
return null;
|
|
|
}
|
|
|
/*
|
|
|
* 发起HTTP 登录请求
|
|
|
* 登录接口的请求头为application/json
|
|
|
*/
|
|
|
HttpHeaders headers = new HttpHeaders();
|
|
|
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
|
|
|
headers.setBearerAuth(TOKEN);
|
|
|
/*
|
|
|
* 请求参数拼装
|
|
|
*/
|
|
|
MultiValueMap<String, String> params = new LinkedMultiValueMap<String, String>();
|
|
|
params.add("username", "HYYW");
|
|
|
HttpEntity<MultiValueMap<String, String>> request = new HttpEntity<MultiValueMap<String, String>>(params, headers);
|
|
|
|
|
|
/*
|
|
|
* 提交HTTP访问,获取返回信息
|
|
|
*/
|
|
|
ResponseEntity<String> response = restTemplate.postForEntity(GET_MSG_URL, request, String.class);
|
|
|
// 输出结果
|
|
|
log.info("获取到消息返回---{}---",response.getBody());
|
|
|
/*
|
|
|
* 从返回信息中确定是否获取到消息
|
|
|
*/
|
|
|
JSONObject resJson = JSON.parseObject(response.getBody());
|
|
|
String code = resJson.getString("code");
|
|
|
if (response.getStatusCode().equals(HttpStatus.OK) && "200".equals(code)) {
|
|
|
|
|
|
JSONArray data = resJson.getJSONArray("data");
|
|
|
log.info("消息接收成功,接收消息为>>>{}<<<",data.toString());
|
|
|
|
|
|
for (int i = 0; i<data.size() ; i++) {
|
|
|
/*
|
|
|
取得是大数据小组的实体,他们的msg.body的封装是以对象实体object封装的。不是json字符串。
|
|
|
*/
|
|
|
String msg = data.getObject(i,String.class);
|
|
|
JSONObject rootJson = JSON.parseObject(msg);
|
|
|
JSONObject msgJson = rootJson.getJSONObject("MSG");
|
|
|
JSONObject body = msgJson.getJSONObject("BODY");
|
|
|
|
|
|
HEADER msgHeader = msgJson.getObject("HEADER",HEADER.class);
|
|
|
|
|
|
MSG transMsg= new MSG();
|
|
|
String transBody = body.toJSONString();
|
|
|
transMsg.setHEADER(msgHeader);
|
|
|
transMsg.setBODY(transBody);
|
|
|
|
|
|
transMsg.toString();
|
|
|
|
|
|
/*
|
|
|
自定义对返回数据的处理
|
|
|
*/
|
|
|
log.info("开始转发消息");
|
|
|
Boolean sendResult = sendMsg(transMsg);
|
|
|
/**
|
|
|
* todo:消息失败处理
|
|
|
*/
|
|
|
if(!sendResult){
|
|
|
//todo:消息备份或者重发?
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return data;
|
|
|
|
|
|
} else {
|
|
|
log.error("消息获取失败");
|
|
|
return new JSONArray();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 读取备份消息并消息重发
|
|
|
* @return
|
|
|
*/
|
|
|
public Boolean reSend(){
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 消息发送实体类
|
|
|
*/
|
|
|
class MSGS implements Serializable {
|
|
|
private MSG MSG;
|
|
|
|
|
|
public MSG getMSG() {
|
|
|
return MSG;
|
|
|
}
|
|
|
|
|
|
public void setMSG(MSG MSG) {
|
|
|
this.MSG = MSG;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
class MSG {
|
|
|
/**
|
|
|
* 具体消息头部信息
|
|
|
*/
|
|
|
private HEADER HEADER;
|
|
|
/**
|
|
|
* 具体消息支持JSON字符串或者XML
|
|
|
*/
|
|
|
private String BODY;
|
|
|
|
|
|
public HEADER getHEADER() {
|
|
|
return HEADER;
|
|
|
}
|
|
|
|
|
|
public void setHEADER(HEADER HEADER) {
|
|
|
this.HEADER = HEADER;
|
|
|
}
|
|
|
|
|
|
public String getBODY() {
|
|
|
return BODY;
|
|
|
}
|
|
|
|
|
|
public void setBODY(String BODY) {
|
|
|
this.BODY = BODY;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 消息发送头部信息
|
|
|
*/
|
|
|
class HEADER {
|
|
|
/**
|
|
|
* 消息唯一标识,可以是ID
|
|
|
*/
|
|
|
private long SEQNO;
|
|
|
/**
|
|
|
* 发送的消息大类,在总线系统配置好后,分配给消息生产者
|
|
|
*/
|
|
|
private String TYPE;
|
|
|
/**
|
|
|
* 发送的消息子类,在总线系统配置好后,分配给消息生产者
|
|
|
*/
|
|
|
private String STYPE;
|
|
|
/**
|
|
|
* 消息增删改类型(IE=insert event,UE=update event,DE=delete event)
|
|
|
*/
|
|
|
private String OPTYPE;
|
|
|
/**
|
|
|
* 发送时间(如:20210320101421/yyyyMMddHHmmss)
|
|
|
*/
|
|
|
private String DDTM;
|
|
|
/**
|
|
|
* 你的账号名称
|
|
|
*/
|
|
|
private String SNDR;
|
|
|
/**
|
|
|
* 消息接收者
|
|
|
*/
|
|
|
private String RCVR;
|
|
|
|
|
|
public long getSEQNO() {
|
|
|
return SEQNO;
|
|
|
}
|
|
|
|
|
|
public void setSEQNO(long SEQNO) {
|
|
|
this.SEQNO = SEQNO;
|
|
|
}
|
|
|
|
|
|
public String getTYPE() {
|
|
|
return TYPE;
|
|
|
}
|
|
|
|
|
|
public void setTYPE(String TYPE) {
|
|
|
this.TYPE = TYPE;
|
|
|
}
|
|
|
|
|
|
public String getSTYPE() {
|
|
|
return STYPE;
|
|
|
}
|
|
|
|
|
|
public void setSTYPE(String STYPE) {
|
|
|
this.STYPE = STYPE;
|
|
|
}
|
|
|
|
|
|
public String getOPTYPE() {
|
|
|
return OPTYPE;
|
|
|
}
|
|
|
|
|
|
public void setOPTYPE(String OPTYPE) {
|
|
|
this.OPTYPE = OPTYPE;
|
|
|
}
|
|
|
|
|
|
public String getDDTM() {
|
|
|
return DDTM;
|
|
|
}
|
|
|
|
|
|
public void setDDTM(String DDTM) {
|
|
|
this.DDTM = DDTM;
|
|
|
}
|
|
|
|
|
|
public String getSNDR() {
|
|
|
return SNDR;
|
|
|
}
|
|
|
|
|
|
public void setSNDR(String SNDR) {
|
|
|
this.SNDR = SNDR;
|
|
|
}
|
|
|
|
|
|
public String getRCVR() {
|
|
|
return RCVR;
|
|
|
}
|
|
|
|
|
|
public void setRCVR(String RCVR) {
|
|
|
this.RCVR = RCVR;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 收发送接口返回结果实体类
|
|
|
*/
|
|
|
|
|
|
class ResultJson<T> implements Serializable {
|
|
|
private static final long serialVersionUID = 1L;
|
|
|
|
|
|
/**
|
|
|
* 状态码 正确为200,其他为异常
|
|
|
*/
|
|
|
private String code;
|
|
|
/**
|
|
|
* 结果描述
|
|
|
*/
|
|
|
private String msg;
|
|
|
|
|
|
/**
|
|
|
* 结果数据
|
|
|
*/
|
|
|
private T data;
|
|
|
} |
...
|
...
|
|