作者 朱兆平

增加清除TOPIC消息

@@ -48,7 +48,7 @@ public class KafkaInitialConfiguration { @@ -48,7 +48,7 @@ public class KafkaInitialConfiguration {
48 public KafkaConsumer<String, String> consumer(){ 48 public KafkaConsumer<String, String> consumer(){
49 Properties props = new Properties(); 49 Properties props = new Properties();
50 props.put("bootstrap.servers", ServerListForMap()); 50 props.put("bootstrap.servers", ServerListForMap());
51 - props.put("group.id", "test"); 51 + props.put("group.id", "systemGroup");
52 props.put("enable.auto.commit", "true"); 52 props.put("enable.auto.commit", "true");
53 props.put("auto.offset.reset", "earliest"); 53 props.put("auto.offset.reset", "earliest");
54 props.put("auto.commit.interval.ms", "1000"); 54 props.put("auto.commit.interval.ms", "1000");
@@ -7,6 +7,7 @@ import com.sunyo.wlpt.message.bus.service.response.ResultJson; @@ -7,6 +7,7 @@ import com.sunyo.wlpt.message.bus.service.response.ResultJson;
7 import com.sunyo.wlpt.message.bus.service.service.BusQueueService; 7 import com.sunyo.wlpt.message.bus.service.service.BusQueueService;
8 import com.sunyo.wlpt.message.bus.service.service.KafkaService; 8 import com.sunyo.wlpt.message.bus.service.service.KafkaService;
9 import com.sunyo.wlpt.message.bus.service.service.UserInfoService; 9 import com.sunyo.wlpt.message.bus.service.service.UserInfoService;
  10 +import org.apache.commons.lang.StringUtils;
10 import org.apache.kafka.clients.admin.AdminClient; 11 import org.apache.kafka.clients.admin.AdminClient;
11 import org.springframework.beans.factory.annotation.Autowired; 12 import org.springframework.beans.factory.annotation.Autowired;
12 import org.springframework.stereotype.Service; 13 import org.springframework.stereotype.Service;
@@ -199,4 +200,12 @@ public class BusQueueController { @@ -199,4 +200,12 @@ public class BusQueueController {
199 return new ResultJson<List<ConsumerGroupOffsets>>("200","success",result); 200 return new ResultJson<List<ConsumerGroupOffsets>>("200","success",result);
200 } 201 }
201 202
  203 + @PostMapping("clean")
  204 + public ResultJson clean(@RequestBody BusQueue busQueue){
  205 + if (StringUtils.isNotBlank(busQueue.getQueueName())){
  206 + boolean result = kafkaService.delTopicPartitionMessage(busQueue.getQueueName());
  207 + return result ? new ResultJson("200","success"):new ResultJson("400","faild");
  208 + }
  209 + return new ResultJson("400","缺少topic信息");
  210 + }
202 } 211 }
@@ -6,9 +6,24 @@ import java.util.List; @@ -6,9 +6,24 @@ import java.util.List;
6 6
7 public interface KafkaService { 7 public interface KafkaService {
8 8
  9 + /**
  10 + * 增加一个TOPIC
  11 + * @param TopicName
  12 + * @param partitionNum
  13 + * @return
  14 + */
9 boolean addTopic(String TopicName,int partitionNum); 15 boolean addTopic(String TopicName,int partitionNum);
10 16
11 public void updateAdminclient(); 17 public void updateAdminclient();
12 18
  19 + /**
  20 + * topic信息消费状况监控监控
  21 + * @return
  22 + */
13 List<ConsumerGroupOffsets> queueMonitor(); 23 List<ConsumerGroupOffsets> queueMonitor();
  24 +
  25 + /**
  26 + * 清空topic里面的消息
  27 + */
  28 + boolean delTopicPartitionMessage(String topicName);
14 } 29 }
@@ -4,16 +4,17 @@ package com.sunyo.wlpt.message.bus.service.service.kafka; @@ -4,16 +4,17 @@ package com.sunyo.wlpt.message.bus.service.service.kafka;
4 import com.sunyo.wlpt.message.bus.service.domain.BusServer; 4 import com.sunyo.wlpt.message.bus.service.domain.BusServer;
5 import com.sunyo.wlpt.message.bus.service.mapper.BusServerMapper; 5 import com.sunyo.wlpt.message.bus.service.mapper.BusServerMapper;
6 import com.sunyo.wlpt.message.bus.service.mapper.ConsumerGroupMapper; 6 import com.sunyo.wlpt.message.bus.service.mapper.ConsumerGroupMapper;
7 -import com.sunyo.wlpt.message.bus.service.model.ConsumerGroup;  
8 import com.sunyo.wlpt.message.bus.service.model.ConsumerGroupOffsets; 7 import com.sunyo.wlpt.message.bus.service.model.ConsumerGroupOffsets;
9 import com.sunyo.wlpt.message.bus.service.service.KafkaService; 8 import com.sunyo.wlpt.message.bus.service.service.KafkaService;
  9 +import kafka.tools.ConsoleConsumer;
10 import lombok.extern.slf4j.Slf4j; 10 import lombok.extern.slf4j.Slf4j;
11 import org.apache.kafka.clients.admin.*; 11 import org.apache.kafka.clients.admin.*;
  12 +import org.apache.kafka.clients.consumer.Consumer;
12 import org.apache.kafka.clients.consumer.KafkaConsumer; 13 import org.apache.kafka.clients.consumer.KafkaConsumer;
13 import org.apache.kafka.clients.consumer.OffsetAndMetadata; 14 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
14 import org.apache.kafka.common.KafkaFuture; 15 import org.apache.kafka.common.KafkaFuture;
  16 +import org.apache.kafka.common.PartitionInfo;
15 import org.apache.kafka.common.TopicPartition; 17 import org.apache.kafka.common.TopicPartition;
16 -import org.apache.kafka.common.TopicPartitionReplica;  
17 import org.springframework.beans.factory.annotation.Autowired; 18 import org.springframework.beans.factory.annotation.Autowired;
18 import org.springframework.kafka.core.KafkaAdmin; 19 import org.springframework.kafka.core.KafkaAdmin;
19 import org.springframework.stereotype.Service; 20 import org.springframework.stereotype.Service;
@@ -176,4 +177,47 @@ public class KafkaServiceImp implements KafkaService { @@ -176,4 +177,47 @@ public class KafkaServiceImp implements KafkaService {
176 return KAFKA_SERVERS; 177 return KAFKA_SERVERS;
177 } 178 }
178 179
  180 + @Override
  181 + public boolean delTopicPartitionMessage(String topic) {
  182 + Map<TopicPartition, RecordsToDelete> recordsToDeleteMap = new HashMap<TopicPartition, RecordsToDelete>(16);
  183 + DeleteRecordsResult deleteRecordsResult = null;
  184 + //获取topic的partition信息
  185 + Map<Integer, Long> partitionInfoMap = getPartitionsForTopic(topic);
  186 +
  187 + for (Map.Entry<Integer, Long> entry2 : partitionInfoMap.entrySet()) {
  188 + TopicPartition topicPartition = new TopicPartition(topic, (int)entry2.getKey());
  189 + RecordsToDelete recordsToDelete = RecordsToDelete.beforeOffset((long)entry2.getValue());
  190 + recordsToDeleteMap.put(topicPartition, recordsToDelete);
  191 + }
  192 + deleteRecordsResult = KAFKA_ADMIN_CLIENT.deleteRecords((Map)recordsToDeleteMap);
  193 + Map<TopicPartition, KafkaFuture<DeletedRecords>> lowWatermarks = (Map<TopicPartition, KafkaFuture<DeletedRecords>>)deleteRecordsResult.lowWatermarks();
  194 +
  195 + lowWatermarks.entrySet().forEach(entry -> {
  196 + try {
  197 + log.info("删除信息:TOPIC-{},partition-{},lastoffset-{},删除结果-{}", entry.getKey().topic(),
  198 + entry.getKey().partition(),
  199 + ((DeletedRecords)((KafkaFuture)entry.getValue()).get()).lowWatermark(),
  200 + ((KafkaFuture)entry.getValue()).isDone());
  201 + }
  202 + catch (InterruptedException | ExecutionException ex2) {
  203 + log.error(ex2.toString());
  204 + }
  205 + });
  206 +
  207 + return true;
  208 + }
  209 +
  210 + private Map<Integer, Long> getPartitionsForTopic(String topic) {
  211 + Collection<PartitionInfo> partitionInfos = (Collection<PartitionInfo>)KAFKA_CONSUMER.partitionsFor(topic);
  212 + List<TopicPartition> tp = new ArrayList<TopicPartition>();
  213 + List<TopicPartition> list = new ArrayList<>();
  214 + Map<Integer, Long> map = new HashMap<>(16);
  215 + partitionInfos.forEach(str -> {
  216 + list.add(new TopicPartition(topic, str.partition()));
  217 + KAFKA_CONSUMER.assign((Collection)list);
  218 + KAFKA_CONSUMER.seekToEnd((Collection)list);
  219 + map.put(str.partition(), KAFKA_CONSUMER.position(new TopicPartition(topic, str.partition())));
  220 + });
  221 + return map;
  222 + }
179 } 223 }