package com.tianbo.messagebus.config; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import java.util.HashMap; import java.util.Map; @Configuration @EnableKafka public class KafkaConsumerConfig { @Value("${kafka.bootstrap-servers}") private String servers; // @Value("${kafka.producer.enable-auto-commit}") // private boolean enableAutoCommit; // @Value("${kafka.consumer.session-timeout}") // private String sessionTimeout; // @Value("${kafka.producer.auto-commit-interval}") // private String autoCommitInterval; //@Value("${kafka.producer.auto-offset-reset}") // private String autoOffsetReset; // @Value("${kafka.consumer.concurrency}") // private int concurrency; //private static Map<String, Object> propsMap; @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(10); factory.setBatchListener(true); factory.getContainerProperties().setPollTimeout(1500); //配置手动提交offset return factory; } public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } public Map<String, Object> consumerConfigs() { //if(propsMap==null){ Map<String, Object> propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); //propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); propsMap.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,1048576*5); propsMap.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG,60000); propsMap.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG,50); propsMap.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG,400); propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 60); propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 120*1000); //} return propsMap; } // @Bean // public Listener listener() { // return new Listener(); // } }