作者 朱兆平

升级为feign访问服务进行消息转发

... ... @@ -82,7 +82,6 @@ spring:
# max-idle: 10000
# min-idle: 5
# initial-size: 5
eureka:
instance:
#eureka服务器页面中status的请求路径
... ... @@ -105,7 +104,6 @@ eureka:
registry-fetch-interval-seconds: 30
management:
endpoints:
enabled-by-default: true
... ... @@ -166,3 +164,12 @@ message-bus:
info:
version: 1.0
description: "消息总线-消息转发服务。[转发大数据小组消息到总线上]"
feign:
hystrix:
enabled: false
client:
config:
default:
logger-level: FULL
... ...
... ... @@ -6,7 +6,7 @@
<packaging>jar</packaging>
<groupId>com.tianbo</groupId>
<artifactId>messagebus-trans-message</artifactId>
<version>1.0-SNAPSHOT</version>
<version>1.0-feign</version>
<description>消息转发服务</description>
<parent>
... ... @@ -41,10 +41,29 @@
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson_version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
... ...
... ... @@ -4,6 +4,7 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
... ... @@ -12,6 +13,7 @@ import org.springframework.web.client.RestTemplate;
@SpringBootApplication
@EnableEurekaClient
@EnableFeignClients
@EnableScheduling
public class MessageTransApplication {
... ... @@ -27,7 +29,7 @@ public class MessageTransApplication {
@Bean
public TaskScheduler taskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(3);
taskScheduler.setPoolSize(100);
return taskScheduler;
}
... ...
... ... @@ -2,13 +2,18 @@ package com.tianbo.messagebus.controller;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.tianbo.messagebus.controller.response.ResultJson;
import com.tianbo.messagebus.model.HEADER;
import com.tianbo.messagebus.model.MSG;
import com.tianbo.messagebus.model.MSGS;
import com.tianbo.messagebus.myinterface.KafkaReciveApi;
import com.tianbo.messagebus.myinterface.KafkaSendApi;
import com.tianbo.messagebus.service.MessageBusProcessor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
... ... @@ -22,22 +27,59 @@ import java.util.Date;
public class HeartbeatController implements Serializable {
private static final long serialVersionUID = 1L;
@Autowired
KafkaReciveApi kafkaReciveApi;
@Autowired
KafkaSendApi kafkaSendApi;
@Autowired
private HttpServletRequest request;
@Autowired
MessageBusProcessor messageBusDemo;
MessageBusProcessor messageBusProcessor;
@PostMapping("login")
public void login(){
messageBusDemo.login();
messageBusProcessor.login();
}
@PostMapping("getmsg")
public void getmsg(){
log.error("for test");
messageBusDemo.getMsg();
messageBusProcessor.getMsg();
}
@GetMapping("getOneMsg")
public void getonemsg(@RequestParam(value = "username") String username){
ResultJson resultJson= kafkaReciveApi.recive(username);
log.info("收到结果为:{}",resultJson.toString());
}
@PostMapping("send")
public void sendMsg(@RequestParam("msg") String msgBody){
MSGS msgs = new MSGS();
MSG msg = new MSG();
HEADER header = new HEADER();
header.setSNDR("TXD");
header.setDDTM("20210429103322081");
header.setSEQNO(2021042910);
header.setTYPE("HYXX");
header.setSTYPE("ISLI");
msg.setHEADER(header);
msg.setBODY(msgBody);
msgs.setMSG(msg);
ResultJson response = kafkaSendApi.send(msgs);
log.info(response.toString());
}
}
... ...
... ... @@ -85,4 +85,12 @@ public class ResultJson<T> implements Serializable{
public void setJwtToken(String jwtToken) {
this.jwtToken = jwtToken;
}
@Override
public String toString() {
return "ResultJson{" +
"code='" + code + '\'' +
", msg='" + msg + '\'' +
'}';
}
}
... ...
package com.tianbo.messagebus.model;
/**
* 消息发送头部信息
*/
public class HEADER {
/**
* 消息唯一标识,可以是ID
*/
private long SEQNO;
/**
* 发送的消息大类,在总线系统配置好后,分配给消息生产者
*/
private String TYPE;
/**
* 发送的消息子类,在总线系统配置好后,分配给消息生产者
*/
private String STYPE;
/**
* 消息增删改类型(IE=insert event,UE=update event,DE=delete event)
*/
private String OPTYPE;
/**
* 发送时间(如:20210320101421/yyyyMMddHHmmss)
*/
private String DDTM;
/**
* 你的账号名称
*/
private String SNDR;
/**
* 消息接收者
*/
private String RCVR;
public long getSEQNO() {
return SEQNO;
}
public void setSEQNO(long SEQNO) {
this.SEQNO = SEQNO;
}
public String getTYPE() {
return TYPE;
}
public void setTYPE(String TYPE) {
this.TYPE = TYPE;
}
public String getSTYPE() {
return STYPE;
}
public void setSTYPE(String STYPE) {
this.STYPE = STYPE;
}
public String getOPTYPE() {
return OPTYPE;
}
public void setOPTYPE(String OPTYPE) {
this.OPTYPE = OPTYPE;
}
public String getDDTM() {
return DDTM;
}
public void setDDTM(String DDTM) {
this.DDTM = DDTM;
}
public String getSNDR() {
return SNDR;
}
public void setSNDR(String SNDR) {
this.SNDR = SNDR;
}
public String getRCVR() {
return RCVR;
}
public void setRCVR(String RCVR) {
this.RCVR = RCVR;
}
}
... ...
package com.tianbo.messagebus.model;
public class MSG {
/**
* 具体消息头部信息
*/
private HEADER HEADER;
/**
* 具体消息支持JSON字符串或者XML
*/
private String BODY;
public HEADER getHEADER() {
return HEADER;
}
public void setHEADER(HEADER HEADER) {
this.HEADER = HEADER;
}
public String getBODY() {
return BODY;
}
public void setBODY(String BODY) {
this.BODY = BODY;
}
@Override
public String toString() {
return this.BODY;
}
}
... ...
package com.tianbo.messagebus.model;
import java.io.Serializable;
/**
* 消息实体类
*/
public class MSGS implements Serializable {
private static final long serialVersionUID = 2L;
private MSG MSG;
public MSG getMSG() {
return MSG;
}
public void setMSG(MSG MSG) {
this.MSG = MSG;
}
}
... ...
package com.tianbo.messagebus.myinterface;
import com.tianbo.messagebus.controller.response.ResultJson;
import feign.Headers;
import feign.Param;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.*;
import java.util.List;
@FeignClient(name = "kafka-server-consumer",
fallback = KafkaReciveFallback.class )
public interface KafkaReciveApi {
@ResponseBody
@RequestMapping(value = "/kafka/receive",method = RequestMethod.GET)
ResultJson<List<String>> recive(@RequestParam(value = "username",required = true) String username);
}
... ...
package com.tianbo.messagebus.myinterface;
import com.tianbo.messagebus.controller.response.ResultJson;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
@Slf4j
@Service("myKafkaRecive")
public class KafkaReciveFallback implements KafkaReciveApi {
@Override
public ResultJson<List<String>> recive(String username) {
log.info("获取消息失败");
return new ResultJson<>("400","获取消息失败",new ArrayList<>());
}
}
... ...
package com.tianbo.messagebus.myinterface;
import com.tianbo.messagebus.controller.response.ResultJson;
import com.tianbo.messagebus.model.MSGS;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.*;
import java.util.List;
@FeignClient(name = "kafka-server-producer",
fallback = KafkaSendFallback.class )
public interface KafkaSendApi {
@ResponseBody
@RequestMapping(value = "/kafka/send",method = RequestMethod.POST)
ResultJson send(@RequestBody MSGS msgs);
}
... ...
package com.tianbo.messagebus.myinterface;
import com.tianbo.messagebus.controller.response.ResultJson;
import com.tianbo.messagebus.model.MSGS;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.List;
@Slf4j
@Service("myKafkaSend")
public class KafkaSendFallback implements KafkaSendApi {
@Override
public ResultJson send(MSGS msgs) {
log.info("发送消息失败");
return new ResultJson<>("400","发送消息失败");
}
}
... ...
... ... @@ -3,7 +3,14 @@ package com.tianbo.messagebus.service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.tianbo.messagebus.controller.response.ResultJson;
import com.tianbo.messagebus.model.HEADER;
import com.tianbo.messagebus.model.MSG;
import com.tianbo.messagebus.model.MSGS;
import com.tianbo.messagebus.myinterface.KafkaReciveApi;
import com.tianbo.messagebus.myinterface.KafkaSendApi;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.*;
import org.springframework.scheduling.annotation.Async;
... ... @@ -13,15 +20,14 @@ import org.springframework.stereotype.Service;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.StringUtils;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;
import javax.annotation.Resource;
import java.io.Serializable;
import java.util.List;
@Service
@EnableAsync
//@EnableAsync
@Slf4j
public class MessageBusProcessor {
... ... @@ -74,7 +80,7 @@ public class MessageBusProcessor {
/**
* 失败重发请求次数
*/
private static final int RETRY_TIMES= 3;
private static final int RETRY_TIMES= 100;
/**
* HTTP请求框架
... ... @@ -82,6 +88,12 @@ public class MessageBusProcessor {
@Resource
private RestTemplate restTemplate;
@Autowired
KafkaReciveApi kafkaReciveApi;
@Autowired
KafkaSendApi kafkaSendApi;
/**
* 发起登录,存储token
*
... ... @@ -192,7 +204,7 @@ public class MessageBusProcessor {
return code;
}
@Scheduled(fixedDelay = 10000)
// @Scheduled(fixedDelay = 10000)
public void heartBit() {
if (!StringUtils.isEmpty(TOKEN) && LOGIN_STATUS){
/*
... ... @@ -237,7 +249,7 @@ public class MessageBusProcessor {
public Boolean sendMsg(MSG msg) {
if (LOGIN_STATUS) {
try{
log.info("开始转发消息:{}",msg.toString());
log.info("………………开始发送消息:{}………………",msg.toString());
/*
* 发起HTTP 登录请求
* 登录接口的请求头为application/json
... ... @@ -277,7 +289,7 @@ public class MessageBusProcessor {
JSONObject resJson = JSON.parseObject(response.getBody());
String code = resJson.getString("code");
if ("200".equals(code)) {
log.info("消息发送成功");
log.info("………………消息发送成功………………");
return true;
}
}
... ... @@ -300,7 +312,7 @@ public class MessageBusProcessor {
* @return
*/
// @Async
@Scheduled(fixedRate = 1000)
// @Scheduled(fixedRate = 1000)
public JSONArray getMsg() {
if(!LOGIN_STATUS){
login();
... ... @@ -326,7 +338,7 @@ public class MessageBusProcessor {
*/
ResponseEntity<String> response = restTemplate.postForEntity(GET_MSG_URL, request, String.class);
// 输出结果
log.info("接口访问结果:{}",response);
if (response.getStatusCode().equals(HttpStatus.OK)) {
/*
... ... @@ -343,7 +355,7 @@ public class MessageBusProcessor {
取得是大数据小组的实体,他们的msg.body的封装是以对象实体object封装的。不是json字符串。
*/
String msg = data.getObject(i,String.class);
log.info("开始转发消息---{}---",msg);
log.info("循环处理消息[{}]---{}---",i,msg);
JSONObject rootJson = JSON.parseObject(msg);
JSONObject msgJson = rootJson.getJSONObject("MSG");
JSONObject body = msgJson.getJSONObject("BODY");
... ... @@ -384,178 +396,111 @@ public class MessageBusProcessor {
}
/**
* 读取备份消息并消息重发
* @return
* feigin从服务直接获取消息
*/
public void reSend(MSG msg){
log.error("***进入重发***");
for (int i = 0; i < RETRY_TIMES; i++) {
Boolean sendResult = sendMsg(msg);
if (sendResult){
log.error("***重发成功***");
break;
}
}
log.error("***已尝试重发三次,重发失败***");
}
@Scheduled(fixedRate = 1000)
public void getDataFromFeigin(){
log.info("1-开始执行获取任务");
ResultJson<List<String>> listResultJson = kafkaReciveApi.recive("HYYW");
log.info("2-获取结果为:{},数量为:{}",listResultJson.toString(),listResultJson.getData().size());
if ("200".equals(listResultJson.getCode()) && listResultJson.getData()!=null && listResultJson.getData().size()>0){
log.info("3-开始处理获取数据");
List<String> dataList = listResultJson.getData();
for (int i = 0; i <dataList.size() ; i++) {
String msg = dataList.get(i);
log.info("4-循环处理消息[{}]--->{}<---",i,msg);
JSONObject rootJson = JSON.parseObject(msg);
JSONObject msgJson = rootJson.getJSONObject("MSG");
JSONObject body = msgJson.getJSONObject("BODY");
}
HEADER msgHeader = msgJson.getObject("HEADER",HEADER.class);
/**
* 消息实体类
*/
class MSGS implements Serializable {
private MSG MSG;
MSG transMsg= new MSG();
String transBody = body.toJSONString();
transMsg.setHEADER(msgHeader);
transMsg.setBODY(transBody);
public MSG getMSG() {
return MSG;
log.info("5-开始转发消息");
boolean sendResult = sendMsgByFeign(transMsg);
if(!sendResult){
log.error("!!!!!!消息--->{}<---转发失败!!!!!!,尝试重发",transMsg.toString());
//todo:消息备份或者重发?
reTrySend(transMsg);
}
}
}
public void setMSG(MSG MSG) {
this.MSG = MSG;
}
}
class MSG {
/**
* 具体消息头部信
* feign从服务直接发送消
*/
private HEADER HEADER;
/**
* 具体消息支持JSON字符串或者XML
*/
private String BODY;
public HEADER getHEADER() {
return HEADER;
}
public boolean sendMsgByFeign(MSG msg){
MSGS msgs = new MSGS();
msg.getHEADER().setSNDR("HYYW");
public void setHEADER(HEADER HEADER) {
this.HEADER = HEADER;
}
msgs.setMSG(msg);
public String getBODY() {
return BODY;
}
ResultJson response = kafkaSendApi.send(msgs);
public void setBODY(String BODY) {
this.BODY = BODY;
if ("200".equals(response.getCode())){
log.info("………………6-消息发送成功{}………………",response.toString());
return true;
}
@Override
public String toString() {
return this.BODY;
log.info("400-消息发送失败->{}",response.toString());
return false;
}
}
/**
* 消息发送头部信息
*/
class HEADER {
/**
* 消息唯一标识,可以是ID
*/
private long SEQNO;
/**
* 发送的消息大类,在总线系统配置好后,分配给消息生产者
* feign重发消息
*/
private String TYPE;
/**
* 发送的消息子类,在总线系统配置好后,分配给消息生产者
*/
private String STYPE;
/**
* 消息增删改类型(IE=insert event,UE=update event,DE=delete event)
*/
private String OPTYPE;
/**
* 发送时间(如:20210320101421/yyyyMMddHHmmss)
*/
private String DDTM;
/**
* 你的账号名称
*/
private String SNDR;
/**
* 消息接收者
*/
private String RCVR;
public long getSEQNO() {
return SEQNO;
public void reTrySend(MSG msg){
log.error("***进入重发***");
for (int i = 0; i < RETRY_TIMES; i++) {
Boolean sendResult = sendMsgByFeign(msg);
if (sendResult){
log.error("***重发成功***");
break;
}
public void setSEQNO(long SEQNO) {
this.SEQNO = SEQNO;
if (i>85){
log.error("***重发{}次未成功,执行重新登录尝试***",i);
login();
break;
}
public String getTYPE() {
return TYPE;
log.error("***已尝试重发>>>{}<<<次,重发失败***",i);
}
public void setTYPE(String TYPE) {
this.TYPE = TYPE;
}
public String getSTYPE() {
return STYPE;
/**
* 读取备份消息并消息重发
* @return
*/
public void reSend(MSG msg){
log.error("***进入重发***");
for (int i = 0; i < RETRY_TIMES; i++) {
Boolean sendResult = sendMsg(msg);
if (sendResult){
log.error("***重发成功***");
break;
}
public void setSTYPE(String STYPE) {
this.STYPE = STYPE;
if (i>85){
log.error("***重发{}次未成功,执行重新登录尝试***",i);
login();
break;
}
public String getOPTYPE() {
return OPTYPE;
log.error("***已尝试重发>>>{}<<<次,重发失败***",i);
}
public void setOPTYPE(String OPTYPE) {
this.OPTYPE = OPTYPE;
}
public String getDDTM() {
return DDTM;
}
public void setDDTM(String DDTM) {
this.DDTM = DDTM;
}
public String getSNDR() {
return SNDR;
}
}
public void setSNDR(String SNDR) {
this.SNDR = SNDR;
}
public String getRCVR() {
return RCVR;
}
public void setRCVR(String RCVR) {
this.RCVR = RCVR;
}
}
/**
* 收发送接口返回结果实体类
*/
class ResultJson<T> implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 状态码 正确为200,其他为异常
*/
private String code;
/**
* 结果描述
*/
private String msg;
/**
* 结果数据
*/
private T data;
}
... ...