作者 朱兆平

路由消费者接口增加,优化其他业务一些参数

... ... @@ -10,7 +10,7 @@
</parent>
<groupId>com.sunyo.wlpt.message.bus.service</groupId>
<artifactId>message_bus_service</artifactId>
<version>1.1.0-kafka</version>
<version>1.1.2-kafka</version>
<name>message_bus_service</name>
<description>消息总线服务</description>
... ...
... ... @@ -8,6 +8,7 @@ import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchAu
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.transaction.annotation.EnableTransactionManagement;
... ... @@ -23,7 +24,6 @@ import org.springframework.transaction.annotation.EnableTransactionManagement;
@EnableTransactionManagement
@EnableScheduling
@EnableAsync
public class MessageBusServiceApplication {
public static void main(String[] args) {
... ...
... ... @@ -4,10 +4,12 @@ import com.sunyo.wlpt.message.bus.service.domain.BusServer;
import com.sunyo.wlpt.message.bus.service.mapper.BusServerMapper;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter;
import javax.annotation.Resource;
import java.util.HashMap;
... ... @@ -17,12 +19,12 @@ import java.util.Properties;
import java.util.stream.Collectors;
@Configuration
public class KafkaInitialConfiguration {
public class KafkaInitialConfiguration extends WebMvcConfigurerAdapter {
@Resource
private BusServerMapper busServerMapper;
@Bean
@Bean("myAdminclient")
public AdminClient adminClient() {
Map<String, Object> configs = new HashMap<>();
String serverMap = ServerListForMap();
... ... @@ -44,8 +46,9 @@ public class KafkaInitialConfiguration {
}
@Bean
@Bean("myConsumer")
public KafkaConsumer<String, String> consumer(){
// ConsumerConfig.MAX_POLL_RECORDS_CONFIG;
Properties props = new Properties();
props.put("bootstrap.servers", ServerListForMap());
props.put("group.id", "systemGroup");
... ...
... ... @@ -8,9 +8,6 @@ import com.sunyo.wlpt.message.bus.service.service.BusQueueService;
import com.sunyo.wlpt.message.bus.service.service.KafkaService;
import com.sunyo.wlpt.message.bus.service.service.UserInfoService;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
... ...
... ... @@ -71,13 +71,21 @@ public class RouterController {
int result=messageRouterService.addRouter(messageRouter);
return result>0?new ResultJson("200","路由消息新增成功"):new ResultJson("500","路由消息新增失败");
}
@PostMapping("/addReciverByRouter")
public ResultJson addReciverByRouter(@RequestBody MessageRouterReciver reciver) {
int result=messageRouterReciverService.addMessageRouterReciver(reciver);
return result>0?new ResultJson("200","路由消息新增成功"):new ResultJson("500","路由消息新增失败");
}
@RequestMapping("/delRouter")
public ResultJson delRouter(@RequestParam(value = "id", required = false) String id)throws Exception{
int result=messageRouterService.delRouter(id);
return result>0?new ResultJson("200","路由消息删除成功"):new ResultJson("500","路由消息删除失败");
}
@PutMapping("/ediRouter")
public ResultJson edi(@RequestBody MessageRouter messageRouter)throws Exception {
public ResultJson edi(@RequestBody MessageRouter messageRouter){
int result=messageRouterService.ediRouter(messageRouter);
return result>0?new ResultJson("200","路由消息修改成功"):new ResultJson("500","路由消息修改失败");
}
... ...
package com.sunyo.wlpt.message.bus.service.controller;
import com.github.pagehelper.PageInfo;
import com.sunyo.wlpt.message.bus.service.model.MessageRouter;
import com.sunyo.wlpt.message.bus.service.model.MessageRouterReciver;
import com.sunyo.wlpt.message.bus.service.response.ResultJson;
import com.sunyo.wlpt.message.bus.service.service.MessageRouterReciverService;
import com.sunyo.wlpt.message.bus.service.utils.IdUtils;
import io.swagger.annotations.ApiOperation;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.*;
@RequestMapping("/bus/router")
@RestController
public class RouterReceiverController {
@Autowired
MessageRouterReciverService messageRouterReciverService;
@ApiOperation(value = "添加路由接收者", notes = "添加路由接收者数组,参数必须要有:路由ID,接收者数组rcvrs")
@PostMapping("/receiver")
public ResultJson batchAddRouter(@RequestBody MessageRouter messageRouter){
boolean result = messageRouterReciverService.batchAddMessageRouterReciver(messageRouter);
return result ? new ResultJson("200","success") :new ResultJson("400","添加失败,没有订阅者或出现异常");
}
@ApiOperation(value = "删除路由接收者接口", notes = "接收路由接收者对象,根据接收者ID进行删除")
@DeleteMapping("/receiver")
public ResultJson del(@RequestBody MessageRouterReciver reciver){
boolean result = messageRouterReciverService.delReciver(reciver);
return result ? new ResultJson("200","success") :new ResultJson("400","添加失败,没有订阅者或出现异常");
}
}
... ...
package com.sunyo.wlpt.message.bus.service.service;
import com.sunyo.wlpt.message.bus.service.model.MessageRouter;
import com.sunyo.wlpt.message.bus.service.model.MessageRouterReciver;
public interface MessageRouterReciverService {
int addMessageRouterReciver(MessageRouterReciver messageRouterReciver)throws Exception;
int addMessageRouterReciver(MessageRouterReciver messageRouterReciver);
boolean delReciver(MessageRouterReciver messageRouterReciver);
boolean batchAddMessageRouterReciver(MessageRouter messageRouter);
}
... ...
... ... @@ -6,7 +6,7 @@ import com.sunyo.wlpt.message.bus.service.model.MessageRouter;
public interface MessageRouterService {
int addRouter(MessageRouter messageRouter)throws Exception;
int delRouter(String id)throws Exception;
int ediRouter(MessageRouter messageRouter)throws Exception;
int ediRouter(MessageRouter messageRouter);
int selectCountRouter(MessageRouter messageRouter);
PageInfo<MessageRouter> queRouterList(MessageRouter messageRouter,int pageNum,int pageSize)throws Exception;
}
... ...
... ... @@ -87,7 +87,7 @@ public class MessageRouterImpl implements MessageRouterService {
}
@Override
public int ediRouter(MessageRouter messageRouter) throws Exception {
public int ediRouter(MessageRouter messageRouter) {
return messageRouterMapper.updateByPrimaryKeySelective(messageRouter);
}
... ...
package com.sunyo.wlpt.message.bus.service.service.impl;
import com.sunyo.wlpt.message.bus.service.mapper.MessageRouterReciverMapper;
import com.sunyo.wlpt.message.bus.service.model.MessageRouter;
import com.sunyo.wlpt.message.bus.service.model.MessageRouterReciver;
import com.sunyo.wlpt.message.bus.service.response.ResultJson;
import com.sunyo.wlpt.message.bus.service.service.MessageRouterReciverService;
import com.sunyo.wlpt.message.bus.service.utils.IdUtils;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
@Service
public class MessageRouterReciverImpl implements MessageRouterReciverService {
@Autowired
@Resource
MessageRouterReciverMapper messageRouterReciverMapper;
@Override
public int addMessageRouterReciver(MessageRouterReciver messageRouterReciver) throws Exception {
public int addMessageRouterReciver(MessageRouterReciver messageRouterReciver) {
return messageRouterReciverMapper.insertSelective(messageRouterReciver);
}
@Override
public boolean delReciver(MessageRouterReciver messageRouterReciver) {
int i = messageRouterReciverMapper.deleteByPrimaryKey(messageRouterReciver.getId());
return i>0;
}
@Override
@Transactional(rollbackFor = Exception.class)
public boolean batchAddMessageRouterReciver(MessageRouter messageRouter) {
int i =0;
if (StringUtils.isNotBlank(messageRouter.getId())){
if (messageRouter.getRcvrs()!=null && !messageRouter.getRcvrs().isEmpty()){
// 获取订阅者列表,循环添加
for (String rcvr: messageRouter.getRcvrs()
) {
MessageRouterReciver reciver = new MessageRouterReciver();
reciver.setRcvrTopic(rcvr);
reciver.setMessageRouterId(messageRouter.getId());
reciver.setId(IdUtils.generateId());
i += messageRouterReciverMapper.insertSelective(reciver);
}
}
return i > 0;
}else {
return false;
}
}
}
... ...
... ... @@ -16,6 +16,7 @@ import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
... ... @@ -34,10 +35,12 @@ public class KafkaServiceImp implements KafkaService {
private BusServerMapper busServerMapper;
@Autowired
private AdminClient KAFKA_ADMIN_CLIENT ;
@Qualifier("myAdminclient")
AdminClient KAFKA_ADMIN_CLIENT ;
@Autowired
private KafkaConsumer KAFKA_CONSUMER;
@Qualifier("myConsumer")
KafkaConsumer KAFKA_CONSUMER;
@Resource
ConsumerGroupMapper consumerGroupMapper;
... ... @@ -160,7 +163,6 @@ public class KafkaServiceImp implements KafkaService {
props.put("enable.auto.commit", "true");
props.put("auto.offset.reset", "earliest");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
... ...