作者 朱兆平

将kafka的相关BEAN初始化配置,增加过滤器接口

... ... @@ -29,3 +29,6 @@ build/
### VS Code ###
.vscode/
/out/
/src/META-INF/
/docker/*.jar
... ...
... ... @@ -10,7 +10,7 @@
</parent>
<groupId>com.sunyo.wlpt.message.bus.service</groupId>
<artifactId>message_bus_service</artifactId>
<version>1.0.0</version>
<version>1.1.0-kafka</version>
<name>message_bus_service</name>
<description>消息总线服务</description>
... ... @@ -272,6 +272,23 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<id>gen-webadmin</id>
<phase>package</phase>
<configuration>
<tasks>
<copy todir="docker" file="target/${project.artifactId}-${project.version}.${project.packaging}" />
</tasks>
</configuration>
<goals>
<goal>run</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- mybatis generator 自动生成代码插件 -->
<plugin>
<groupId>org.mybatis.generator</groupId>
... ...
... ... @@ -4,6 +4,7 @@ import com.sunyo.wlpt.message.bus.service.domain.BusServer;
import com.sunyo.wlpt.message.bus.service.mapper.BusServerMapper;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;
... ... @@ -12,15 +13,16 @@ import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
//@Configuration
@Configuration
public class KafkaInitialConfiguration {
// @Resource
@Resource
private BusServerMapper busServerMapper;
// @Bean
@Bean
public AdminClient adminClient() {
Map<String, Object> configs = new HashMap<>();
String serverMap = ServerListForMap();
... ... @@ -40,4 +42,20 @@ public class KafkaInitialConfiguration {
return KAFKA_SERVERS;
}
@Bean
public KafkaConsumer<String, String> consumer(){
Properties props = new Properties();
props.put("bootstrap.servers", ServerListForMap());
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.offset.reset", "earliest");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
return consumer;
}
}
... ...
... ... @@ -67,25 +67,8 @@ public class RouterController {
if(count>0){
return new ResultJson("200","路由消息已存在!");
}
String routerid=UUID.randomUUID().toString();
String reciverid=UUID.randomUUID().toString();
messageRouter.setId(routerid);
int result=messageRouterService.addRouter(messageRouter);
//没有指定接收者只添加路由
if(messageRouter.getReciver()!=null && messageRouter.getReciver().getRcvrTopic()!=null && messageRouter.getReciver().getRcvrTopic()!=""){
MessageRouterReciver routerReciver=new MessageRouterReciver();
routerReciver.setId(reciverid);
routerReciver.setMessageRouterId(routerid);
routerReciver.setRcvrTopic(messageRouter.getReciver().getRcvrTopic());
messageRouterReciverService.addMessageRouterReciver(routerReciver);
if(messageRouter.getReciver().getReciverFilter()!=null && messageRouter.getReciver().getReciverFilter().getFilter()!=null&& messageRouter.getReciver().getReciverFilter().getFilter()!=""){
MessageRouterReciverFilter routerReciverFilter=new MessageRouterReciverFilter();
routerReciverFilter.setId(UUID.randomUUID().toString());
routerReciverFilter.setMessageRouterReciverId(reciverid);
routerReciverFilter.setFilter(messageRouter.getReciver().getReciverFilter().getFilter());
messageRouterReciverFilterService.addMessageRouterReciverFilter(routerReciverFilter);
}
}
return result>0?new ResultJson("200","路由消息新增成功"):new ResultJson("500","路由消息新增失败");
}
@RequestMapping("/delRouter")
... ...
package com.sunyo.wlpt.message.bus.service.controller;
import com.github.pagehelper.PageInfo;
import com.sunyo.wlpt.message.bus.service.model.MessageRouter;
import com.sunyo.wlpt.message.bus.service.model.MessageRouterReciverFilter;
import com.sunyo.wlpt.message.bus.service.response.ResultJson;
import com.sunyo.wlpt.message.bus.service.service.MessageRouterReciverFilterService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;
/**
* 路由接收者过滤器接口
*/
@RequestMapping("/bus")
@RestController
public class RouterRcvrFilterController {
@Autowired
MessageRouterReciverFilterService messageRouterReciverFilterService;
@PostMapping("/filter")
public ResultJson add(@RequestBody MessageRouterReciverFilter messageRouterReciverFilter){
int result=messageRouterReciverFilterService.addMessageRouterReciverFilter(messageRouterReciverFilter);
return result>0?new ResultJson("200","过滤规则新增成功"):new ResultJson("500","过滤规则新增失败");
}
@GetMapping("/filter")
public ResultJson get(@RequestParam(value = "filter", required = false) String filter,
@RequestParam(value = "pageNum",required = false,defaultValue = "1") int pageNum,
@RequestParam(value = "pageSize",required = false,defaultValue = "10") int pageSize){
PageInfo<MessageRouterReciverFilter> list = messageRouterReciverFilterService.get(filter,pageNum,pageSize);
return new ResultJson<>("200","success",list);
}
}
... ...
... ... @@ -15,6 +15,8 @@ public interface MessageRouterReciverFilterMapper {
List<MessageRouterReciverFilter> selectByFilterKey(String id);
List<MessageRouterReciverFilter> selectAllByFilterKey(String record);
int updateByPrimaryKeySelective(MessageRouterReciverFilter record);
int updateByPrimaryKey(MessageRouterReciverFilter record);
... ...
... ... @@ -16,7 +16,9 @@ public class MessageRouterReciver {
private Date creatTime;
private Date updateTime;
private MessageRouterReciverFilter reciverFilter;
private List<MessageRouterReciverFilter> filterList;
public String getId() {
... ...
... ... @@ -3,8 +3,9 @@ package com.sunyo.wlpt.message.bus.service.model;
import lombok.Data;
import java.util.Date;
@Data
public class MessageRouterReciverFilter {
public class MessageRouterReciverFilter{
private String id;
private String filter;
... ...
package com.sunyo.wlpt.message.bus.service.service;
import com.github.pagehelper.PageInfo;
import com.sunyo.wlpt.message.bus.service.model.MessageRouterReciverFilter;
public interface MessageRouterReciverFilterService {
int addMessageRouterReciverFilter(MessageRouterReciverFilter messageRouterReciverFilter);
PageInfo<MessageRouterReciverFilter> get(String filter, int pageNum, int pageSize);
}
... ...
... ... @@ -4,20 +4,81 @@ import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.sunyo.wlpt.message.bus.service.mapper.MessageRouterMapper;
import com.sunyo.wlpt.message.bus.service.model.MessageRouter;
import com.sunyo.wlpt.message.bus.service.model.MessageRouterReciver;
import com.sunyo.wlpt.message.bus.service.model.MessageRouterReciverFilter;
import com.sunyo.wlpt.message.bus.service.service.MessageRouterReciverFilterService;
import com.sunyo.wlpt.message.bus.service.service.MessageRouterReciverService;
import com.sunyo.wlpt.message.bus.service.service.MessageRouterService;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.List;
import java.util.UUID;
@Service
public class MessageRouterImpl implements MessageRouterService {
@Autowired
@Resource
MessageRouterMapper messageRouterMapper;
@Autowired
MessageRouterReciverService messageRouterReciverService;
@Autowired
MessageRouterReciverFilterService messageRouterReciverFilterService;
@Override
@Transactional(rollbackFor = Exception.class)
public int addRouter(MessageRouter messageRouter) throws Exception {
return messageRouterMapper.insertSelective(messageRouter);
String routerid=UUID.randomUUID().toString();
String reciverid=UUID.randomUUID().toString();
/**
* 先添加路由,在根据路由添加路由接收者
*/
messageRouter.setId(routerid);
messageRouterMapper.insertSelective(messageRouter);
/**
* 没有指定接收者只添加路由,单个增加接收者
*/
if(messageRouter.getReciver()!=null && messageRouter.getReciver().getRcvrTopic()!=null && messageRouter.getReciver().getRcvrTopic()!=""){
MessageRouterReciver routerReciver=new MessageRouterReciver();
routerReciver.setId(reciverid);
routerReciver.setMessageRouterId(routerid);
routerReciver.setRcvrTopic(messageRouter.getReciver().getRcvrTopic());
messageRouterReciverService.addMessageRouterReciver(routerReciver);
if(messageRouter.getReciver().getReciverFilter()!=null && messageRouter.getReciver().getReciverFilter().getFilter()!=null&& messageRouter.getReciver().getReciverFilter().getFilter()!=""){
MessageRouterReciverFilter routerReciverFilter=new MessageRouterReciverFilter();
routerReciverFilter.setId(UUID.randomUUID().toString());
routerReciverFilter.setMessageRouterReciverId(reciverid);
routerReciverFilter.setFilter(messageRouter.getReciver().getReciverFilter().getFilter());
messageRouterReciverFilterService.addMessageRouterReciverFilter(routerReciverFilter);
}
}
/**
* 一个路由增加多个接收者
*/
if(messageRouter.getRcvrList()!=null && !messageRouter.getRcvrList().isEmpty()){
for (
MessageRouterReciver routerReciver:messageRouter.getRcvrList()
) {
if (StringUtils.isNotBlank(routerReciver.getRcvrTopic())){
String batchAddRevrId =UUID.randomUUID().toString();
routerReciver.setId(batchAddRevrId);
routerReciver.setMessageRouterId(routerid);
messageRouterReciverService.addMessageRouterReciver(routerReciver);
}
}
}
return 1;
}
@Override
... ...
package com.sunyo.wlpt.message.bus.service.service.impl;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.sunyo.wlpt.message.bus.service.mapper.MessageRouterReciverFilterMapper;
import com.sunyo.wlpt.message.bus.service.model.MessageRouter;
import com.sunyo.wlpt.message.bus.service.model.MessageRouterReciverFilter;
import com.sunyo.wlpt.message.bus.service.service.MessageRouterReciverFilterService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
@Service
public class MessageRouterReciverFilterImpl implements MessageRouterReciverFilterService {
@Autowired
@Resource
MessageRouterReciverFilterMapper messageRouterReciverFilterMapper;
@Override
public int addMessageRouterReciverFilter(MessageRouterReciverFilter messageRouterReciverFilter) {
return messageRouterReciverFilterMapper.insertSelective(messageRouterReciverFilter);
}
@Override
public PageInfo<MessageRouterReciverFilter> get(String filter,int pageNum, int pageSize) {
PageHelper.startPage(pageNum,pageSize);
List<MessageRouterReciverFilter> list=messageRouterReciverFilterMapper.selectAllByFilterKey(filter);
PageInfo<MessageRouterReciverFilter> result=new PageInfo<>(list);
return result;
}
}
... ...
... ... @@ -14,6 +14,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionReplica;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
... ... @@ -31,9 +32,11 @@ public class KafkaServiceImp implements KafkaService {
@Resource
private BusServerMapper busServerMapper;
private static AdminClient KAFKA_ADMIN_CLIENT ;
@Autowired
private AdminClient KAFKA_ADMIN_CLIENT ;
private static KafkaConsumer KAFKA_CONSUMER;
@Autowired
private KafkaConsumer KAFKA_CONSUMER;
@Resource
ConsumerGroupMapper consumerGroupMapper;
... ... @@ -44,7 +47,7 @@ public class KafkaServiceImp implements KafkaService {
@Override
public boolean addTopic(String TopicName,int partitionNum){
intAdminClient();
// intAdminClient();
NewTopic newTopic = new NewTopic(TopicName, partitionNum, (short) 1);
List<NewTopic> topicList = Arrays.asList(newTopic);
KAFKA_ADMIN_CLIENT.createTopics(topicList);
... ... @@ -98,8 +101,8 @@ public class KafkaServiceImp implements KafkaService {
public List<ConsumerGroupOffsets> queueMonitor() {
List<ConsumerGroupOffsets> result = new ArrayList<ConsumerGroupOffsets>();
try{
intAdminClient();
intConsumer();
// intAdminClient();
// intConsumer();
/**
* 1. 获取consumerGroup 列表
... ...
... ... @@ -26,6 +26,17 @@
from message_router_reciver_filter
where message_router_reciver_id = #{messageRouterReciverId,jdbcType=VARCHAR}
</select>
<select id="selectAllByFilterKey" parameterType="java.lang.String" resultMap="BaseResultMap">
select
<include refid="Base_Column_List" />
from message_router_reciver_filter
<where>
<if test="filter!=null and filter!=''">
AND filter = #{filter,jdbcType=VARCHAR}
</if>
</where>
</select>
<delete id="deleteByPrimaryKey" parameterType="java.lang.String">
delete from message_router_reciver_filter
where id = #{id,jdbcType=VARCHAR}
... ...