作者 xudada

Merge remote-tracking branch 'origin/kafka' into kafka

@@ -48,14 +48,12 @@ public class KafkaInitialConfiguration { @@ -48,14 +48,12 @@ public class KafkaInitialConfiguration {
48 public KafkaConsumer<String, String> consumer(){ 48 public KafkaConsumer<String, String> consumer(){
49 Properties props = new Properties(); 49 Properties props = new Properties();
50 props.put("bootstrap.servers", ServerListForMap()); 50 props.put("bootstrap.servers", ServerListForMap());
51 - props.put("group.id", "test"); 51 + props.put("group.id", "systemGroup");
52 props.put("enable.auto.commit", "true"); 52 props.put("enable.auto.commit", "true");
53 props.put("auto.offset.reset", "earliest"); 53 props.put("auto.offset.reset", "earliest");
54 props.put("auto.commit.interval.ms", "1000"); 54 props.put("auto.commit.interval.ms", "1000");
55 - props.put("auto.commit.interval.ms", "1000");  
56 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 55 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
57 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 56 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
58 - KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);  
59 - return consumer; 57 + return new KafkaConsumer<>(props);
60 } 58 }
61 } 59 }
@@ -7,6 +7,7 @@ import com.sunyo.wlpt.message.bus.service.response.ResultJson; @@ -7,6 +7,7 @@ import com.sunyo.wlpt.message.bus.service.response.ResultJson;
7 import com.sunyo.wlpt.message.bus.service.service.BusQueueService; 7 import com.sunyo.wlpt.message.bus.service.service.BusQueueService;
8 import com.sunyo.wlpt.message.bus.service.service.KafkaService; 8 import com.sunyo.wlpt.message.bus.service.service.KafkaService;
9 import com.sunyo.wlpt.message.bus.service.service.UserInfoService; 9 import com.sunyo.wlpt.message.bus.service.service.UserInfoService;
  10 +import org.apache.commons.lang.StringUtils;
10 import org.apache.kafka.clients.admin.AdminClient; 11 import org.apache.kafka.clients.admin.AdminClient;
11 import org.springframework.beans.factory.annotation.Autowired; 12 import org.springframework.beans.factory.annotation.Autowired;
12 import org.springframework.stereotype.Service; 13 import org.springframework.stereotype.Service;
@@ -201,4 +202,12 @@ public class BusQueueController { @@ -201,4 +202,12 @@ public class BusQueueController {
201 return new ResultJson<List<ConsumerGroupOffsets>>("200","success",result); 202 return new ResultJson<List<ConsumerGroupOffsets>>("200","success",result);
202 } 203 }
203 204
  205 + @PostMapping("clean")
  206 + public ResultJson clean(@RequestBody BusQueue busQueue){
  207 + if (StringUtils.isNotBlank(busQueue.getQueueName())){
  208 + boolean result = kafkaService.delTopicPartitionMessage(busQueue.getQueueName());
  209 + return result ? new ResultJson("200","success"):new ResultJson("400","faild");
  210 + }
  211 + return new ResultJson("400","缺少topic信息");
  212 + }
204 } 213 }
@@ -8,11 +8,26 @@ import java.util.concurrent.ExecutionException; @@ -8,11 +8,26 @@ import java.util.concurrent.ExecutionException;
8 8
9 public interface KafkaService { 9 public interface KafkaService {
10 10
  11 + /**
  12 + * 增加一个TOPIC
  13 + * @param TopicName
  14 + * @param partitionNum
  15 + * @return
  16 + */
11 boolean addTopic(String TopicName,int partitionNum); 17 boolean addTopic(String TopicName,int partitionNum);
12 18
13 public void updateAdminclient(); 19 public void updateAdminclient();
14 //by xyh 20 //by xyh
15 boolean ediPartition(BusQueue record)throws ExecutionException, InterruptedException; 21 boolean ediPartition(BusQueue record)throws ExecutionException, InterruptedException;
16 22
  23 + /**
  24 + * topic信息消费状况监控
  25 + * @return
  26 + */
17 List<ConsumerGroupOffsets> queueMonitor(); 27 List<ConsumerGroupOffsets> queueMonitor();
  28 +
  29 + /**
  30 + * 清空topic里面的消息
  31 + */
  32 + boolean delTopicPartitionMessage(String topicName);
18 } 33 }
@@ -5,16 +5,17 @@ import com.sunyo.wlpt.message.bus.service.domain.BusQueue; @@ -5,16 +5,17 @@ import com.sunyo.wlpt.message.bus.service.domain.BusQueue;
5 import com.sunyo.wlpt.message.bus.service.domain.BusServer; 5 import com.sunyo.wlpt.message.bus.service.domain.BusServer;
6 import com.sunyo.wlpt.message.bus.service.mapper.BusServerMapper; 6 import com.sunyo.wlpt.message.bus.service.mapper.BusServerMapper;
7 import com.sunyo.wlpt.message.bus.service.mapper.ConsumerGroupMapper; 7 import com.sunyo.wlpt.message.bus.service.mapper.ConsumerGroupMapper;
8 -import com.sunyo.wlpt.message.bus.service.model.ConsumerGroup;  
9 import com.sunyo.wlpt.message.bus.service.model.ConsumerGroupOffsets; 8 import com.sunyo.wlpt.message.bus.service.model.ConsumerGroupOffsets;
10 import com.sunyo.wlpt.message.bus.service.service.KafkaService; 9 import com.sunyo.wlpt.message.bus.service.service.KafkaService;
  10 +import kafka.tools.ConsoleConsumer;
11 import lombok.extern.slf4j.Slf4j; 11 import lombok.extern.slf4j.Slf4j;
12 import org.apache.kafka.clients.admin.*; 12 import org.apache.kafka.clients.admin.*;
  13 +import org.apache.kafka.clients.consumer.Consumer;
13 import org.apache.kafka.clients.consumer.KafkaConsumer; 14 import org.apache.kafka.clients.consumer.KafkaConsumer;
14 import org.apache.kafka.clients.consumer.OffsetAndMetadata; 15 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
15 import org.apache.kafka.common.KafkaFuture; 16 import org.apache.kafka.common.KafkaFuture;
  17 +import org.apache.kafka.common.PartitionInfo;
16 import org.apache.kafka.common.TopicPartition; 18 import org.apache.kafka.common.TopicPartition;
17 -import org.apache.kafka.common.TopicPartitionReplica;  
18 import org.springframework.beans.factory.annotation.Autowired; 19 import org.springframework.beans.factory.annotation.Autowired;
19 import org.springframework.kafka.core.KafkaAdmin; 20 import org.springframework.kafka.core.KafkaAdmin;
20 import org.springframework.stereotype.Service; 21 import org.springframework.stereotype.Service;
@@ -188,4 +189,47 @@ public class KafkaServiceImp implements KafkaService { @@ -188,4 +189,47 @@ public class KafkaServiceImp implements KafkaService {
188 return KAFKA_SERVERS; 189 return KAFKA_SERVERS;
189 } 190 }
190 191
  192 + @Override
  193 + public boolean delTopicPartitionMessage(String topic) {
  194 + Map<TopicPartition, RecordsToDelete> recordsToDeleteMap = new HashMap<TopicPartition, RecordsToDelete>(16);
  195 + DeleteRecordsResult deleteRecordsResult = null;
  196 + //获取topic的partition信息
  197 + Map<Integer, Long> partitionInfoMap = getPartitionsForTopic(topic);
  198 +
  199 + for (Map.Entry<Integer, Long> entry2 : partitionInfoMap.entrySet()) {
  200 + TopicPartition topicPartition = new TopicPartition(topic, (int)entry2.getKey());
  201 + RecordsToDelete recordsToDelete = RecordsToDelete.beforeOffset((long)entry2.getValue());
  202 + recordsToDeleteMap.put(topicPartition, recordsToDelete);
  203 + }
  204 + deleteRecordsResult = KAFKA_ADMIN_CLIENT.deleteRecords((Map)recordsToDeleteMap);
  205 + Map<TopicPartition, KafkaFuture<DeletedRecords>> lowWatermarks = (Map<TopicPartition, KafkaFuture<DeletedRecords>>)deleteRecordsResult.lowWatermarks();
  206 +
  207 + lowWatermarks.entrySet().forEach(entry -> {
  208 + try {
  209 + log.info("删除信息:TOPIC-{},partition-{},lastoffset-{},删除结果-{}", entry.getKey().topic(),
  210 + entry.getKey().partition(),
  211 + ((DeletedRecords)((KafkaFuture)entry.getValue()).get()).lowWatermark(),
  212 + ((KafkaFuture)entry.getValue()).isDone());
  213 + }
  214 + catch (InterruptedException | ExecutionException ex2) {
  215 + log.error(ex2.toString());
  216 + }
  217 + });
  218 +
  219 + return true;
  220 + }
  221 +
  222 + private Map<Integer, Long> getPartitionsForTopic(String topic) {
  223 + Collection<PartitionInfo> partitionInfos = (Collection<PartitionInfo>)KAFKA_CONSUMER.partitionsFor(topic);
  224 + List<TopicPartition> tp = new ArrayList<TopicPartition>();
  225 + List<TopicPartition> list = new ArrayList<>();
  226 + Map<Integer, Long> map = new HashMap<>(16);
  227 + partitionInfos.forEach(str -> {
  228 + list.add(new TopicPartition(topic, str.partition()));
  229 + KAFKA_CONSUMER.assign((Collection)list);
  230 + KAFKA_CONSUMER.seekToEnd((Collection)list);
  231 + map.put(str.partition(), KAFKA_CONSUMER.position(new TopicPartition(topic, str.partition())));
  232 + });
  233 + return map;
  234 + }
191 } 235 }