作者 朱兆平

kafka消息总线路由管理消息联查

... ... @@ -6,6 +6,7 @@ import com.sunyo.wlpt.message.bus.service.domain.BusQueue;
import com.sunyo.wlpt.message.bus.service.model.MessageType;
import com.sunyo.wlpt.message.bus.service.response.ResultJson;
import com.sunyo.wlpt.message.bus.service.service.MessageTypeService;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.*;
... ... @@ -36,4 +37,12 @@ public class MessageTypesController {
PageInfo<MessageType> list = messageTypeService.pageList(messageType);
return new ResultJson("200","success",list);
}
@ApiOperation(value = "添加一个消息类", notes = "类名与消息类型必填")
@PostMapping("add")
public ResultJson add(@RequestBody MessageType messageType){
boolean result = messageTypeService.add(messageType);
return result ? new ResultJson("200","success") : new ResultJson("400","error");
}
}
... ...
... ... @@ -2,7 +2,10 @@ package com.sunyo.wlpt.message.bus.service.controller;
import com.sunyo.wlpt.message.bus.service.model.MessageRouter;
import com.sunyo.wlpt.message.bus.service.response.ResultJson;
import com.sunyo.wlpt.message.bus.service.service.RouterService;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
... ... @@ -10,16 +13,18 @@ import org.springframework.web.bind.annotation.RestController;
import java.util.List;
@RequestMapping("router")
@RequestMapping("/bus/router")
@RestController
public class RouterController {
@ApiOperation(value = "批量添加消息路由", notes = "超级管理修改其他用户密码")
@PostMapping("batchAdd")
public void batchAddRouter(@RequestBody MessageRouter messageRouter){
@Autowired
RouterService routerService;
@ApiOperation(value = "批量添加消息路由", notes = "超级管理修改其他用户密码")
@PostMapping("batchAdd")
public ResultJson batchAddRouter(@RequestBody MessageRouter messageRouter){
boolean result = routerService.batchAddRouter(messageRouter);
return result ? new ResultJson("200","success") :new ResultJson("400","error");
}
}
... ...
... ... @@ -61,6 +61,8 @@ public interface BusQueueMapper {
*/
List<BusQueue> selectByVirtualHostId(String virtualHostId);
List<BusQueue> selectTopicByUsername(String virtualHostId);
/**
* update record selective
*
... ...
... ... @@ -2,6 +2,8 @@ package com.sunyo.wlpt.message.bus.service.mapper;
import com.sunyo.wlpt.message.bus.service.model.MessageRouterReciver;
import java.util.List;
public interface MessageRouterReciverMapper {
int deleteByPrimaryKey(String id);
... ... @@ -11,6 +13,8 @@ public interface MessageRouterReciverMapper {
MessageRouterReciver selectByPrimaryKey(String id);
List<MessageRouterReciver> selectByRouterKey(String id);
int updateByPrimaryKeySelective(MessageRouterReciver record);
int updateByPrimaryKey(MessageRouterReciver record);
... ...
... ... @@ -2,6 +2,8 @@ package com.sunyo.wlpt.message.bus.service.mapper;
import com.sunyo.wlpt.message.bus.service.model.UserTopic;
import java.util.List;
public interface UserTopicMapper {
int deleteByPrimaryKey(String id);
... ...
... ... @@ -30,6 +30,8 @@ public class MessageRouter {
private Date updateTime;
private List<MessageRouterReciver> rcvrList;
private List<String> sndrs;
private List<MessageType> types;
... ... @@ -163,4 +165,12 @@ public class MessageRouter {
public void setRcvrs(List<String> rcvrs) {
this.rcvrs = rcvrs;
}
public List<MessageRouterReciver> getRcvrList() {
return rcvrList;
}
public void setRcvrList(List<MessageRouterReciver> rcvrList) {
this.rcvrList = rcvrList;
}
}
\ No newline at end of file
... ...
... ... @@ -12,4 +12,6 @@ public interface MessageTypeService {
List<MessageType> list(MessageType messageType);
boolean add(MessageType messageType);
}
... ...
... ... @@ -6,6 +6,7 @@ import com.sunyo.wlpt.message.bus.service.domain.BusQueue;
import com.sunyo.wlpt.message.bus.service.mapper.MessageTypeMapper;
import com.sunyo.wlpt.message.bus.service.model.MessageType;
import com.sunyo.wlpt.message.bus.service.service.MessageTypeService;
import com.sunyo.wlpt.message.bus.service.utils.IdUtils;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
... ... @@ -30,4 +31,12 @@ public class MessageTypeServiceImp implements MessageTypeService {
PageInfo<MessageType> pageInfo = new PageInfo<>(list);
return pageInfo;
}
@Override
public boolean add(MessageType messageType) {
String id = IdUtils.generateId();
messageType.setId(id);
int i = messageTypeMapper.insertSelective(messageType);
return i > 0;
}
}
... ...
package com.sunyo.wlpt.message.bus.service.service.impl;
import com.sunyo.wlpt.message.bus.service.domain.BusQueue;
import com.sunyo.wlpt.message.bus.service.mapper.BusQueueMapper;
import com.sunyo.wlpt.message.bus.service.mapper.MessageRouterMapper;
import com.sunyo.wlpt.message.bus.service.mapper.MessageRouterReciverMapper;
import com.sunyo.wlpt.message.bus.service.mapper.UserTopicMapper;
import com.sunyo.wlpt.message.bus.service.model.MessageRouter;
import com.sunyo.wlpt.message.bus.service.model.MessageRouterReciver;
import com.sunyo.wlpt.message.bus.service.model.MessageType;
import com.sunyo.wlpt.message.bus.service.model.UserTopic;
import com.sunyo.wlpt.message.bus.service.service.RouterService;
import com.sunyo.wlpt.message.bus.service.utils.IdUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.List;
@Service
public class RouterServiceImp implements RouterService {
@Resource
MessageRouterMapper messageRouterMapper;
@Resource
MessageRouterReciverMapper routerReciverMapper;
@Resource
BusQueueMapper busQueueMapper;
@Transactional(rollbackFor = Exception.class)
@Override
public boolean batchAddRouter(MessageRouter record) {
if (record.getSndrs()!=null && record.getTypes()!=null && record.getRcvrs()!=null){
List<String> senders = record.getSndrs();
if (senders!=null && !senders.isEmpty()){
/**
* 批量增加发送者表
*/
for (String sndr: senders) {
String id = IdUtils.generateId();
MessageRouter router = new MessageRouter();
router.setId(id);
router.setSndr(sndr);
router.setDes(record.getDes());
router.setVer(record.getVer());
router.setStatus(record.getStatus());
router.setUsage(record.getUsage());
List<String> senders = record.getSndrs();
if (senders!=null && !senders.isEmpty()){
/**
* 批量增加接收者表
* 批量增加发送者表
*/
List<String> receivers = record.getRcvrs();
if (receivers!=null && !receivers.isEmpty()){
for (String rcvr :receivers) {
MessageRouterReciver messageRouterReciver = new MessageRouterReciver();
messageRouterReciver.setId(id);
messageRouterReciver.setMessageRouterId(id);
messageRouterReciver.setRcvrTopic(rcvr);
for (String sndr: senders) {
String id = IdUtils.generateId();
MessageRouter router = new MessageRouter();
router.setId(id);
router.setSndr(sndr);
router.setDes(record.getDes());
router.setVer(record.getVer());
router.setStatus(record.getStatus());
router.setUsage(record.getUsage());
/**
* 消息类型处理
*/
List<MessageType> messageTypes = record.getTypes();
if (messageTypes!=null && !messageTypes.isEmpty()){
for (MessageType type: messageTypes) {
/**
* 判断消息类型是 大类 主类还是子类
*/
if (type.getType() == 2){
//大类处理
router.setBtype(type.getName());
}
if (type.getType() == 3){
//二级类处理
router.setStype(type.getName());
}
}
}
}
/**
* 路由生产者消息入库
*/
messageRouterMapper.insertSelective(router);
/**
* 批量增加接收者表
* 这里得receiver列表指的是接收者账号
* 要通过账号找到 账号所对应的队列信息 再入库
*/
List<String> receivers = record.getRcvrs();
if (receivers!=null && !receivers.isEmpty()){
for (String rcvr :receivers) {
String rcvr_id = IdUtils.generateId();
MessageRouterReciver messageRouterReciver = new MessageRouterReciver();
messageRouterReciver.setId(rcvr_id);
messageRouterReciver.setMessageRouterId(id);
/**
* 寻找账号对应的队列名称
*/
List<BusQueue> busQueues = busQueueMapper.selectTopicByUsername(rcvr);
messageRouterReciver.setRcvrTopic(busQueues.get(0).getQueueName());
/**
* 插入数据到订阅者表
*/
routerReciverMapper.insertSelective(messageRouterReciver);
}
}
}
}
return true;
}else {
return false;
}
return false;
}
}
... ...
... ... @@ -42,6 +42,32 @@
where id = #{id,jdbcType=VARCHAR}
</select>
<select id="selectTopicByUsername" resultMap="BaseResultMap" parameterType="java.lang.String" >
select
bq.id,
bq.queue_name,
bq.user_id,
bq.username,
bq.virtual_host_id,
bq.durability,
bq.auto_delete,
bq.arguments,
bq.description,
bq.gmt_create,
bq.gmt_modified,
bq.queue_type,
bq.partition_count,
bq.server_type
from user_topic ut
LEFT JOIN bus_queue bq
on
ut.bus_queue_id = bq.id
where
bq.server_type = 1
and bq.queue_type = 1
and ut.username = #{username,jdbcType=VARCHAR}
</select>
<select id="selectByVirtualHostId" parameterType="java.lang.String" resultMap="BaseResultMap">
select
<include refid="Base_Column_List"/>
... ...
... ... @@ -15,9 +15,10 @@
<result column="ver" property="ver" jdbcType="VARCHAR" />
<result column="creat_time" property="creatTime" jdbcType="TIMESTAMP" />
<result column="update_time" property="updateTime" jdbcType="TIMESTAMP" />
<collection column="id" javaType="java.util.ArrayList" ofType="com.sunyo.wlpt.message.bus.service.model.MessageRouterReciver" property="rcvrList" select="com.sunyo.wlpt.message.bus.service.mapper.selectByRouterKey" />
</resultMap>
<sql id="Base_Column_List" >
id, sndr, btype, stype, optype, msg_limit, character, status, usage, des, ver, creat_time,
id, sndr, btype, stype, optype, msg_limit, `character`, status, `usage`, des, ver, creat_time,
update_time
</sql>
<select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.String" >
... ... @@ -33,7 +34,7 @@
<insert id="insert" parameterType="com.sunyo.wlpt.message.bus.service.model.MessageRouter" >
insert into message_router (id, sndr, btype,
stype, optype, msg_limit,
character, status, usage, des,
`character`, status, `usage`, des,
ver, creat_time, update_time
)
values (#{id,jdbcType=VARCHAR}, #{sndr,jdbcType=VARCHAR}, #{btype,jdbcType=VARCHAR},
... ... @@ -64,13 +65,13 @@
msg_limit,
</if>
<if test="character != null" >
character,
`character`,
</if>
<if test="status != null" >
status,
</if>
<if test="usage != null" >
usage,
`usage`,
</if>
<if test="des != null" >
des,
... ... @@ -146,13 +147,13 @@
msg_limit = #{msgLimit,jdbcType=TINYINT},
</if>
<if test="character != null" >
character = #{character,jdbcType=VARCHAR},
`character` = #{character,jdbcType=VARCHAR},
</if>
<if test="status != null" >
status = #{status,jdbcType=BIT},
</if>
<if test="usage != null" >
usage = #{usage,jdbcType=TINYINT},
`usage` = #{usage,jdbcType=TINYINT},
</if>
<if test="des != null" >
des = #{des,jdbcType=VARCHAR},
... ... @@ -176,9 +177,9 @@
stype = #{stype,jdbcType=VARCHAR},
optype = #{optype,jdbcType=VARCHAR},
msg_limit = #{msgLimit,jdbcType=TINYINT},
character = #{character,jdbcType=VARCHAR},
`character` = #{character,jdbcType=VARCHAR},
status = #{status,jdbcType=BIT},
usage = #{usage,jdbcType=TINYINT},
`usage` = #{usage,jdbcType=TINYINT},
des = #{des,jdbcType=VARCHAR},
ver = #{ver,jdbcType=VARCHAR},
creat_time = #{creatTime,jdbcType=TIMESTAMP},
... ...
... ... @@ -17,6 +17,12 @@
from message_router_reciver
where id = #{id,jdbcType=VARCHAR}
</select>
<select id="selectByRouterKey" resultMap="BaseResultMap" parameterType="java.lang.String" >
select
<include refid="Base_Column_List" />
from message_router_reciver
where message_router_id = #{messageRouterId,jdbcType=VARCHAR}
</select>
<delete id="deleteByPrimaryKey" parameterType="java.lang.String" >
delete from message_router_reciver
where id = #{id,jdbcType=VARCHAR}
... ...