作者 朱兆平

升级为feign访问服务进行消息转发,优化

... ... @@ -171,5 +171,7 @@ feign:
config:
default:
logger-level: FULL
httpclient:
connection-timeout: 2000
... ...
... ... @@ -29,7 +29,7 @@ public class MessageTransApplication {
@Bean
public TaskScheduler taskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(100);
taskScheduler.setPoolSize(10);
return taskScheduler;
}
... ...
package com.tianbo.messagebus.myinterface;
import com.tianbo.messagebus.controller.response.ResultJson;
import feign.Headers;
import feign.Param;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.*;
import java.util.List;
@FeignClient(name = "kafka-server-consumer",
url = "http://127.0.0.1:8088/",
fallback = KafkaReciveFallback.class )
public interface KafkaReciveApi {
@ResponseBody
@RequestMapping(value = "/kafka/receive",method = RequestMethod.GET)
ResultJson<List<String>> recive(@RequestParam(value = "username",required = true) String username);
ResultJson recive(@RequestParam(value = "username",required = true) String username);
}
... ...
... ... @@ -14,7 +14,7 @@ import java.util.List;
public class KafkaReciveFallback implements KafkaReciveApi {
@Override
public ResultJson<List<String>> recive(String username) {
public ResultJson recive(String username) {
log.info("获取消息失败");
return new ResultJson<>("400","获取消息失败",new ArrayList<>());
}
... ...
... ... @@ -24,6 +24,7 @@ import org.springframework.web.client.RestTemplate;
import javax.annotation.Resource;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
@Service
... ... @@ -402,11 +403,14 @@ public class MessageBusProcessor {
public void getDataFromFeigin(){
log.info("1-开始执行获取任务");
ResultJson<List<String>> listResultJson = kafkaReciveApi.recive("HYYW");
log.info("2-获取结果为:{},数量为:{}",listResultJson.toString(),listResultJson.getData().size());
if ("200".equals(listResultJson.getCode()) && listResultJson.getData()!=null && listResultJson.getData().size()>0){
ResultJson listResultJson = kafkaReciveApi.recive("HYYW");
List<String> dataList = new ArrayList<>();
if(listResultJson.getData() instanceof List){
dataList = (List) listResultJson.getData();
}
log.info("2-获取结果为:{},数量为:{}",listResultJson.toString(),dataList.size());
if ("200".equals(listResultJson.getCode()) && listResultJson.getData()!=null && dataList.size()>0){
log.info("3-开始处理获取数据");
List<String> dataList = listResultJson.getData();
for (int i = 0; i <dataList.size() ; i++) {
String msg = dataList.get(i);
log.info("4-循环处理消息[{}]--->{}<---",i,msg);
... ...