Author: 朱兆平

Upgrade: consume messages directly from Kafka and forward them

... ... @@ -161,6 +161,13 @@ message-bus:
password: 111111
#heartbeat interval, default 10 s, in milliseconds
heartbit-interval: 10000
consumer-group-id: HYYWGroup
kafka:
bootstrap-servers: 192.168.1.73:32771
consumer:
properties:
security:
protocol: PLAINTEXT # security.protocol takes a protocol name (PLAINTEXT/SSL/...), not a host:port URL
info:
version: 1.0
description: "Message bus - message forwarding service. [Forwards the big-data team's messages onto the bus]"
... ...
... ... @@ -6,7 +6,7 @@
<packaging>jar</packaging>
<groupId>com.tianbo</groupId>
<artifactId>messagebus-trans-message</artifactId>
<version>1.0-feign</version>
<version>1.0-feign-kafka</version>
<description>消息转发服务</description>
<parent>
... ... @@ -33,6 +33,12 @@
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<!-- kafka dependency: start -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- kafka dependency: end -->
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
... ...
... ... @@ -29,7 +29,7 @@ public class MessageTransApplication {
    @Bean
    public TaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(10);
        taskScheduler.setPoolSize(3);
        return taskScheduler;
    }
... ...
package com.tianbo.messagebus.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${kafka.bootstrap-servers}")
private String servers;
// @Value("${kafka.producer.enable-auto-commit}")
// private boolean enableAutoCommit;
// @Value("${kafka.consumer.session-timeout}")
// private String sessionTimeout;
// @Value("${kafka.producer.auto-commit-interval}")
// private String autoCommitInterval;
//@Value("${kafka.producer.auto-offset-reset}")
// private String autoOffsetReset;
// @Value("${kafka.consumer.concurrency}")
// private int concurrency;
//private static Map<String, Object> propsMap;
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(10);
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(1500);
        // auto-commit is disabled in consumerConfigs(); with no AckMode set here,
        // the container's default BATCH mode commits offsets after each processed batch
        return factory;
    }
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    public Map<String, Object> consumerConfigs() {
        //if(propsMap==null){
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        // offsets are committed manually via consumer.commitSync()
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        //propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // 5 MB max per partition fetch
        propsMap.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576 * 5);
        propsMap.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 60000);
        propsMap.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 50);
        propsMap.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 400);
        // at most 20 records per poll; allow up to 10 minutes between polls
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 20);
        propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 10 * 60 * 1000);
        //}
        return propsMap;
    }
    // @Bean
    // public Listener listener() {
    //     return new Listener();
    // }
}
... ...
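Nothing in this commit consumes the listener factory yet; the actual reading happens in the hand-built consumers of KafkaReadProcessor further down. As a hedged sketch of how a batch listener bound to this factory would look (the class is hypothetical, and the topic/group names are taken from the YAML above on the assumption that the topic equals the configured username):

import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// hypothetical sketch -- not part of this commit
@Slf4j
@Component
public class DemoBatchListener {

    // containerFactory must match the bean name above; because the factory sets
    // setBatchListener(true), the payload arrives as the whole polled batch
    @KafkaListener(topics = "HYYW", groupId = "HYYWGroup",
            containerFactory = "kafkaListenerContainerFactory")
    public void onMessages(List<String> records) {
        log.info("received {} records in one batch", records.size());
    }
}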
package com.tianbo.messagebus.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Consumer cache: one KafkaConsumer per consumer name, reused across scheduled runs.
 * KafkaConsumer itself is not thread-safe, so each cached instance must only ever
 * be used from one thread at a time.
 */
@Slf4j
public class ConsumersCache {

    public static Map<String, KafkaConsumer<String, String>> consumerMap;

    public static synchronized Map<String, KafkaConsumer<String, String>> getConsumerMap() {
        if (consumerMap != null) {
            return consumerMap;
        }
        log.trace("initializing the consumer cache");
        // ConcurrentHashMap so concurrent scheduler threads cannot corrupt the map
        consumerMap = new ConcurrentHashMap<>();
        return consumerMap;
    }
}
... ...
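A side note on usage: the get-or-create logic in KafkaReadProcessor below can be expressed in one step against this cache with computeIfAbsent; a minimal sketch, assuming props is the map returned by KafkaConsumerConfig#consumerConfigs() (the helper class is hypothetical):

import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public final class ConsumerLookup {

    // returns the cached consumer for this name, creating and caching it on first use
    public static KafkaConsumer<String, String> getOrCreate(String consumerName,
                                                            Map<String, Object> props) {
        return ConsumersCache.getConsumerMap().computeIfAbsent(consumerName, name -> {
            props.put(ConsumerConfig.CLIENT_ID_CONFIG, name);
            return new KafkaConsumer<>(props);
        });
    }
}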
... ... @@ -8,6 +8,7 @@ import org.springframework.web.bind.annotation.*;
import java.util.List;
@FeignClient(name = "kafka-server-producer",
url = "http://127.0.0.1:8080/",
fallback = KafkaSendFallback.class )
public interface KafkaSendApi {
... ...
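The hunk above shows only the annotation change that adds the fallback. Judging from KafkaSendFallback and KafkaReadProcessor.sendmsg below, the interface presumably declares a send method roughly like this sketch; the mapping path is an assumption, since it is not shown in the diff (org.springframework.web.bind.annotation.* is already imported):

    // presumed shape of the producer endpoint -- the path "/send" is hypothetical
    @PostMapping("/send")
    ResultJson send(@RequestBody MSGS msgs);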
... ... @@ -6,7 +6,6 @@ import com.tianbo.messagebus.model.MSGS;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.List;
@Slf4j
@Service("myKafkaSend")
... ... @@ -14,7 +13,6 @@ public class KafkaSendFallback implements KafkaSendApi {
    @Override
    public ResultJson send(MSGS msgs) {
        log.info("message send failed");
        return new ResultJson<>("400", "message send failed");
        return new ResultJson<>("10400", "message send failed");
    }
}
... ...
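Note: under Spring Cloud Netflix, a Feign fallback like this is only invoked when Hystrix support is enabled for Feign (feign.hystrix.enabled=true); whether this project enables it is not visible in the diff. Without it, a failed call simply throws, which the try/catch blocks in the callers below absorb.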
package com.tianbo.messagebus.service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.tianbo.messagebus.config.KafkaConsumerConfig;
import com.tianbo.messagebus.controller.response.ResultJson;
import com.tianbo.messagebus.kafka.ConsumersCache;
import com.tianbo.messagebus.model.HEADER;
import com.tianbo.messagebus.model.MSG;
import com.tianbo.messagebus.model.MSGS;
import com.tianbo.messagebus.myinterface.KafkaSendApi;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
@Service
@Slf4j
public class KafkaReadProcessor {
    /**
     * Account name; also used as the topic name.
     */
    @Value("${message-bus.auth.username}")
    private String userName;
@Value("${message-bus.consumer-group-id}")
private String groupName;
@Autowired
KafkaSendApi kafkaSendApi;
@Value("${kafka.bootstrap-servers}")
private String servers;
    /**
     * Number of retries after a failed send.
     */
    private static final int RETRY_TIMES = 10;
    @Scheduled(fixedRate = 6000)
    public void msgProcess(){
        try {
            if (StringUtils.isNotEmpty(userName) && StringUtils.isNotEmpty(groupName)){
                log.info("1.[START] reading topic [{}] with group [{}]->", userName, groupName);
                // consumerConfigs() is invoked on a manually constructed config object, outside
                // Spring, so its @Value field is empty; the bootstrap servers are re-set below
                // from this bean's own @Value field.
                Map<String, Object> map = new KafkaConsumerConfig().consumerConfigs();
                map.put(ConsumerConfig.GROUP_ID_CONFIG, groupName);
                map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
                log.info("----2. consumer group: {}----", groupName);
                // create and cache one consumer per partition (the topic is assumed to have three)
                for (int i = 1; i <= 3; i++) {
                    KafkaConsumer<String, String> consumer;
                    String consumerName = userName + "-" + i;
                    if (ConsumersCache.getConsumerMap().containsKey(consumerName)){
                        consumer = ConsumersCache.consumerMap.get(consumerName);
                        log.info("[loop-start]3. found consumer {} in the cache [{}].", consumerName, consumer);
                    } else {
                        map.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerName);
                        consumer = new KafkaConsumer<String, String>(map);
                        ConsumersCache.consumerMap.put(consumerName, consumer);
                        log.info("3. consumer {} not cached yet, created a new one", consumerName);
                    }
                    consumer.subscribe(Arrays.asList(userName));
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(3));
                    log.info("----4. consumer {} polled {} records this round----", consumerName, records.count());
                    if (!records.isEmpty()){
                        for (ConsumerRecord<String, String> record : records) {
                            String msg = record.value();
                            log.info("[loop-start]5. processing message {}", msg);
                            MSGS msgs = transMsg(msg);
                            boolean sendResult = sendmsg(msgs);
                            if (!sendResult){
                                log.error("!!!!!! forwarding message --->{}<--- failed !!!!!!, retrying", msg);
                                //todo: back the message up, or resend? see the sketch after this class
                                reTrySend(msgs);
                            }
                        }
                        // note: this commits the whole batch even if a resend above ultimately failed
                        consumer.commitSync();
                        log.info("5. consumer {}-{} committed offsets successfully", consumerName, groupName);
                    } else {
                        log.info("----[END]5. no new data on this consumer's topic, returning----");
                    }
                }
            }
        } catch (Exception e){
            log.error("polling/forwarding failed", e);
        }
    }
    public MSGS transMsg(String msg){
        // unwrap the incoming envelope: {"MSG": {"HEADER": ..., "BODY": ...}}
        JSONObject rootJson = JSON.parseObject(msg);
        JSONObject msgJson = rootJson.getJSONObject("MSG");
        JSONObject body = msgJson.getJSONObject("BODY");
        HEADER msgHeader = msgJson.getObject("HEADER", HEADER.class);
        // stamp this service's account name as the sender
        msgHeader.setSNDR(userName);
        MSG transMsg = new MSG();
        String transBody = body.toJSONString();
        transMsg.setHEADER(msgHeader);
        transMsg.setBODY(transBody);
        MSGS msgs = new MSGS();
        msgs.setMSG(transMsg);
        return msgs;
    }
    public boolean sendmsg(MSGS msgs){
        ResultJson response = kafkaSendApi.send(msgs);
        if ("200".equals(response.getCode())){
            log.info("………………6- message sent successfully {}………………", response.toString());
            return true;
        }
        log.info("400- message send failed ->{}", response.toString());
        return false;
    }
    /**
     * Resend a message over Feign, up to RETRY_TIMES attempts.
     */
    public void reTrySend(MSGS msgs){
        log.error("*** entering resend ***");
        for (int i = 0; i < RETRY_TIMES; i++) {
            boolean sendResult = sendmsg(msgs);
            if (sendResult){
                log.info("*** resend succeeded ***");
                return;
            }
        }
        log.error("*** resend failed after >>>{}<<< attempts ***", RETRY_TIMES);
    }
}
... ...
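On the todo in msgProcess: commitSync() there commits the whole batch even when all retries failed, so such a message is lost. A hedged sketch of per-record commits that keeps a failed record eligible for re-polling (same consumer/records variables as msgProcess; assumes a reTrySendWithResult variant that reports success, plus imports of java.util.Collections, org.apache.kafka.common.TopicPartition and org.apache.kafka.clients.consumer.OffsetAndMetadata):

for (ConsumerRecord<String, String> record : records) {
    MSGS msgs = transMsg(record.value());
    // reTrySendWithResult is hypothetical: reTrySend changed to return a boolean
    boolean delivered = sendmsg(msgs) || reTrySendWithResult(msgs);
    if (!delivered) {
        // stop without committing: this record is delivered again on the next poll
        break;
    }
    // the committed offset is the next offset to read, hence offset() + 1
    consumer.commitSync(Collections.singletonMap(
            new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1)));
}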
... ... @@ -81,7 +81,7 @@ public class MessageBusProcessor {
    /**
     * Number of retries after a failed send.
     */
    private static final int RETRY_TIMES= 100;
    private static final int RETRY_TIMES= 10;
    /**
     * HTTP client framework
... ... @@ -399,42 +399,50 @@ public class MessageBusProcessor {
    /**
     * Pull messages from the service directly via Feign.
     */
    @Scheduled(fixedRate = 1000)
    // @Scheduled(fixedRate = 6000)
    public void getDataFromFeigin(){
log.info("1-开始执行获取任务");
ResultJson listResultJson = kafkaReciveApi.recive("HYYW");
List<String> dataList = new ArrayList<>();
if(listResultJson.getData() instanceof List){
dataList = (List) listResultJson.getData();
}
log.info("2-获取结果为:{},数量为:{}",listResultJson.toString(),dataList.size());
if ("200".equals(listResultJson.getCode()) && listResultJson.getData()!=null && dataList.size()>0){
log.info("3-开始处理获取数据");
for (int i = 0; i <dataList.size() ; i++) {
String msg = dataList.get(i);
log.info("4-循环处理消息[{}]--->{}<---",i,msg);
JSONObject rootJson = JSON.parseObject(msg);
JSONObject msgJson = rootJson.getJSONObject("MSG");
JSONObject body = msgJson.getJSONObject("BODY");
HEADER msgHeader = msgJson.getObject("HEADER",HEADER.class);
MSG transMsg= new MSG();
String transBody = body.toJSONString();
transMsg.setHEADER(msgHeader);
transMsg.setBODY(transBody);
log.info("5-开始转发消息");
boolean sendResult = sendMsgByFeign(transMsg);
if(!sendResult){
log.error("!!!!!!消息--->{}<---转发失败!!!!!!,尝试重发",transMsg.toString());
//todo:消息备份或者重发?
reTrySend(transMsg);
        try {
            log.info("1- starting fetch task");
            ResultJson listResultJson = kafkaReciveApi.recive("HYYW");
            List<String> dataList = new ArrayList<>();
            if (listResultJson.getData() instanceof List){
                dataList = (List<String>) listResultJson.getData();
            }
            log.info("2- fetch result: {}, count: {}", listResultJson.toString(), dataList.size());
            if ("200".equals(listResultJson.getCode()) && listResultJson.getData() != null && dataList.size() > 0){
                log.info("3- processing the fetched data");
                for (int i = 0; i < dataList.size(); i++) {
                    String msg = dataList.get(i);
                    log.info("4- processing message [{}]--->{}<---", i, msg);
                    JSONObject rootJson = JSON.parseObject(msg);
                    JSONObject msgJson = rootJson.getJSONObject("MSG");
                    JSONObject body = msgJson.getJSONObject("BODY");
                    HEADER msgHeader = msgJson.getObject("HEADER", HEADER.class);
                    MSG transMsg = new MSG();
                    String transBody = body.toJSONString();
                    transMsg.setHEADER(msgHeader);
                    transMsg.setBODY(transBody);
                    log.info("5- forwarding message");
                    boolean sendResult = sendMsgByFeign(transMsg);
                    if (!sendResult){
                        log.error("!!!!!! forwarding message --->{}<--- failed !!!!!!, retrying", transMsg.toString());
                        //todo: back the message up, or resend?
                        reTrySend(transMsg);
                    }
                }
            }
        } catch (Exception e){
            log.error("000- fetching messages failed", e);
        }
}
/**
... ...
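Net effect of this final hunk: the 1000 ms @Scheduled trigger on getDataFromFeigin is removed and survives only as a comment, so periodic pickup now apparently happens solely in KafkaReadProcessor.msgProcess, consistent with the commit message about consuming straight from Kafka; the surviving body is also wrapped in try/catch, so a malformed response no longer breaks the call.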