ConsumersCache.java
1.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package com.tianbo.messagebus.kafka;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.HashMap;
import java.util.Map;
/**
* 消费者缓存类
*/
@Slf4j
public class ConsumersCache {
public static Map<String, KafkaConsumer<String, String>> consumerMap;
public static Map<String, Boolean> consumerLock;
public static Map<String, KafkaConsumer<String, String>> getConsumerMap() {
if (consumerMap !=null){
return consumerMap;
}
log.trace("初始化消费者缓存");
consumerMap = new HashMap<String, KafkaConsumer<String, String>>();
return consumerMap;
}
public static Map<String, Boolean> getConsumerLock() {
if (consumerMap !=null){
return consumerLock;
}
log.trace("初始化消费者锁缓存");
consumerLock = new HashMap<String, Boolean>();
return consumerLock;
}
public static void lock(String key){
getConsumerLock();
consumerLock.put(key,true);
}
public static void unlock(String key){
getConsumerLock();
consumerLock.put(key,false);
}
public static Boolean getLockState(String key){
getConsumerLock();
return consumerLock.get(key);
}
}