KafkaConsumerConfig.java
3.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package com.tianbo.messagebus.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${kafka.bootstrap-servers}")
private String servers;
// @Value("${kafka.producer.enable-auto-commit}")
// private boolean enableAutoCommit;
// @Value("${kafka.consumer.session-timeout}")
// private String sessionTimeout;
// @Value("${kafka.producer.auto-commit-interval}")
// private String autoCommitInterval;
//@Value("${kafka.producer.auto-offset-reset}")
// private String autoOffsetReset;
// @Value("${kafka.consumer.concurrency}")
// private int concurrency;
//private static Map<String, Object> propsMap;
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(10);
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(1500);
//配置手动提交offset
return factory;
}
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
public Map<String, Object> consumerConfigs() {
//if(propsMap==null){
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
propsMap.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,1048576*5);
propsMap.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG,60000);
propsMap.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG,50);
propsMap.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG,400);
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 60);
propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 120*1000);
//}
return propsMap;
}
// @Bean
// public Listener listener() {
// return new Listener();
// }
}