KafkaReadProcessor.java
7.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
package com.tianbo.messagebus.service;
import com.alibaba.fastjson.JSON;
import com.tianbo.messagebus.config.KafkaConsumerConfig;
import com.tianbo.messagebus.controller.response.ResultJson;
import com.tianbo.messagebus.kafka.ConsumersCache;
import com.tianbo.messagebus.model.MSGS;
import com.tianbo.messagebus.myinterface.KafkaSendApi;
import feign.FeignException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
@Service
@Slf4j
@EnableAsync
public class KafkaReadProcessor {
/**
* 账号名/TOPIC名称
*/
@Value("${message-bus.auth.username}")
private String userName;
@Value("${message-bus.consumer-group-id}")
private String groupName;
@Autowired
KafkaSendApi kafkaSendApi;
@Value("${kafka.bootstrap-servers}")
private String servers;
@Value("${kafka.max-poll-records}")
private String maxPollRecords;
/**
* 失败重发请求次数
*/
private static final int RETRY_TIMES= 10;
/**
* 多线程中consumer锁
*/
@Async
@Scheduled(fixedRate = 3000)
public void msgProcess(){
try{
if (StringUtils.isNotEmpty(userName) && StringUtils.isNotEmpty(groupName)){
log.info("1.【开始】用[{}]组读取topic[{}]->",groupName,userName);
Map<String, Object> map=new KafkaConsumerConfig().consumerConfigs();
map.put(ConsumerConfig.GROUP_ID_CONFIG, groupName);
map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,servers);
map.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,maxPollRecords);
log.info("----2.消费者组为:{}----",groupName);
//针对三个partition创建三个消费者,并缓存
for (int i = 1; i <=3 ; i++) {
KafkaConsumer<String, String> consumer;
String consumerName = userName+"-"+i;
//消费锁判定
Map map_lock = ConsumersCache.getConsumerLock();
if (map_lock.containsKey(consumerName) && ConsumersCache.getLockState(consumerName)){
log.info("[CONSUMER-LOCK-{}] 消费状态为锁定,正在消费",consumerName);
return;
}else {
log.info("[CONSUMER-LOCK-{}] 消费状态为正常,可以消费",consumerName);
ConsumersCache.lock(consumerName);
}
if (ConsumersCache.getConsumerMap().containsKey(consumerName)){
consumer = ConsumersCache.consumerMap.get(consumerName);
log.info("[loop-start]3.从缓存中获取到消费者:{}的消费者信息[{}]。",consumerName,consumer);
}else {
map.put(ConsumerConfig.CLIENT_ID_CONFIG,consumerName);
consumer =new KafkaConsumer<String, String>(map);
ConsumersCache.consumerMap.put(consumerName,consumer);
log.info("[CONSUMER] 3.缓存中没有消费者{}的信息,创建新的消费者信息",consumerName);
}
consumer.subscribe(Arrays.asList(userName));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(3));
log.info("----[CONSUMER] 4.消费者:{}此次成功消费数据{}条----",consumerName,records.count());
if(!records.isEmpty()){
for (ConsumerRecord<String, String> record : records) {
String msg = record.value();
log.info("[loop-start]5.开始处理消息{}",msg);
MSGS msgs = transMsg(msg);
boolean sendResult = sendmsg(msgs);
if(!sendResult){
log.error("!!!!!!消息--->{}<---转发失败!!!!!!,尝试重发",msg);
//todo:消息备份或者重发?
reTrySend(msgs);
}
}
consumer.commitSync();
log.info("[CONSUMER] 消费者{}-{}消费提交成功",consumerName,groupName);
}else {
log.info("----[END]5.消费者的TOPIC没有新的消费数据即将返回----");
}
ConsumersCache.unlock(consumerName);
}
}
}catch (Exception e){
e.printStackTrace();
log.info("转发出错,{}",e.toString());
log.error("转发出错,{}",e.toString());
ConsumersCache.unlock(userName+"-"+1);
ConsumersCache.unlock(userName+"-"+2);
ConsumersCache.unlock(userName+"-"+3);
}
}
public MSGS transMsg(String msg){
MSGS rootJson = JSON.parseObject(msg,MSGS.class);
rootJson.getMSG().getHEADER().setSNDR(userName);
// JSONObject msgJson = rootJson.getJSONObject("MSG");
// JSONObject body = msgJson.getJSONObject("BODY");
//
// HEADER msgHeader = msgJson.getObject("HEADER",HEADER.class);
// msgHeader.setSNDR(userName);
//
// MSG transMsg= new MSG();
// transMsg.setHEADER(msgHeader);
// transMsg.setBODY(body);
//
// MSGS msgs = new MSGS();
// msgs.setMSG(transMsg);
return rootJson;
}
public boolean sendmsg(MSGS msgs){
try {
ResultJson response = kafkaSendApi.send(msgs);
if ("200".equals(response.getCode())){
log.info("[SEND-PRODUCT]………………消息发送成功{}………………",response.toString());
return true;
}
log.info("[SEND-PRODUCT]400-消息发送失败->{}",response.toString());
}catch (FeignException ex){
log.error("[SEND-PRODUCT] 发送服务调用失败-->>{}",ex.toString());
}
return false;
}
/**
* feign重发消息
*/
public boolean reTrySend(MSGS msgs) throws InterruptedException {
log.error("***进入重发***");
boolean flag = false;
int i = 0;
while (true){
Thread.sleep(1000);
i++;
boolean sendResult = sendmsg(msgs);
if (sendResult){
log.error("[RESEND-PRODUCT]***重发成功,重发次数({})***",i);
log.info("[RESEND-PRODUCT]***重发成功,重发次数({})***",i);
flag = true;
break;
}
}
return flag;
}
}