作者 朱兆平

kafka消息总线路由管理

1 package com.sunyo.wlpt.message.bus.service.controller; 1 package com.sunyo.wlpt.message.bus.service.controller;
2 2
3 3
  4 +import com.sunyo.wlpt.message.bus.service.model.MessageRouter;
4 import io.swagger.annotations.ApiOperation; 5 import io.swagger.annotations.ApiOperation;
5 import org.springframework.web.bind.annotation.PostMapping; 6 import org.springframework.web.bind.annotation.PostMapping;
  7 +import org.springframework.web.bind.annotation.RequestBody;
6 import org.springframework.web.bind.annotation.RequestMapping; 8 import org.springframework.web.bind.annotation.RequestMapping;
7 import org.springframework.web.bind.annotation.RestController; 9 import org.springframework.web.bind.annotation.RestController;
8 10
  11 +import java.util.List;
  12 +
9 @RequestMapping("router") 13 @RequestMapping("router")
10 @RestController 14 @RestController
11 public class RouterController { 15 public class RouterController {
12 16
13 @ApiOperation(value = "批量添加消息路由", notes = "超级管理修改其他用户密码") 17 @ApiOperation(value = "批量添加消息路由", notes = "超级管理修改其他用户密码")
14 - @PostMapping  
15 - public void batchAddRouter(){ 18 + @PostMapping("batchAdd")
  19 + public void batchAddRouter(@RequestBody MessageRouter messageRouter){
  20 +
16 21
17 22
18 23
1 package com.sunyo.wlpt.message.bus.service.model; 1 package com.sunyo.wlpt.message.bus.service.model;
2 2
3 import java.util.Date; 3 import java.util.Date;
  4 +import java.util.List;
4 5
5 public class MessageRouter { 6 public class MessageRouter {
6 private String id; 7 private String id;
@@ -19,7 +20,7 @@ public class MessageRouter { @@ -19,7 +20,7 @@ public class MessageRouter {
19 20
20 private Boolean status; 21 private Boolean status;
21 22
22 - private Boolean usage; 23 + private Integer usage;
23 24
24 private String des; 25 private String des;
25 26
@@ -29,6 +30,12 @@ public class MessageRouter { @@ -29,6 +30,12 @@ public class MessageRouter {
29 30
30 private Date updateTime; 31 private Date updateTime;
31 32
  33 + private List<String> sndrs;
  34 +
  35 + private List<MessageType> types;
  36 +
  37 + private List<String> rcvrs;
  38 +
32 public String getId() { 39 public String getId() {
33 return id; 40 return id;
34 } 41 }
@@ -93,11 +100,11 @@ public class MessageRouter { @@ -93,11 +100,11 @@ public class MessageRouter {
93 this.status = status; 100 this.status = status;
94 } 101 }
95 102
96 - public Boolean getUsage() { 103 + public Integer getUsage() {
97 return usage; 104 return usage;
98 } 105 }
99 106
100 - public void setUsage(Boolean usage) { 107 + public void setUsage(Integer usage) {
101 this.usage = usage; 108 this.usage = usage;
102 } 109 }
103 110
@@ -132,4 +139,28 @@ public class MessageRouter { @@ -132,4 +139,28 @@ public class MessageRouter {
132 public void setUpdateTime(Date updateTime) { 139 public void setUpdateTime(Date updateTime) {
133 this.updateTime = updateTime; 140 this.updateTime = updateTime;
134 } 141 }
  142 +
  143 + public List<String> getSndrs() {
  144 + return sndrs;
  145 + }
  146 +
  147 + public void setSndrs(List<String> sndrs) {
  148 + this.sndrs = sndrs;
  149 + }
  150 +
  151 + public List<MessageType> getTypes() {
  152 + return types;
  153 + }
  154 +
  155 + public void setTypes(List<MessageType> types) {
  156 + this.types = types;
  157 + }
  158 +
  159 + public List<String> getRcvrs() {
  160 + return rcvrs;
  161 + }
  162 +
  163 + public void setRcvrs(List<String> rcvrs) {
  164 + this.rcvrs = rcvrs;
  165 + }
135 } 166 }
  1 +package com.sunyo.wlpt.message.bus.service.service;
  2 +
  3 +import com.sunyo.wlpt.message.bus.service.model.MessageRouter;
  4 +
  5 +public interface RouterService {
  6 +
  7 + boolean batchAddRouter(MessageRouter messageRouter);
  8 +}
  1 +package com.sunyo.wlpt.message.bus.service.service.impl;
  2 +
  3 +import com.sunyo.wlpt.message.bus.service.model.MessageRouter;
  4 +import com.sunyo.wlpt.message.bus.service.model.MessageRouterReciver;
  5 +import com.sunyo.wlpt.message.bus.service.service.RouterService;
  6 +import com.sunyo.wlpt.message.bus.service.utils.IdUtils;
  7 +
  8 +import java.util.List;
  9 +
  10 +public class RouterServiceImp implements RouterService {
  11 +
  12 +
  13 + @Override
  14 + public boolean batchAddRouter(MessageRouter record) {
  15 +
  16 + List<String> senders = record.getSndrs();
  17 + if (senders!=null && !senders.isEmpty()){
  18 + /**
  19 + * 批量增加发送者表
  20 + */
  21 + for (String sndr: senders) {
  22 + String id = IdUtils.generateId();
  23 + MessageRouter router = new MessageRouter();
  24 + router.setId(id);
  25 + router.setSndr(sndr);
  26 + router.setDes(record.getDes());
  27 + router.setVer(record.getVer());
  28 + router.setStatus(record.getStatus());
  29 + router.setUsage(record.getUsage());
  30 + /**
  31 + * 批量增加接收者表
  32 + */
  33 + List<String> receivers = record.getRcvrs();
  34 + if (receivers!=null && !receivers.isEmpty()){
  35 + for (String rcvr :receivers) {
  36 + MessageRouterReciver messageRouterReciver = new MessageRouterReciver();
  37 + messageRouterReciver.setId(id);
  38 + messageRouterReciver.setMessageRouterId(id);
  39 + messageRouterReciver.setRcvrTopic(rcvr);
  40 + }
  41 + }
  42 +
  43 +
  44 + }
  45 + }
  46 + return false;
  47 + }
  48 +}
@@ -10,7 +10,7 @@ @@ -10,7 +10,7 @@
10 <result column="msg_limit" property="msgLimit" jdbcType="TINYINT" /> 10 <result column="msg_limit" property="msgLimit" jdbcType="TINYINT" />
11 <result column="character" property="character" jdbcType="VARCHAR" /> 11 <result column="character" property="character" jdbcType="VARCHAR" />
12 <result column="status" property="status" jdbcType="BIT" /> 12 <result column="status" property="status" jdbcType="BIT" />
13 - <result column="usage" property="usage" jdbcType="BIT" /> 13 + <result column="usage" property="usage" jdbcType="TINYINT" />
14 <result column="des" property="des" jdbcType="VARCHAR" /> 14 <result column="des" property="des" jdbcType="VARCHAR" />
15 <result column="ver" property="ver" jdbcType="VARCHAR" /> 15 <result column="ver" property="ver" jdbcType="VARCHAR" />
16 <result column="creat_time" property="creatTime" jdbcType="TIMESTAMP" /> 16 <result column="creat_time" property="creatTime" jdbcType="TIMESTAMP" />
@@ -38,7 +38,7 @@ @@ -38,7 +38,7 @@
38 ) 38 )
39 values (#{id,jdbcType=VARCHAR}, #{sndr,jdbcType=VARCHAR}, #{btype,jdbcType=VARCHAR}, 39 values (#{id,jdbcType=VARCHAR}, #{sndr,jdbcType=VARCHAR}, #{btype,jdbcType=VARCHAR},
40 #{stype,jdbcType=VARCHAR}, #{optype,jdbcType=VARCHAR}, #{msgLimit,jdbcType=TINYINT}, 40 #{stype,jdbcType=VARCHAR}, #{optype,jdbcType=VARCHAR}, #{msgLimit,jdbcType=TINYINT},
41 - #{character,jdbcType=VARCHAR}, #{status,jdbcType=BIT}, #{usage,jdbcType=BIT}, #{des,jdbcType=VARCHAR}, 41 + #{character,jdbcType=VARCHAR}, #{status,jdbcType=BIT}, #{usage,jdbcType=TINYINT}, #{des,jdbcType=VARCHAR},
42 #{ver,jdbcType=VARCHAR}, #{creatTime,jdbcType=TIMESTAMP}, #{updateTime,jdbcType=TIMESTAMP} 42 #{ver,jdbcType=VARCHAR}, #{creatTime,jdbcType=TIMESTAMP}, #{updateTime,jdbcType=TIMESTAMP}
43 ) 43 )
44 </insert> 44 </insert>
@@ -111,7 +111,7 @@ @@ -111,7 +111,7 @@
111 #{status,jdbcType=BIT}, 111 #{status,jdbcType=BIT},
112 </if> 112 </if>
113 <if test="usage != null" > 113 <if test="usage != null" >
114 - #{usage,jdbcType=BIT}, 114 + #{usage,jdbcType=TINYINT},
115 </if> 115 </if>
116 <if test="des != null" > 116 <if test="des != null" >
117 #{des,jdbcType=VARCHAR}, 117 #{des,jdbcType=VARCHAR},
@@ -152,7 +152,7 @@ @@ -152,7 +152,7 @@
152 status = #{status,jdbcType=BIT}, 152 status = #{status,jdbcType=BIT},
153 </if> 153 </if>
154 <if test="usage != null" > 154 <if test="usage != null" >
155 - usage = #{usage,jdbcType=BIT}, 155 + usage = #{usage,jdbcType=TINYINT},
156 </if> 156 </if>
157 <if test="des != null" > 157 <if test="des != null" >
158 des = #{des,jdbcType=VARCHAR}, 158 des = #{des,jdbcType=VARCHAR},
@@ -178,7 +178,7 @@ @@ -178,7 +178,7 @@
178 msg_limit = #{msgLimit,jdbcType=TINYINT}, 178 msg_limit = #{msgLimit,jdbcType=TINYINT},
179 character = #{character,jdbcType=VARCHAR}, 179 character = #{character,jdbcType=VARCHAR},
180 status = #{status,jdbcType=BIT}, 180 status = #{status,jdbcType=BIT},
181 - usage = #{usage,jdbcType=BIT}, 181 + usage = #{usage,jdbcType=TINYINT},
182 des = #{des,jdbcType=VARCHAR}, 182 des = #{des,jdbcType=VARCHAR},
183 ver = #{ver,jdbcType=VARCHAR}, 183 ver = #{ver,jdbcType=VARCHAR},
184 creat_time = #{creatTime,jdbcType=TIMESTAMP}, 184 creat_time = #{creatTime,jdbcType=TIMESTAMP},
1 -<?xml version="1.0" encoding="UTF-8" ?>  
2 -<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >  
3 -<mapper namespace="com.sunyo.wlpt.message.bus.service.mapper.MessageRouterReciverFilterMapper" >  
4 - <resultMap id="BaseResultMap" type="com.sunyo.wlpt.message.bus.service.model.MessageRouterReciverFilter" >  
5 - <id column="id" property="id" jdbcType="VARCHAR" />  
6 - <result column="filter" property="filter" jdbcType="VARCHAR" />  
7 - <result column="filter_value" property="filterValue" jdbcType="VARCHAR" />  
8 - <result column="type" property="type" jdbcType="VARCHAR" />  
9 - <result column="status" property="status" jdbcType="BIT" />  
10 - <result column="message_router_reciver_id" property="messageRouterReciverId" jdbcType="VARCHAR" />  
11 - <result column="creat_time" property="creatTime" jdbcType="TIMESTAMP" />  
12 - <result column="update_time" property="updateTime" jdbcType="TIMESTAMP" /> 1 +<?xml version="1.0" encoding="UTF-8"?>
  2 +<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
  3 +<mapper namespace="com.sunyo.wlpt.message.bus.service.mapper.MessageRouterReciverFilterMapper">
  4 + <resultMap id="BaseResultMap" type="com.sunyo.wlpt.message.bus.service.model.MessageRouterReciverFilter">
  5 + <id column="id" jdbcType="VARCHAR" property="id" />
  6 + <result column="filter" jdbcType="VARCHAR" property="filter" />
  7 + <result column="filter_value" jdbcType="VARCHAR" property="filterValue" />
  8 + <result column="type" jdbcType="VARCHAR" property="type" />
  9 + <result column="status" jdbcType="BIT" property="status" />
  10 + <result column="message_router_reciver_id" jdbcType="VARCHAR" property="messageRouterReciverId" />
  11 + <result column="creat_time" jdbcType="TIMESTAMP" property="creatTime" />
  12 + <result column="update_time" jdbcType="TIMESTAMP" property="updateTime" />
13 </resultMap> 13 </resultMap>
14 - <sql id="Base_Column_List" > 14 + <sql id="Base_Column_List">
15 id, filter, filter_value, type, status, message_router_reciver_id, creat_time, update_time 15 id, filter, filter_value, type, status, message_router_reciver_id, creat_time, update_time
16 </sql> 16 </sql>
17 - <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.String" > 17 + <select id="selectByPrimaryKey" parameterType="java.lang.String" resultMap="BaseResultMap">
18 select 18 select
19 <include refid="Base_Column_List" /> 19 <include refid="Base_Column_List" />
20 from message_router_reciver_filter 20 from message_router_reciver_filter
21 where id = #{id,jdbcType=VARCHAR} 21 where id = #{id,jdbcType=VARCHAR}
22 </select> 22 </select>
23 - <delete id="deleteByPrimaryKey" parameterType="java.lang.String" > 23 + <delete id="deleteByPrimaryKey" parameterType="java.lang.String">
24 delete from message_router_reciver_filter 24 delete from message_router_reciver_filter
25 where id = #{id,jdbcType=VARCHAR} 25 where id = #{id,jdbcType=VARCHAR}
26 </delete> 26 </delete>
27 - <insert id="insert" parameterType="com.sunyo.wlpt.message.bus.service.model.MessageRouterReciverFilter" > 27 + <insert id="insert" parameterType="com.sunyo.wlpt.message.bus.service.model.MessageRouterReciverFilter">
28 insert into message_router_reciver_filter (id, filter, filter_value, 28 insert into message_router_reciver_filter (id, filter, filter_value,
29 type, status, message_router_reciver_id, 29 type, status, message_router_reciver_id,
30 creat_time, update_time) 30 creat_time, update_time)
@@ -32,89 +32,89 @@ @@ -32,89 +32,89 @@
32 #{type,jdbcType=VARCHAR}, #{status,jdbcType=BIT}, #{messageRouterReciverId,jdbcType=VARCHAR}, 32 #{type,jdbcType=VARCHAR}, #{status,jdbcType=BIT}, #{messageRouterReciverId,jdbcType=VARCHAR},
33 #{creatTime,jdbcType=TIMESTAMP}, #{updateTime,jdbcType=TIMESTAMP}) 33 #{creatTime,jdbcType=TIMESTAMP}, #{updateTime,jdbcType=TIMESTAMP})
34 </insert> 34 </insert>
35 - <insert id="insertSelective" parameterType="com.sunyo.wlpt.message.bus.service.model.MessageRouterReciverFilter" > 35 + <insert id="insertSelective" parameterType="com.sunyo.wlpt.message.bus.service.model.MessageRouterReciverFilter">
36 insert into message_router_reciver_filter 36 insert into message_router_reciver_filter
37 - <trim prefix="(" suffix=")" suffixOverrides="," >  
38 - <if test="id != null" > 37 + <trim prefix="(" suffix=")" suffixOverrides=",">
  38 + <if test="id != null">
39 id, 39 id,
40 </if> 40 </if>
41 - <if test="filter != null" > 41 + <if test="filter != null">
42 filter, 42 filter,
43 </if> 43 </if>
44 - <if test="filterValue != null" > 44 + <if test="filterValue != null">
45 filter_value, 45 filter_value,
46 </if> 46 </if>
47 - <if test="type != null" > 47 + <if test="type != null">
48 type, 48 type,
49 </if> 49 </if>
50 - <if test="status != null" > 50 + <if test="status != null">
51 status, 51 status,
52 </if> 52 </if>
53 - <if test="messageRouterReciverId != null" > 53 + <if test="messageRouterReciverId != null">
54 message_router_reciver_id, 54 message_router_reciver_id,
55 </if> 55 </if>
56 - <if test="creatTime != null" > 56 + <if test="creatTime != null">
57 creat_time, 57 creat_time,
58 </if> 58 </if>
59 - <if test="updateTime != null" > 59 + <if test="updateTime != null">
60 update_time, 60 update_time,
61 </if> 61 </if>
62 </trim> 62 </trim>
63 - <trim prefix="values (" suffix=")" suffixOverrides="," >  
64 - <if test="id != null" > 63 + <trim prefix="values (" suffix=")" suffixOverrides=",">
  64 + <if test="id != null">
65 #{id,jdbcType=VARCHAR}, 65 #{id,jdbcType=VARCHAR},
66 </if> 66 </if>
67 - <if test="filter != null" > 67 + <if test="filter != null">
68 #{filter,jdbcType=VARCHAR}, 68 #{filter,jdbcType=VARCHAR},
69 </if> 69 </if>
70 - <if test="filterValue != null" > 70 + <if test="filterValue != null">
71 #{filterValue,jdbcType=VARCHAR}, 71 #{filterValue,jdbcType=VARCHAR},
72 </if> 72 </if>
73 - <if test="type != null" > 73 + <if test="type != null">
74 #{type,jdbcType=VARCHAR}, 74 #{type,jdbcType=VARCHAR},
75 </if> 75 </if>
76 - <if test="status != null" > 76 + <if test="status != null">
77 #{status,jdbcType=BIT}, 77 #{status,jdbcType=BIT},
78 </if> 78 </if>
79 - <if test="messageRouterReciverId != null" > 79 + <if test="messageRouterReciverId != null">
80 #{messageRouterReciverId,jdbcType=VARCHAR}, 80 #{messageRouterReciverId,jdbcType=VARCHAR},
81 </if> 81 </if>
82 - <if test="creatTime != null" > 82 + <if test="creatTime != null">
83 #{creatTime,jdbcType=TIMESTAMP}, 83 #{creatTime,jdbcType=TIMESTAMP},
84 </if> 84 </if>
85 - <if test="updateTime != null" > 85 + <if test="updateTime != null">
86 #{updateTime,jdbcType=TIMESTAMP}, 86 #{updateTime,jdbcType=TIMESTAMP},
87 </if> 87 </if>
88 </trim> 88 </trim>
89 </insert> 89 </insert>
90 - <update id="updateByPrimaryKeySelective" parameterType="com.sunyo.wlpt.message.bus.service.model.MessageRouterReciverFilter" > 90 + <update id="updateByPrimaryKeySelective" parameterType="com.sunyo.wlpt.message.bus.service.model.MessageRouterReciverFilter">
91 update message_router_reciver_filter 91 update message_router_reciver_filter
92 - <set >  
93 - <if test="filter != null" > 92 + <set>
  93 + <if test="filter != null">
94 filter = #{filter,jdbcType=VARCHAR}, 94 filter = #{filter,jdbcType=VARCHAR},
95 </if> 95 </if>
96 - <if test="filterValue != null" > 96 + <if test="filterValue != null">
97 filter_value = #{filterValue,jdbcType=VARCHAR}, 97 filter_value = #{filterValue,jdbcType=VARCHAR},
98 </if> 98 </if>
99 - <if test="type != null" > 99 + <if test="type != null">
100 type = #{type,jdbcType=VARCHAR}, 100 type = #{type,jdbcType=VARCHAR},
101 </if> 101 </if>
102 - <if test="status != null" > 102 + <if test="status != null">
103 status = #{status,jdbcType=BIT}, 103 status = #{status,jdbcType=BIT},
104 </if> 104 </if>
105 - <if test="messageRouterReciverId != null" > 105 + <if test="messageRouterReciverId != null">
106 message_router_reciver_id = #{messageRouterReciverId,jdbcType=VARCHAR}, 106 message_router_reciver_id = #{messageRouterReciverId,jdbcType=VARCHAR},
107 </if> 107 </if>
108 - <if test="creatTime != null" > 108 + <if test="creatTime != null">
109 creat_time = #{creatTime,jdbcType=TIMESTAMP}, 109 creat_time = #{creatTime,jdbcType=TIMESTAMP},
110 </if> 110 </if>
111 - <if test="updateTime != null" > 111 + <if test="updateTime != null">
112 update_time = #{updateTime,jdbcType=TIMESTAMP}, 112 update_time = #{updateTime,jdbcType=TIMESTAMP},
113 </if> 113 </if>
114 </set> 114 </set>
115 where id = #{id,jdbcType=VARCHAR} 115 where id = #{id,jdbcType=VARCHAR}
116 </update> 116 </update>
117 - <update id="updateByPrimaryKey" parameterType="com.sunyo.wlpt.message.bus.service.model.MessageRouterReciverFilter" > 117 + <update id="updateByPrimaryKey" parameterType="com.sunyo.wlpt.message.bus.service.model.MessageRouterReciverFilter">
118 update message_router_reciver_filter 118 update message_router_reciver_filter
119 set filter = #{filter,jdbcType=VARCHAR}, 119 set filter = #{filter,jdbcType=VARCHAR},
120 filter_value = #{filterValue,jdbcType=VARCHAR}, 120 filter_value = #{filterValue,jdbcType=VARCHAR},