正在显示
13 个修改的文件
包含
410 行增加
和
166 行删除
| @@ -82,7 +82,6 @@ spring: | @@ -82,7 +82,6 @@ spring: | ||
| 82 | # max-idle: 10000 | 82 | # max-idle: 10000 |
| 83 | # min-idle: 5 | 83 | # min-idle: 5 |
| 84 | # initial-size: 5 | 84 | # initial-size: 5 |
| 85 | - | ||
| 86 | eureka: | 85 | eureka: |
| 87 | instance: | 86 | instance: |
| 88 | #eureka服务器页面中status的请求路径 | 87 | #eureka服务器页面中status的请求路径 |
| @@ -105,7 +104,6 @@ eureka: | @@ -105,7 +104,6 @@ eureka: | ||
| 105 | registry-fetch-interval-seconds: 30 | 104 | registry-fetch-interval-seconds: 30 |
| 106 | 105 | ||
| 107 | 106 | ||
| 108 | - | ||
| 109 | management: | 107 | management: |
| 110 | endpoints: | 108 | endpoints: |
| 111 | enabled-by-default: true | 109 | enabled-by-default: true |
| @@ -166,3 +164,12 @@ message-bus: | @@ -166,3 +164,12 @@ message-bus: | ||
| 166 | info: | 164 | info: |
| 167 | version: 1.0 | 165 | version: 1.0 |
| 168 | description: "消息总线-消息转发服务。[转发大数据小组消息到总线上]" | 166 | description: "消息总线-消息转发服务。[转发大数据小组消息到总线上]" |
| 167 | +feign: | ||
| 168 | + hystrix: | ||
| 169 | + enabled: false | ||
| 170 | + client: | ||
| 171 | + config: | ||
| 172 | + default: | ||
| 173 | + logger-level: FULL | ||
| 174 | + | ||
| 175 | + |
| @@ -6,7 +6,7 @@ | @@ -6,7 +6,7 @@ | ||
| 6 | <packaging>jar</packaging> | 6 | <packaging>jar</packaging> |
| 7 | <groupId>com.tianbo</groupId> | 7 | <groupId>com.tianbo</groupId> |
| 8 | <artifactId>messagebus-trans-message</artifactId> | 8 | <artifactId>messagebus-trans-message</artifactId> |
| 9 | - <version>1.0-SNAPSHOT</version> | 9 | + <version>1.0-feign</version> |
| 10 | <description>消息转发服务</description> | 10 | <description>消息转发服务</description> |
| 11 | 11 | ||
| 12 | <parent> | 12 | <parent> |
| @@ -41,10 +41,29 @@ | @@ -41,10 +41,29 @@ | ||
| 41 | <scope>provided</scope> | 41 | <scope>provided</scope> |
| 42 | </dependency> | 42 | </dependency> |
| 43 | <dependency> | 43 | <dependency> |
| 44 | + <groupId>org.springframework.cloud</groupId> | ||
| 45 | + <artifactId>spring-cloud-starter-openfeign</artifactId> | ||
| 46 | + </dependency> | ||
| 47 | + <dependency> | ||
| 48 | + <groupId>org.springframework.cloud</groupId> | ||
| 49 | + <artifactId>spring-cloud-starter-netflix-hystrix</artifactId> | ||
| 50 | + </dependency> | ||
| 51 | + <dependency> | ||
| 44 | <groupId>com.alibaba</groupId> | 52 | <groupId>com.alibaba</groupId> |
| 45 | <artifactId>fastjson</artifactId> | 53 | <artifactId>fastjson</artifactId> |
| 46 | <version>${fastjson_version}</version> | 54 | <version>${fastjson_version}</version> |
| 47 | </dependency> | 55 | </dependency> |
| 56 | + <dependency> | ||
| 57 | + <groupId>org.springframework.boot</groupId> | ||
| 58 | + <artifactId>spring-boot-starter-test</artifactId> | ||
| 59 | + <scope>test</scope> | ||
| 60 | + <exclusions> | ||
| 61 | + <exclusion> | ||
| 62 | + <groupId>org.junit.vintage</groupId> | ||
| 63 | + <artifactId>junit-vintage-engine</artifactId> | ||
| 64 | + </exclusion> | ||
| 65 | + </exclusions> | ||
| 66 | + </dependency> | ||
| 48 | </dependencies> | 67 | </dependencies> |
| 49 | <dependencyManagement> | 68 | <dependencyManagement> |
| 50 | <dependencies> | 69 | <dependencies> |
| @@ -65,4 +84,4 @@ | @@ -65,4 +84,4 @@ | ||
| 65 | </plugin> | 84 | </plugin> |
| 66 | </plugins> | 85 | </plugins> |
| 67 | </build> | 86 | </build> |
| 68 | -</project> | ||
| 87 | +</project> |
| @@ -4,6 +4,7 @@ import org.springframework.boot.SpringApplication; | @@ -4,6 +4,7 @@ import org.springframework.boot.SpringApplication; | ||
| 4 | import org.springframework.boot.autoconfigure.SpringBootApplication; | 4 | import org.springframework.boot.autoconfigure.SpringBootApplication; |
| 5 | import org.springframework.boot.web.client.RestTemplateBuilder; | 5 | import org.springframework.boot.web.client.RestTemplateBuilder; |
| 6 | import org.springframework.cloud.netflix.eureka.EnableEurekaClient; | 6 | import org.springframework.cloud.netflix.eureka.EnableEurekaClient; |
| 7 | +import org.springframework.cloud.openfeign.EnableFeignClients; | ||
| 7 | import org.springframework.context.annotation.Bean; | 8 | import org.springframework.context.annotation.Bean; |
| 8 | import org.springframework.scheduling.TaskScheduler; | 9 | import org.springframework.scheduling.TaskScheduler; |
| 9 | import org.springframework.scheduling.annotation.EnableScheduling; | 10 | import org.springframework.scheduling.annotation.EnableScheduling; |
| @@ -12,6 +13,7 @@ import org.springframework.web.client.RestTemplate; | @@ -12,6 +13,7 @@ import org.springframework.web.client.RestTemplate; | ||
| 12 | 13 | ||
| 13 | @SpringBootApplication | 14 | @SpringBootApplication |
| 14 | @EnableEurekaClient | 15 | @EnableEurekaClient |
| 16 | +@EnableFeignClients | ||
| 15 | @EnableScheduling | 17 | @EnableScheduling |
| 16 | public class MessageTransApplication { | 18 | public class MessageTransApplication { |
| 17 | 19 | ||
| @@ -27,7 +29,7 @@ public class MessageTransApplication { | @@ -27,7 +29,7 @@ public class MessageTransApplication { | ||
| 27 | @Bean | 29 | @Bean |
| 28 | public TaskScheduler taskScheduler() { | 30 | public TaskScheduler taskScheduler() { |
| 29 | ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); | 31 | ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); |
| 30 | - taskScheduler.setPoolSize(3); | 32 | + taskScheduler.setPoolSize(100); |
| 31 | return taskScheduler; | 33 | return taskScheduler; |
| 32 | } | 34 | } |
| 33 | 35 |
| @@ -2,13 +2,18 @@ package com.tianbo.messagebus.controller; | @@ -2,13 +2,18 @@ package com.tianbo.messagebus.controller; | ||
| 2 | 2 | ||
| 3 | import com.alibaba.fastjson.JSON; | 3 | import com.alibaba.fastjson.JSON; |
| 4 | import com.alibaba.fastjson.JSONObject; | 4 | import com.alibaba.fastjson.JSONObject; |
| 5 | +import com.tianbo.messagebus.controller.response.ResultJson; | ||
| 6 | +import com.tianbo.messagebus.model.HEADER; | ||
| 7 | +import com.tianbo.messagebus.model.MSG; | ||
| 8 | +import com.tianbo.messagebus.model.MSGS; | ||
| 9 | +import com.tianbo.messagebus.myinterface.KafkaReciveApi; | ||
| 10 | +import com.tianbo.messagebus.myinterface.KafkaSendApi; | ||
| 5 | import com.tianbo.messagebus.service.MessageBusProcessor; | 11 | import com.tianbo.messagebus.service.MessageBusProcessor; |
| 6 | import lombok.extern.slf4j.Slf4j; | 12 | import lombok.extern.slf4j.Slf4j; |
| 7 | import org.apache.commons.lang.StringUtils; | 13 | import org.apache.commons.lang.StringUtils; |
| 8 | import org.springframework.beans.factory.annotation.Autowired; | 14 | import org.springframework.beans.factory.annotation.Autowired; |
| 9 | -import org.springframework.web.bind.annotation.PostMapping; | ||
| 10 | -import org.springframework.web.bind.annotation.RequestMapping; | ||
| 11 | -import org.springframework.web.bind.annotation.RestController; | 15 | +import org.springframework.beans.factory.annotation.Qualifier; |
| 16 | +import org.springframework.web.bind.annotation.*; | ||
| 12 | 17 | ||
| 13 | import javax.annotation.Resource; | 18 | import javax.annotation.Resource; |
| 14 | import javax.servlet.http.HttpServletRequest; | 19 | import javax.servlet.http.HttpServletRequest; |
| @@ -22,22 +27,59 @@ import java.util.Date; | @@ -22,22 +27,59 @@ import java.util.Date; | ||
| 22 | public class HeartbeatController implements Serializable { | 27 | public class HeartbeatController implements Serializable { |
| 23 | private static final long serialVersionUID = 1L; | 28 | private static final long serialVersionUID = 1L; |
| 24 | 29 | ||
| 30 | + @Autowired | ||
| 31 | + KafkaReciveApi kafkaReciveApi; | ||
| 32 | + | ||
| 33 | + @Autowired | ||
| 34 | + KafkaSendApi kafkaSendApi; | ||
| 25 | 35 | ||
| 26 | @Autowired | 36 | @Autowired |
| 27 | private HttpServletRequest request; | 37 | private HttpServletRequest request; |
| 28 | 38 | ||
| 29 | @Autowired | 39 | @Autowired |
| 30 | - MessageBusProcessor messageBusDemo; | 40 | + MessageBusProcessor messageBusProcessor; |
| 31 | 41 | ||
| 32 | @PostMapping("login") | 42 | @PostMapping("login") |
| 33 | public void login(){ | 43 | public void login(){ |
| 34 | - messageBusDemo.login(); | 44 | + messageBusProcessor.login(); |
| 35 | } | 45 | } |
| 36 | 46 | ||
| 37 | @PostMapping("getmsg") | 47 | @PostMapping("getmsg") |
| 38 | public void getmsg(){ | 48 | public void getmsg(){ |
| 39 | log.error("for test"); | 49 | log.error("for test"); |
| 40 | - messageBusDemo.getMsg(); | 50 | + messageBusProcessor.getMsg(); |
| 51 | + } | ||
| 52 | + | ||
| 53 | + @GetMapping("getOneMsg") | ||
| 54 | + public void getonemsg(@RequestParam(value = "username") String username){ | ||
| 55 | + ResultJson resultJson= kafkaReciveApi.recive(username); | ||
| 56 | + log.info("收到结果为:{}",resultJson.toString()); | ||
| 57 | + } | ||
| 58 | + | ||
| 59 | + @PostMapping("send") | ||
| 60 | + public void sendMsg(@RequestParam("msg") String msgBody){ | ||
| 61 | + MSGS msgs = new MSGS(); | ||
| 62 | + MSG msg = new MSG(); | ||
| 63 | + | ||
| 64 | + HEADER header = new HEADER(); | ||
| 65 | + header.setSNDR("TXD"); | ||
| 66 | + header.setDDTM("20210429103322081"); | ||
| 67 | + header.setSEQNO(2021042910); | ||
| 68 | + header.setTYPE("HYXX"); | ||
| 69 | + header.setSTYPE("ISLI"); | ||
| 70 | + | ||
| 71 | + | ||
| 72 | + | ||
| 73 | + msg.setHEADER(header); | ||
| 74 | + msg.setBODY(msgBody); | ||
| 75 | + | ||
| 76 | + | ||
| 77 | + | ||
| 78 | + | ||
| 79 | + msgs.setMSG(msg); | ||
| 80 | + | ||
| 81 | + ResultJson response = kafkaSendApi.send(msgs); | ||
| 82 | + log.info(response.toString()); | ||
| 41 | } | 83 | } |
| 42 | 84 | ||
| 43 | } | 85 | } |
| @@ -85,4 +85,12 @@ public class ResultJson<T> implements Serializable{ | @@ -85,4 +85,12 @@ public class ResultJson<T> implements Serializable{ | ||
| 85 | public void setJwtToken(String jwtToken) { | 85 | public void setJwtToken(String jwtToken) { |
| 86 | this.jwtToken = jwtToken; | 86 | this.jwtToken = jwtToken; |
| 87 | } | 87 | } |
| 88 | + | ||
| 89 | + @Override | ||
| 90 | + public String toString() { | ||
| 91 | + return "ResultJson{" + | ||
| 92 | + "code='" + code + '\'' + | ||
| 93 | + ", msg='" + msg + '\'' + | ||
| 94 | + '}'; | ||
| 95 | + } | ||
| 88 | } | 96 | } |
| 1 | +package com.tianbo.messagebus.model; | ||
| 2 | + | ||
| 3 | +/** | ||
| 4 | + * 消息发送头部信息 | ||
| 5 | + */ | ||
| 6 | +public class HEADER { | ||
| 7 | + /** | ||
| 8 | + * 消息唯一标识,可以是ID | ||
| 9 | + */ | ||
| 10 | + private long SEQNO; | ||
| 11 | + /** | ||
| 12 | + * 发送的消息大类,在总线系统配置好后,分配给消息生产者 | ||
| 13 | + */ | ||
| 14 | + private String TYPE; | ||
| 15 | + /** | ||
| 16 | + * 发送的消息子类,在总线系统配置好后,分配给消息生产者 | ||
| 17 | + */ | ||
| 18 | + private String STYPE; | ||
| 19 | + /** | ||
| 20 | + * 消息增删改类型(IE=insert event,UE=update event,DE=delete event) | ||
| 21 | + */ | ||
| 22 | + private String OPTYPE; | ||
| 23 | + /** | ||
| 24 | + * 发送时间(如:20210320101421/yyyyMMddHHmmss) | ||
| 25 | + */ | ||
| 26 | + private String DDTM; | ||
| 27 | + /** | ||
| 28 | + * 你的账号名称 | ||
| 29 | + */ | ||
| 30 | + private String SNDR; | ||
| 31 | + /** | ||
| 32 | + * 消息接收者 | ||
| 33 | + */ | ||
| 34 | + private String RCVR; | ||
| 35 | + | ||
| 36 | + public long getSEQNO() { | ||
| 37 | + return SEQNO; | ||
| 38 | + } | ||
| 39 | + | ||
| 40 | + public void setSEQNO(long SEQNO) { | ||
| 41 | + this.SEQNO = SEQNO; | ||
| 42 | + } | ||
| 43 | + | ||
| 44 | + public String getTYPE() { | ||
| 45 | + return TYPE; | ||
| 46 | + } | ||
| 47 | + | ||
| 48 | + public void setTYPE(String TYPE) { | ||
| 49 | + this.TYPE = TYPE; | ||
| 50 | + } | ||
| 51 | + | ||
| 52 | + public String getSTYPE() { | ||
| 53 | + return STYPE; | ||
| 54 | + } | ||
| 55 | + | ||
| 56 | + public void setSTYPE(String STYPE) { | ||
| 57 | + this.STYPE = STYPE; | ||
| 58 | + } | ||
| 59 | + | ||
| 60 | + public String getOPTYPE() { | ||
| 61 | + return OPTYPE; | ||
| 62 | + } | ||
| 63 | + | ||
| 64 | + public void setOPTYPE(String OPTYPE) { | ||
| 65 | + this.OPTYPE = OPTYPE; | ||
| 66 | + } | ||
| 67 | + | ||
| 68 | + public String getDDTM() { | ||
| 69 | + return DDTM; | ||
| 70 | + } | ||
| 71 | + | ||
| 72 | + public void setDDTM(String DDTM) { | ||
| 73 | + this.DDTM = DDTM; | ||
| 74 | + } | ||
| 75 | + | ||
| 76 | + public String getSNDR() { | ||
| 77 | + return SNDR; | ||
| 78 | + } | ||
| 79 | + | ||
| 80 | + public void setSNDR(String SNDR) { | ||
| 81 | + this.SNDR = SNDR; | ||
| 82 | + } | ||
| 83 | + | ||
| 84 | + public String getRCVR() { | ||
| 85 | + return RCVR; | ||
| 86 | + } | ||
| 87 | + | ||
| 88 | + public void setRCVR(String RCVR) { | ||
| 89 | + this.RCVR = RCVR; | ||
| 90 | + } | ||
| 91 | +} |
| 1 | +package com.tianbo.messagebus.model; | ||
| 2 | + | ||
| 3 | +public class MSG { | ||
| 4 | + /** | ||
| 5 | + * 具体消息头部信息 | ||
| 6 | + */ | ||
| 7 | + private HEADER HEADER; | ||
| 8 | + /** | ||
| 9 | + * 具体消息支持JSON字符串或者XML | ||
| 10 | + */ | ||
| 11 | + private String BODY; | ||
| 12 | + | ||
| 13 | + public HEADER getHEADER() { | ||
| 14 | + return HEADER; | ||
| 15 | + } | ||
| 16 | + | ||
| 17 | + public void setHEADER(HEADER HEADER) { | ||
| 18 | + this.HEADER = HEADER; | ||
| 19 | + } | ||
| 20 | + | ||
| 21 | + public String getBODY() { | ||
| 22 | + return BODY; | ||
| 23 | + } | ||
| 24 | + | ||
| 25 | + public void setBODY(String BODY) { | ||
| 26 | + this.BODY = BODY; | ||
| 27 | + } | ||
| 28 | + | ||
| 29 | + @Override | ||
| 30 | + public String toString() { | ||
| 31 | + return this.BODY; | ||
| 32 | + } | ||
| 33 | +} |
| 1 | +package com.tianbo.messagebus.model; | ||
| 2 | + | ||
| 3 | +import java.io.Serializable; | ||
| 4 | + | ||
| 5 | +/** | ||
| 6 | + * 消息实体类 | ||
| 7 | + */ | ||
| 8 | +public class MSGS implements Serializable { | ||
| 9 | + | ||
| 10 | + private static final long serialVersionUID = 2L; | ||
| 11 | + | ||
| 12 | + private MSG MSG; | ||
| 13 | + | ||
| 14 | + public MSG getMSG() { | ||
| 15 | + return MSG; | ||
| 16 | + } | ||
| 17 | + | ||
| 18 | + public void setMSG(MSG MSG) { | ||
| 19 | + this.MSG = MSG; | ||
| 20 | + } | ||
| 21 | +} |
| 1 | +package com.tianbo.messagebus.myinterface; | ||
| 2 | + | ||
| 3 | +import com.tianbo.messagebus.controller.response.ResultJson; | ||
| 4 | +import feign.Headers; | ||
| 5 | +import feign.Param; | ||
| 6 | +import org.springframework.cloud.openfeign.FeignClient; | ||
| 7 | +import org.springframework.web.bind.annotation.*; | ||
| 8 | + | ||
| 9 | +import java.util.List; | ||
| 10 | + | ||
| 11 | +@FeignClient(name = "kafka-server-consumer", | ||
| 12 | + fallback = KafkaReciveFallback.class ) | ||
| 13 | +public interface KafkaReciveApi { | ||
| 14 | + | ||
| 15 | + @ResponseBody | ||
| 16 | + @RequestMapping(value = "/kafka/receive",method = RequestMethod.GET) | ||
| 17 | + ResultJson<List<String>> recive(@RequestParam(value = "username",required = true) String username); | ||
| 18 | +} |
| 1 | +package com.tianbo.messagebus.myinterface; | ||
| 2 | + | ||
| 3 | + | ||
| 4 | +import com.tianbo.messagebus.controller.response.ResultJson; | ||
| 5 | +import lombok.extern.slf4j.Slf4j; | ||
| 6 | +import org.springframework.stereotype.Component; | ||
| 7 | +import org.springframework.stereotype.Service; | ||
| 8 | + | ||
| 9 | +import java.util.ArrayList; | ||
| 10 | +import java.util.List; | ||
| 11 | + | ||
| 12 | +@Slf4j | ||
| 13 | +@Service("myKafkaRecive") | ||
| 14 | +public class KafkaReciveFallback implements KafkaReciveApi { | ||
| 15 | + | ||
| 16 | + @Override | ||
| 17 | + public ResultJson<List<String>> recive(String username) { | ||
| 18 | + log.info("获取消息失败"); | ||
| 19 | + return new ResultJson<>("400","获取消息失败",new ArrayList<>()); | ||
| 20 | + } | ||
| 21 | +} |
| 1 | +package com.tianbo.messagebus.myinterface; | ||
| 2 | + | ||
| 3 | +import com.tianbo.messagebus.controller.response.ResultJson; | ||
| 4 | +import com.tianbo.messagebus.model.MSGS; | ||
| 5 | +import org.springframework.cloud.openfeign.FeignClient; | ||
| 6 | +import org.springframework.web.bind.annotation.*; | ||
| 7 | + | ||
| 8 | +import java.util.List; | ||
| 9 | + | ||
| 10 | +@FeignClient(name = "kafka-server-producer", | ||
| 11 | + fallback = KafkaSendFallback.class ) | ||
| 12 | +public interface KafkaSendApi { | ||
| 13 | + | ||
| 14 | + @ResponseBody | ||
| 15 | + @RequestMapping(value = "/kafka/send",method = RequestMethod.POST) | ||
| 16 | + ResultJson send(@RequestBody MSGS msgs); | ||
| 17 | +} |
| 1 | +package com.tianbo.messagebus.myinterface; | ||
| 2 | + | ||
| 3 | + | ||
| 4 | +import com.tianbo.messagebus.controller.response.ResultJson; | ||
| 5 | +import com.tianbo.messagebus.model.MSGS; | ||
| 6 | +import lombok.extern.slf4j.Slf4j; | ||
| 7 | +import org.springframework.stereotype.Service; | ||
| 8 | + | ||
| 9 | +import java.util.List; | ||
| 10 | + | ||
| 11 | +@Slf4j | ||
| 12 | +@Service("myKafkaSend") | ||
| 13 | +public class KafkaSendFallback implements KafkaSendApi { | ||
| 14 | + | ||
| 15 | + @Override | ||
| 16 | + public ResultJson send(MSGS msgs) { | ||
| 17 | + log.info("发送消息失败"); | ||
| 18 | + return new ResultJson<>("400","发送消息失败"); | ||
| 19 | + } | ||
| 20 | +} |
| @@ -3,7 +3,14 @@ package com.tianbo.messagebus.service; | @@ -3,7 +3,14 @@ package com.tianbo.messagebus.service; | ||
| 3 | import com.alibaba.fastjson.JSON; | 3 | import com.alibaba.fastjson.JSON; |
| 4 | import com.alibaba.fastjson.JSONArray; | 4 | import com.alibaba.fastjson.JSONArray; |
| 5 | import com.alibaba.fastjson.JSONObject; | 5 | import com.alibaba.fastjson.JSONObject; |
| 6 | +import com.tianbo.messagebus.controller.response.ResultJson; | ||
| 7 | +import com.tianbo.messagebus.model.HEADER; | ||
| 8 | +import com.tianbo.messagebus.model.MSG; | ||
| 9 | +import com.tianbo.messagebus.model.MSGS; | ||
| 10 | +import com.tianbo.messagebus.myinterface.KafkaReciveApi; | ||
| 11 | +import com.tianbo.messagebus.myinterface.KafkaSendApi; | ||
| 6 | import lombok.extern.slf4j.Slf4j; | 12 | import lombok.extern.slf4j.Slf4j; |
| 13 | +import org.springframework.beans.factory.annotation.Autowired; | ||
| 7 | import org.springframework.beans.factory.annotation.Value; | 14 | import org.springframework.beans.factory.annotation.Value; |
| 8 | import org.springframework.http.*; | 15 | import org.springframework.http.*; |
| 9 | import org.springframework.scheduling.annotation.Async; | 16 | import org.springframework.scheduling.annotation.Async; |
| @@ -13,15 +20,14 @@ import org.springframework.stereotype.Service; | @@ -13,15 +20,14 @@ import org.springframework.stereotype.Service; | ||
| 13 | import org.springframework.util.LinkedMultiValueMap; | 20 | import org.springframework.util.LinkedMultiValueMap; |
| 14 | import org.springframework.util.MultiValueMap; | 21 | import org.springframework.util.MultiValueMap; |
| 15 | import org.springframework.util.StringUtils; | 22 | import org.springframework.util.StringUtils; |
| 16 | -import org.springframework.web.client.HttpClientErrorException; | ||
| 17 | -import org.springframework.web.client.RestClientException; | ||
| 18 | import org.springframework.web.client.RestTemplate; | 23 | import org.springframework.web.client.RestTemplate; |
| 19 | 24 | ||
| 20 | import javax.annotation.Resource; | 25 | import javax.annotation.Resource; |
| 21 | import java.io.Serializable; | 26 | import java.io.Serializable; |
| 27 | +import java.util.List; | ||
| 22 | 28 | ||
| 23 | @Service | 29 | @Service |
| 24 | -@EnableAsync | 30 | +//@EnableAsync |
| 25 | @Slf4j | 31 | @Slf4j |
| 26 | public class MessageBusProcessor { | 32 | public class MessageBusProcessor { |
| 27 | 33 | ||
| @@ -74,7 +80,7 @@ public class MessageBusProcessor { | @@ -74,7 +80,7 @@ public class MessageBusProcessor { | ||
| 74 | /** | 80 | /** |
| 75 | * 失败重发请求次数 | 81 | * 失败重发请求次数 |
| 76 | */ | 82 | */ |
| 77 | - private static final int RETRY_TIMES= 3; | 83 | + private static final int RETRY_TIMES= 100; |
| 78 | 84 | ||
| 79 | /** | 85 | /** |
| 80 | * HTTP请求框架 | 86 | * HTTP请求框架 |
| @@ -82,6 +88,12 @@ public class MessageBusProcessor { | @@ -82,6 +88,12 @@ public class MessageBusProcessor { | ||
| 82 | @Resource | 88 | @Resource |
| 83 | private RestTemplate restTemplate; | 89 | private RestTemplate restTemplate; |
| 84 | 90 | ||
| 91 | + @Autowired | ||
| 92 | + KafkaReciveApi kafkaReciveApi; | ||
| 93 | + | ||
| 94 | + @Autowired | ||
| 95 | + KafkaSendApi kafkaSendApi; | ||
| 96 | + | ||
| 85 | /** | 97 | /** |
| 86 | * 发起登录,存储token | 98 | * 发起登录,存储token |
| 87 | * | 99 | * |
| @@ -192,7 +204,7 @@ public class MessageBusProcessor { | @@ -192,7 +204,7 @@ public class MessageBusProcessor { | ||
| 192 | 204 | ||
| 193 | return code; | 205 | return code; |
| 194 | } | 206 | } |
| 195 | - @Scheduled(fixedDelay = 10000) | 207 | +// @Scheduled(fixedDelay = 10000) |
| 196 | public void heartBit() { | 208 | public void heartBit() { |
| 197 | if (!StringUtils.isEmpty(TOKEN) && LOGIN_STATUS){ | 209 | if (!StringUtils.isEmpty(TOKEN) && LOGIN_STATUS){ |
| 198 | /* | 210 | /* |
| @@ -237,7 +249,7 @@ public class MessageBusProcessor { | @@ -237,7 +249,7 @@ public class MessageBusProcessor { | ||
| 237 | public Boolean sendMsg(MSG msg) { | 249 | public Boolean sendMsg(MSG msg) { |
| 238 | if (LOGIN_STATUS) { | 250 | if (LOGIN_STATUS) { |
| 239 | try{ | 251 | try{ |
| 240 | - log.info("开始转发消息:{}",msg.toString()); | 252 | + log.info("………………开始发送消息:{}………………",msg.toString()); |
| 241 | /* | 253 | /* |
| 242 | * 发起HTTP 登录请求 | 254 | * 发起HTTP 登录请求 |
| 243 | * 登录接口的请求头为application/json | 255 | * 登录接口的请求头为application/json |
| @@ -277,7 +289,7 @@ public class MessageBusProcessor { | @@ -277,7 +289,7 @@ public class MessageBusProcessor { | ||
| 277 | JSONObject resJson = JSON.parseObject(response.getBody()); | 289 | JSONObject resJson = JSON.parseObject(response.getBody()); |
| 278 | String code = resJson.getString("code"); | 290 | String code = resJson.getString("code"); |
| 279 | if ("200".equals(code)) { | 291 | if ("200".equals(code)) { |
| 280 | - log.info("消息发送成功"); | 292 | + log.info("………………消息发送成功………………"); |
| 281 | return true; | 293 | return true; |
| 282 | } | 294 | } |
| 283 | } | 295 | } |
| @@ -300,7 +312,7 @@ public class MessageBusProcessor { | @@ -300,7 +312,7 @@ public class MessageBusProcessor { | ||
| 300 | * @return | 312 | * @return |
| 301 | */ | 313 | */ |
| 302 | // @Async | 314 | // @Async |
| 303 | - @Scheduled(fixedRate = 1000) | 315 | +// @Scheduled(fixedRate = 1000) |
| 304 | public JSONArray getMsg() { | 316 | public JSONArray getMsg() { |
| 305 | if(!LOGIN_STATUS){ | 317 | if(!LOGIN_STATUS){ |
| 306 | login(); | 318 | login(); |
| @@ -326,7 +338,7 @@ public class MessageBusProcessor { | @@ -326,7 +338,7 @@ public class MessageBusProcessor { | ||
| 326 | */ | 338 | */ |
| 327 | ResponseEntity<String> response = restTemplate.postForEntity(GET_MSG_URL, request, String.class); | 339 | ResponseEntity<String> response = restTemplate.postForEntity(GET_MSG_URL, request, String.class); |
| 328 | // 输出结果 | 340 | // 输出结果 |
| 329 | - | 341 | + log.info("接口访问结果:{}",response); |
| 330 | 342 | ||
| 331 | if (response.getStatusCode().equals(HttpStatus.OK)) { | 343 | if (response.getStatusCode().equals(HttpStatus.OK)) { |
| 332 | /* | 344 | /* |
| @@ -343,7 +355,7 @@ public class MessageBusProcessor { | @@ -343,7 +355,7 @@ public class MessageBusProcessor { | ||
| 343 | 取得是大数据小组的实体,他们的msg.body的封装是以对象实体object封装的。不是json字符串。 | 355 | 取得是大数据小组的实体,他们的msg.body的封装是以对象实体object封装的。不是json字符串。 |
| 344 | */ | 356 | */ |
| 345 | String msg = data.getObject(i,String.class); | 357 | String msg = data.getObject(i,String.class); |
| 346 | - log.info("开始转发消息---{}---",msg); | 358 | + log.info("循环处理消息[{}]---{}---",i,msg); |
| 347 | JSONObject rootJson = JSON.parseObject(msg); | 359 | JSONObject rootJson = JSON.parseObject(msg); |
| 348 | JSONObject msgJson = rootJson.getJSONObject("MSG"); | 360 | JSONObject msgJson = rootJson.getJSONObject("MSG"); |
| 349 | JSONObject body = msgJson.getJSONObject("BODY"); | 361 | JSONObject body = msgJson.getJSONObject("BODY"); |
| @@ -384,178 +396,111 @@ public class MessageBusProcessor { | @@ -384,178 +396,111 @@ public class MessageBusProcessor { | ||
| 384 | } | 396 | } |
| 385 | 397 | ||
| 386 | /** | 398 | /** |
| 387 | - * 读取备份消息并消息重发 | ||
| 388 | - * @return | 399 | + * feigin从服务直接获取消息 |
| 389 | */ | 400 | */ |
| 390 | - public void reSend(MSG msg){ | ||
| 391 | - log.error("***进入重发***"); | ||
| 392 | - for (int i = 0; i < RETRY_TIMES; i++) { | ||
| 393 | - Boolean sendResult = sendMsg(msg); | ||
| 394 | - if (sendResult){ | ||
| 395 | - log.error("***重发成功***"); | ||
| 396 | - break; | 401 | + @Scheduled(fixedRate = 1000) |
| 402 | + public void getDataFromFeigin(){ | ||
| 403 | + | ||
| 404 | + log.info("1-开始执行获取任务"); | ||
| 405 | + ResultJson<List<String>> listResultJson = kafkaReciveApi.recive("HYYW"); | ||
| 406 | + log.info("2-获取结果为:{},数量为:{}",listResultJson.toString(),listResultJson.getData().size()); | ||
| 407 | + if ("200".equals(listResultJson.getCode()) && listResultJson.getData()!=null && listResultJson.getData().size()>0){ | ||
| 408 | + log.info("3-开始处理获取数据"); | ||
| 409 | + List<String> dataList = listResultJson.getData(); | ||
| 410 | + for (int i = 0; i <dataList.size() ; i++) { | ||
| 411 | + String msg = dataList.get(i); | ||
| 412 | + log.info("4-循环处理消息[{}]--->{}<---",i,msg); | ||
| 413 | + JSONObject rootJson = JSON.parseObject(msg); | ||
| 414 | + JSONObject msgJson = rootJson.getJSONObject("MSG"); | ||
| 415 | + JSONObject body = msgJson.getJSONObject("BODY"); | ||
| 416 | + | ||
| 417 | + HEADER msgHeader = msgJson.getObject("HEADER",HEADER.class); | ||
| 418 | + | ||
| 419 | + MSG transMsg= new MSG(); | ||
| 420 | + String transBody = body.toJSONString(); | ||
| 421 | + transMsg.setHEADER(msgHeader); | ||
| 422 | + transMsg.setBODY(transBody); | ||
| 423 | + | ||
| 424 | + log.info("5-开始转发消息"); | ||
| 425 | + boolean sendResult = sendMsgByFeign(transMsg); | ||
| 426 | + if(!sendResult){ | ||
| 427 | + log.error("!!!!!!消息--->{}<---转发失败!!!!!!,尝试重发",transMsg.toString()); | ||
| 428 | + //todo:消息备份或者重发? | ||
| 429 | + reTrySend(transMsg); | ||
| 430 | + } | ||
| 397 | } | 431 | } |
| 398 | } | 432 | } |
| 399 | - log.error("***已尝试重发三次,重发失败***"); | ||
| 400 | - } | ||
| 401 | - | ||
| 402 | -} | ||
| 403 | 433 | ||
| 404 | -/** | ||
| 405 | - * 消息实体类 | ||
| 406 | - */ | ||
| 407 | -class MSGS implements Serializable { | ||
| 408 | - private MSG MSG; | ||
| 409 | - | ||
| 410 | - public MSG getMSG() { | ||
| 411 | - return MSG; | ||
| 412 | - } | ||
| 413 | - | ||
| 414 | - public void setMSG(MSG MSG) { | ||
| 415 | - this.MSG = MSG; | ||
| 416 | } | 434 | } |
| 417 | -} | ||
| 418 | 435 | ||
| 419 | -class MSG { | ||
| 420 | - /** | ||
| 421 | - * 具体消息头部信息 | ||
| 422 | - */ | ||
| 423 | - private HEADER HEADER; | ||
| 424 | /** | 436 | /** |
| 425 | - * 具体消息支持JSON字符串或者XML | 437 | + * feign从服务直接发送消息 |
| 426 | */ | 438 | */ |
| 427 | - private String BODY; | 439 | + public boolean sendMsgByFeign(MSG msg){ |
| 440 | + MSGS msgs = new MSGS(); | ||
| 441 | + msg.getHEADER().setSNDR("HYYW"); | ||
| 428 | 442 | ||
| 429 | - public HEADER getHEADER() { | ||
| 430 | - return HEADER; | ||
| 431 | - } | ||
| 432 | - | ||
| 433 | - public void setHEADER(HEADER HEADER) { | ||
| 434 | - this.HEADER = HEADER; | ||
| 435 | - } | 443 | + msgs.setMSG(msg); |
| 436 | 444 | ||
| 437 | - public String getBODY() { | ||
| 438 | - return BODY; | ||
| 439 | - } | ||
| 440 | - | ||
| 441 | - public void setBODY(String BODY) { | ||
| 442 | - this.BODY = BODY; | ||
| 443 | - } | 445 | + ResultJson response = kafkaSendApi.send(msgs); |
| 444 | 446 | ||
| 445 | - @Override | ||
| 446 | - public String toString() { | ||
| 447 | - return this.BODY; | 447 | + if ("200".equals(response.getCode())){ |
| 448 | + log.info("………………6-消息发送成功{}………………",response.toString()); | ||
| 449 | + return true; | ||
| 450 | + } | ||
| 451 | + log.info("400-消息发送失败->{}",response.toString()); | ||
| 452 | + return false; | ||
| 448 | } | 453 | } |
| 449 | -} | ||
| 450 | 454 | ||
| 451 | -/** | ||
| 452 | - * 消息发送头部信息 | ||
| 453 | - */ | ||
| 454 | -class HEADER { | ||
| 455 | - /** | ||
| 456 | - * 消息唯一标识,可以是ID | ||
| 457 | - */ | ||
| 458 | - private long SEQNO; | ||
| 459 | - /** | ||
| 460 | - * 发送的消息大类,在总线系统配置好后,分配给消息生产者 | ||
| 461 | - */ | ||
| 462 | - private String TYPE; | ||
| 463 | /** | 455 | /** |
| 464 | - * 发送的消息子类,在总线系统配置好后,分配给消息生产者 | 456 | + * feign重发消息 |
| 465 | */ | 457 | */ |
| 466 | - private String STYPE; | ||
| 467 | - /** | ||
| 468 | - * 消息增删改类型(IE=insert event,UE=update event,DE=delete event) | ||
| 469 | - */ | ||
| 470 | - private String OPTYPE; | ||
| 471 | - /** | ||
| 472 | - * 发送时间(如:20210320101421/yyyyMMddHHmmss) | ||
| 473 | - */ | ||
| 474 | - private String DDTM; | ||
| 475 | - /** | ||
| 476 | - * 你的账号名称 | ||
| 477 | - */ | ||
| 478 | - private String SNDR; | 458 | + public void reTrySend(MSG msg){ |
| 459 | + log.error("***进入重发***"); | ||
| 460 | + for (int i = 0; i < RETRY_TIMES; i++) { | ||
| 461 | + Boolean sendResult = sendMsgByFeign(msg); | ||
| 462 | + if (sendResult){ | ||
| 463 | + log.error("***重发成功***"); | ||
| 464 | + break; | ||
| 465 | + } | ||
| 466 | + if (i>85){ | ||
| 467 | + log.error("***重发{}次未成功,执行重新登录尝试***",i); | ||
| 468 | + login(); | ||
| 469 | + break; | ||
| 470 | + } | ||
| 471 | + log.error("***已尝试重发>>>{}<<<次,重发失败***",i); | ||
| 472 | + } | ||
| 473 | + } | ||
| 479 | /** | 474 | /** |
| 480 | - * 消息接收者 | 475 | + * 读取备份消息并消息重发 |
| 476 | + * @return | ||
| 481 | */ | 477 | */ |
| 482 | - private String RCVR; | ||
| 483 | - | ||
| 484 | - public long getSEQNO() { | ||
| 485 | - return SEQNO; | ||
| 486 | - } | ||
| 487 | - | ||
| 488 | - public void setSEQNO(long SEQNO) { | ||
| 489 | - this.SEQNO = SEQNO; | ||
| 490 | - } | ||
| 491 | - | ||
| 492 | - public String getTYPE() { | ||
| 493 | - return TYPE; | ||
| 494 | - } | ||
| 495 | - | ||
| 496 | - public void setTYPE(String TYPE) { | ||
| 497 | - this.TYPE = TYPE; | ||
| 498 | - } | ||
| 499 | - | ||
| 500 | - public String getSTYPE() { | ||
| 501 | - return STYPE; | ||
| 502 | - } | ||
| 503 | - | ||
| 504 | - public void setSTYPE(String STYPE) { | ||
| 505 | - this.STYPE = STYPE; | ||
| 506 | - } | 478 | + public void reSend(MSG msg){ |
| 479 | + log.error("***进入重发***"); | ||
| 480 | + for (int i = 0; i < RETRY_TIMES; i++) { | ||
| 481 | + Boolean sendResult = sendMsg(msg); | ||
| 482 | + if (sendResult){ | ||
| 483 | + log.error("***重发成功***"); | ||
| 484 | + break; | ||
| 485 | + } | ||
| 486 | + if (i>85){ | ||
| 487 | + log.error("***重发{}次未成功,执行重新登录尝试***",i); | ||
| 488 | + login(); | ||
| 489 | + break; | ||
| 490 | + } | ||
| 491 | + log.error("***已尝试重发>>>{}<<<次,重发失败***",i); | ||
| 492 | + } | ||
| 507 | 493 | ||
| 508 | - public String getOPTYPE() { | ||
| 509 | - return OPTYPE; | ||
| 510 | } | 494 | } |
| 511 | 495 | ||
| 512 | - public void setOPTYPE(String OPTYPE) { | ||
| 513 | - this.OPTYPE = OPTYPE; | ||
| 514 | - } | ||
| 515 | 496 | ||
| 516 | - public String getDDTM() { | ||
| 517 | - return DDTM; | ||
| 518 | - } | ||
| 519 | 497 | ||
| 520 | - public void setDDTM(String DDTM) { | ||
| 521 | - this.DDTM = DDTM; | ||
| 522 | - } | 498 | +} |
| 523 | 499 | ||
| 524 | - public String getSNDR() { | ||
| 525 | - return SNDR; | ||
| 526 | - } | ||
| 527 | 500 | ||
| 528 | - public void setSNDR(String SNDR) { | ||
| 529 | - this.SNDR = SNDR; | ||
| 530 | - } | ||
| 531 | 501 | ||
| 532 | - public String getRCVR() { | ||
| 533 | - return RCVR; | ||
| 534 | - } | ||
| 535 | 502 | ||
| 536 | - public void setRCVR(String RCVR) { | ||
| 537 | - this.RCVR = RCVR; | ||
| 538 | - } | ||
| 539 | -} | ||
| 540 | 503 | ||
| 541 | -/** | ||
| 542 | - * 收发送接口返回结果实体类 | ||
| 543 | - */ | ||
| 544 | 504 | ||
| 545 | -class ResultJson<T> implements Serializable { | ||
| 546 | - private static final long serialVersionUID = 1L; | ||
| 547 | 505 | ||
| 548 | - /** | ||
| 549 | - * 状态码 正确为200,其他为异常 | ||
| 550 | - */ | ||
| 551 | - private String code; | ||
| 552 | - /** | ||
| 553 | - * 结果描述 | ||
| 554 | - */ | ||
| 555 | - private String msg; | ||
| 556 | 506 | ||
| 557 | - /** | ||
| 558 | - * 结果数据 | ||
| 559 | - */ | ||
| 560 | - private T data; | ||
| 561 | -} |
-
请 注册 或 登录 后发表评论