作者 朱兆平

Merge remote-tracking branch 'origin/kafka' into kafka

... ... @@ -13,6 +13,7 @@ import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import static com.sunyo.wlpt.message.bus.service.common.Constant.EXIST_QUEUE_INFO;
... ... @@ -110,10 +111,11 @@ public class BusQueueController {
* @return {@link ResultJson}
*/
@PutMapping("/update")
public ResultJson updateBusQueue(@RequestBody BusQueue busQueue)
public ResultJson updateBusQueue(@RequestBody BusQueue busQueue)throws ExecutionException, InterruptedException
{
// 先校验队列名称
String message = validateBusQueue(busQueue);
kafkaService.ediPartition(busQueue);
return message == null
? busQueueService.updateByPrimaryKeySelective(busQueue) > 0
? new ResultJson<>("200", "编辑MQ消息队列信息,成功")
... ...
package com.sunyo.wlpt.message.bus.service.service;
import com.sunyo.wlpt.message.bus.service.domain.BusQueue;
import com.sunyo.wlpt.message.bus.service.model.ConsumerGroupOffsets;
import java.util.List;
import java.util.concurrent.ExecutionException;
public interface KafkaService {
... ... @@ -15,6 +17,8 @@ public interface KafkaService {
boolean addTopic(String TopicName,int partitionNum);
public void updateAdminclient();
//by xyh
boolean ediPartition(BusQueue record)throws ExecutionException, InterruptedException;
/**
* topic信息消费状况监控监控
... ...
package com.sunyo.wlpt.message.bus.service.service.kafka;
import com.sunyo.wlpt.message.bus.service.domain.BusQueue;
import com.sunyo.wlpt.message.bus.service.domain.BusServer;
import com.sunyo.wlpt.message.bus.service.mapper.BusServerMapper;
import com.sunyo.wlpt.message.bus.service.mapper.ConsumerGroupMapper;
... ... @@ -86,6 +87,17 @@ public class KafkaServiceImp implements KafkaService {
KafkaAdmin admin = new KafkaAdmin(configs);
KAFKA_ADMIN_CLIENT = AdminClient.create(admin.getConfig());
}
@Override
public boolean ediPartition(BusQueue record) throws ExecutionException, InterruptedException{
Map<String, NewPartitions> newPartitions = new HashMap<>();
// 将MyTopic的Partition数量调整为2
newPartitions.put(record.getQueueName(), NewPartitions.increaseTo(record.getPartitionCount()));
CreatePartitionsResult result = KAFKA_ADMIN_CLIENT.createPartitions(newPartitions);
System.out.println(result.all().get());
return true;
}
/**
* 获取topicList
*/
... ...