作者 朱兆平

多线程及线程锁

... ... @@ -164,6 +164,7 @@ message-bus:
consumer-group-id: HYYWGroup
kafka:
bootstrap-servers: 192.168.1.73:32771
max-poll-records: 60
consumer:
properties:
security:
... ...
... ... @@ -6,7 +6,7 @@
<packaging>jar</packaging>
<groupId>com.tianbo</groupId>
<artifactId>messagebus-trans-message</artifactId>
<version>1.0-feign-kafka</version>
<version>1.4-feign-kafka</version>
<description>消息转发服务</description>
<parent>
... ...
... ... @@ -3,6 +3,7 @@ package com.tianbo.messagebus;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.Bean;
... ... @@ -15,6 +16,7 @@ import org.springframework.web.client.RestTemplate;
@EnableEurekaClient
@EnableFeignClients
@EnableScheduling
@EnableCircuitBreaker
public class MessageTransApplication {
public static void main(String[] args) {
... ...
... ... @@ -63,8 +63,8 @@ public class KafkaConsumerConfig {
propsMap.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG,60000);
propsMap.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG,50);
propsMap.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG,400);
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 20);
propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 10*60*1000);
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 60);
propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 120*1000);
//}
return propsMap;
}
... ...
... ... @@ -13,6 +13,7 @@ import java.util.Map;
@Slf4j
public class ConsumersCache {
public static Map<String, KafkaConsumer<String, String>> consumerMap;
public static Map<String, Boolean> consumerLock;
public static Map<String, KafkaConsumer<String, String>> getConsumerMap() {
if (consumerMap !=null){
... ... @@ -22,4 +23,30 @@ public class ConsumersCache {
consumerMap = new HashMap<String, KafkaConsumer<String, String>>();
return consumerMap;
}
/**
 * Lazily initializes and returns the per-consumer lock map.
 *
 * Fix: the original null-check tested {@code consumerMap} (copy-paste error)
 * instead of {@code consumerLock}, so when the consumer cache was already
 * initialized but the lock map was not, this returned {@code null} and the
 * callers ({@code lock}/{@code unlock}/{@code getLockState}) threw NPE.
 *
 * The method is synchronized and the map is a ConcurrentHashMap because it is
 * shared between concurrently running @Async scheduled tasks (see the
 * msgProcess scheduler) — a plain HashMap is not safe for that use.
 *
 * @return the shared map of consumer name -> locked flag (true = consuming)
 */
public static synchronized Map<String, Boolean> getConsumerLock() {
    if (consumerLock != null){
        return consumerLock;
    }
    log.trace("初始化消费者锁缓存");
    // fully qualified to avoid touching the file's import block
    consumerLock = new java.util.concurrent.ConcurrentHashMap<String, Boolean>();
    return consumerLock;
}
/**
 * Marks the given consumer as locked (currently consuming).
 *
 * @param key consumer name used as the lock-map key
 */
public static void lock(String key){
    getConsumerLock().put(key, Boolean.TRUE);
}
/**
 * Marks the given consumer as unlocked (free to consume again).
 *
 * @param key consumer name used as the lock-map key
 */
public static void unlock(String key){
    getConsumerLock().put(key, Boolean.FALSE);
}
/**
 * Reads the lock flag for the given consumer.
 *
 * @param key consumer name used as the lock-map key
 * @return true when locked, false when unlocked, or null when the key
 *         has never been locked/unlocked (callers must handle null)
 */
public static Boolean getLockState(String key){
    Map<String, Boolean> locks = getConsumerLock();
    return locks.get(key);
}
}
... ...
package com.tianbo.messagebus.model;
import java.util.List;
/**
 * In-memory holder for messages awaiting resend.
 *
 * Fix: SEND_CACHE was declared but never initialized, so the first access
 * (e.g. ResendProcessor.resend() calling SEND_CACHE.isEmpty()) threw NPE.
 * It is now initialized eagerly with a CopyOnWriteArrayList, which supports
 * safe iteration/removeIf while other threads add failed messages.
 */
public class Cache {
    // shared pending-resend queue; thread-safe, never null
    public static List<MSGS> SEND_CACHE = new java.util.concurrent.CopyOnWriteArrayList<>();
}
... ...
package com.tianbo.messagebus.model;
import com.alibaba.fastjson.JSONObject;
public class MSG {
/**
* 具体消息头部信息
... ... @@ -8,7 +10,7 @@ public class MSG {
/**
* 具体消息支持JSON字符串或者XML
*/
private String BODY;
private Object BODY;
public HEADER getHEADER() {
return HEADER;
... ... @@ -18,16 +20,16 @@ public class MSG {
this.HEADER = HEADER;
}
public String getBODY() {
public Object getBODY() {
return BODY;
}
public void setBODY(String BODY) {
public void setBODY(Object BODY) {
this.BODY = BODY;
}
@Override
public String toString() {
return this.BODY;
return JSONObject.toJSONString(this.BODY);
}
}
... ...
package com.tianbo.messagebus.service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.tianbo.messagebus.config.KafkaConsumerConfig;
import com.tianbo.messagebus.controller.response.ResultJson;
import com.tianbo.messagebus.kafka.ConsumersCache;
import com.tianbo.messagebus.model.HEADER;
import com.tianbo.messagebus.model.MSG;
import com.tianbo.messagebus.model.MSGS;
import com.tianbo.messagebus.myinterface.KafkaSendApi;
import feign.FeignException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
... ... @@ -17,6 +15,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
... ... @@ -26,6 +26,7 @@ import java.util.Map;
@Service
@Slf4j
@EnableAsync
public class KafkaReadProcessor {
/**
... ... @@ -42,12 +43,20 @@ public class KafkaReadProcessor {
@Value("${kafka.bootstrap-servers}")
private String servers;
@Value("${kafka.max-poll-records}")
private String maxPollRecords;
/**
* 失败重发请求次数
*/
private static final int RETRY_TIMES= 10;
@Scheduled(fixedRate = 6000)
/**
* 多线程中consumer锁
*/
@Async
@Scheduled(fixedRate = 3000)
public void msgProcess(){
try{
if (StringUtils.isNotEmpty(userName) && StringUtils.isNotEmpty(groupName)){
... ... @@ -55,12 +64,25 @@ public class KafkaReadProcessor {
Map<String, Object> map=new KafkaConsumerConfig().consumerConfigs();
map.put(ConsumerConfig.GROUP_ID_CONFIG, groupName);
map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,servers);
map.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,maxPollRecords);
log.info("----2.消费者组为:{}----",groupName);
//针对三个partition创建三个消费者,并缓存
for (int i = 1; i <=3 ; i++) {
KafkaConsumer<String, String> consumer;
String consumerName = userName+"-"+i;
//消费锁判定
Map map_lock = ConsumersCache.getConsumerLock();
if (map_lock.containsKey(consumerName) && ConsumersCache.getLockState(consumerName)){
log.info("[CONSUMER-LOCK-{}] 消费状态为锁定,正在消费",consumerName);
return;
}else {
log.info("[CONSUMER-LOCK-{}] 消费状态为正常,可以消费",consumerName);
ConsumersCache.lock(consumerName);
}
if (ConsumersCache.getConsumerMap().containsKey(consumerName)){
consumer = ConsumersCache.consumerMap.get(consumerName);
log.info("[loop-start]3.从缓存中获取到消费者:{}的消费者信息[{}]。",consumerName,consumer);
... ... @@ -68,12 +90,12 @@ public class KafkaReadProcessor {
map.put(ConsumerConfig.CLIENT_ID_CONFIG,consumerName);
consumer =new KafkaConsumer<String, String>(map);
ConsumersCache.consumerMap.put(consumerName,consumer);
log.info("3.缓存中没有消费者{}的信息,创建新的消费者信息",consumerName);
log.info("[CONSUMER] 3.缓存中没有消费者{}的信息,创建新的消费者信息",consumerName);
}
consumer.subscribe(Arrays.asList(userName));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(3));
log.info("----4.消费者:{}此次成功消费数据{}条----",consumerName,records.count());
log.info("----[CONSUMER] 4.消费者:{}此次成功消费数据{}条----",consumerName,records.count());
if(!records.isEmpty()){
for (ConsumerRecord<String, String> record : records) {
... ... @@ -87,65 +109,80 @@ public class KafkaReadProcessor {
//todo:消息备份或者重发?
reTrySend(msgs);
}
}
consumer.commitSync();
log.info("5.消费者{}-{}消费提交成功",consumerName,groupName);
log.info("[CONSUMER] 消费者{}-{}消费提交成功",consumerName,groupName);
}else {
log.info("----[END]5.消费者的TOPIC没有新的消费数据即将返回----");
}
ConsumersCache.unlock(consumerName);
}
}
}catch (Exception e){
e.printStackTrace();
log.info("转发出错,{}",e.toString());
log.error("转发出错,{}",e.toString());
ConsumersCache.unlock(userName+"-"+1);
ConsumersCache.unlock(userName+"-"+2);
ConsumersCache.unlock(userName+"-"+3);
}
}
public MSGS transMsg(String msg){
JSONObject rootJson = JSON.parseObject(msg);
JSONObject msgJson = rootJson.getJSONObject("MSG");
JSONObject body = msgJson.getJSONObject("BODY");
HEADER msgHeader = msgJson.getObject("HEADER",HEADER.class);
msgHeader.setSNDR(userName);
MSG transMsg= new MSG();
String transBody = body.toJSONString();
transMsg.setHEADER(msgHeader);
transMsg.setBODY(transBody);
MSGS msgs = new MSGS();
msgs.setMSG(transMsg);
return msgs;
MSGS rootJson = JSON.parseObject(msg,MSGS.class);
rootJson.getMSG().getHEADER().setSNDR(userName);
// JSONObject msgJson = rootJson.getJSONObject("MSG");
// JSONObject body = msgJson.getJSONObject("BODY");
//
// HEADER msgHeader = msgJson.getObject("HEADER",HEADER.class);
// msgHeader.setSNDR(userName);
//
// MSG transMsg= new MSG();
// transMsg.setHEADER(msgHeader);
// transMsg.setBODY(body);
//
// MSGS msgs = new MSGS();
// msgs.setMSG(transMsg);
return rootJson;
}
public boolean sendmsg(MSGS msgs){
ResultJson response = kafkaSendApi.send(msgs);
try {
ResultJson response = kafkaSendApi.send(msgs);
if ("200".equals(response.getCode())){
log.info("………………6-消息发送成功{}………………",response.toString());
return true;
if ("200".equals(response.getCode())){
log.info("[SEND-PRODUCT]………………消息发送成功{}………………",response.toString());
return true;
}
log.info("[SEND-PRODUCT]400-消息发送失败->{}",response.toString());
}catch (FeignException ex){
log.error("[SEND-PRODUCT] 发送服务调用失败-->>{}",ex.toString());
}
log.info("400-消息发送失败->{}",response.toString());
return false;
}
/**
* feign重发消息
*/
public void reTrySend(MSGS msgs){
public boolean reTrySend(MSGS msgs) throws InterruptedException {
log.error("***进入重发***");
for (int i = 0; i < RETRY_TIMES; i++) {
boolean flag = false;
int i = 0;
while (true){
Thread.sleep(1000);
i++;
boolean sendResult = sendmsg(msgs);
if (sendResult){
log.error("***重发成功***");
log.error("[RESEND-PRODUCT]***重发成功,重发次数({})***",i);
log.info("[RESEND-PRODUCT]***重发成功,重发次数({})***",i);
flag = true;
break;
}
}
log.error("***已尝试重发>>>{}<<<次,重发失败***",RETRY_TIMES);
return flag;
}
}
... ...
package com.tianbo.messagebus.service;
import com.tianbo.messagebus.controller.response.ResultJson;
import com.tianbo.messagebus.model.Cache;
import com.tianbo.messagebus.model.MSGS;
import com.tianbo.messagebus.myinterface.KafkaSendApi;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
/**
* 重发服务
* 从内存数组队列读取报文,并发送直到发送成功为止,
* 代表这个程序不能随便重启了,重启会丢失需要重复发的数据
*/
@Service
@Slf4j
public class ResendProcessor {
private static int TIMES = 20;
@Autowired
KafkaSendApi kafkaSendApi;
public boolean sendmsg(MSGS msgs){
ResultJson response = kafkaSendApi.send(msgs);
if ("200".equals(response.getCode())){
log.info("………………重发消息发送成功{}………………",response.toString());
return true;
}
log.info("400-重发消息发送失败->{}",response.toString());
return false;
}
public void resend(){
if (!Cache.SEND_CACHE.isEmpty()){
log.info("开始从内存读取报文,发送失败的报文,待重发数量->{}",Cache.SEND_CACHE.size());
Cache.SEND_CACHE.removeIf(this::send);
}
}
private boolean send(MSGS msgs){
for (int j = 0; j < TIMES; j++) {
if(sendmsg(msgs)) {
return true;
}
}
return false;
}
}
... ...