作者 xudada

优化partition

@@ -15,6 +15,7 @@ import org.springframework.web.bind.annotation.*; @@ -15,6 +15,7 @@ import org.springframework.web.bind.annotation.*;
15 import javax.annotation.Resource; 15 import javax.annotation.Resource;
16 import java.io.IOException; 16 import java.io.IOException;
17 import java.util.List; 17 import java.util.List;
  18 +import java.util.concurrent.ExecutionException;
18 import java.util.concurrent.TimeoutException; 19 import java.util.concurrent.TimeoutException;
19 20
20 import static com.sunyo.wlpt.message.bus.service.common.Constant.EXIST_QUEUE_INFO; 21 import static com.sunyo.wlpt.message.bus.service.common.Constant.EXIST_QUEUE_INFO;
@@ -112,10 +113,11 @@ public class BusQueueController { @@ -112,10 +113,11 @@ public class BusQueueController {
112 * @return {@link ResultJson} 113 * @return {@link ResultJson}
113 */ 114 */
114 @PutMapping("/update") 115 @PutMapping("/update")
115 - public ResultJson updateBusQueue(@RequestBody BusQueue busQueue) 116 + public ResultJson updateBusQueue(@RequestBody BusQueue busQueue)throws ExecutionException, InterruptedException
116 { 117 {
117 // 先校验队列名称 118 // 先校验队列名称
118 String message = validateBusQueue(busQueue); 119 String message = validateBusQueue(busQueue);
  120 + kafkaService.ediPartition(busQueue);
119 return message == null 121 return message == null
120 ? busQueueService.updateByPrimaryKeySelective(busQueue) > 0 122 ? busQueueService.updateByPrimaryKeySelective(busQueue) > 0
121 ? new ResultJson<>("200", "编辑MQ消息队列信息,成功") 123 ? new ResultJson<>("200", "编辑MQ消息队列信息,成功")
1 package com.sunyo.wlpt.message.bus.service.service; 1 package com.sunyo.wlpt.message.bus.service.service;
2 2
  3 +import com.sunyo.wlpt.message.bus.service.domain.BusQueue;
3 import com.sunyo.wlpt.message.bus.service.model.ConsumerGroupOffsets; 4 import com.sunyo.wlpt.message.bus.service.model.ConsumerGroupOffsets;
4 5
5 import java.util.List; 6 import java.util.List;
  7 +import java.util.concurrent.ExecutionException;
6 8
7 public interface KafkaService { 9 public interface KafkaService {
8 10
9 boolean addTopic(String TopicName,int partitionNum); 11 boolean addTopic(String TopicName,int partitionNum);
10 12
11 public void updateAdminclient(); 13 public void updateAdminclient();
  14 + //by xyh
  15 + boolean ediPartition(BusQueue record)throws ExecutionException, InterruptedException;
12 16
13 List<ConsumerGroupOffsets> queueMonitor(); 17 List<ConsumerGroupOffsets> queueMonitor();
14 } 18 }
1 package com.sunyo.wlpt.message.bus.service.service.kafka; 1 package com.sunyo.wlpt.message.bus.service.service.kafka;
2 2
3 3
  4 +import com.sunyo.wlpt.message.bus.service.domain.BusQueue;
4 import com.sunyo.wlpt.message.bus.service.domain.BusServer; 5 import com.sunyo.wlpt.message.bus.service.domain.BusServer;
5 import com.sunyo.wlpt.message.bus.service.mapper.BusServerMapper; 6 import com.sunyo.wlpt.message.bus.service.mapper.BusServerMapper;
6 import com.sunyo.wlpt.message.bus.service.mapper.ConsumerGroupMapper; 7 import com.sunyo.wlpt.message.bus.service.mapper.ConsumerGroupMapper;
@@ -82,6 +83,17 @@ public class KafkaServiceImp implements KafkaService { @@ -82,6 +83,17 @@ public class KafkaServiceImp implements KafkaService {
82 KafkaAdmin admin = new KafkaAdmin(configs); 83 KafkaAdmin admin = new KafkaAdmin(configs);
83 KAFKA_ADMIN_CLIENT = AdminClient.create(admin.getConfig()); 84 KAFKA_ADMIN_CLIENT = AdminClient.create(admin.getConfig());
84 } 85 }
  86 +
  87 + @Override
  88 + public boolean ediPartition(BusQueue record) throws ExecutionException, InterruptedException{
  89 + Map<String, NewPartitions> newPartitions = new HashMap<>();
  90 + // 将MyTopic的Partition数量调整为2
  91 + newPartitions.put(record.getQueueName(), NewPartitions.increaseTo(record.getPartitionCount()));
  92 + CreatePartitionsResult result = KAFKA_ADMIN_CLIENT.createPartitions(newPartitions);
  93 + System.out.println(result.all().get());
  94 + return true;
  95 + }
  96 +
85 /** 97 /**
86 * 获取topicList 98 * 获取topicList
87 */ 99 */