作者 朱兆平

kafka消息总线功能

正在显示 60 个修改的文件 包含 2677 行增加94 行删除
1 server: 1 server:
2 - port: 9030 2 + port: 19031
3 3
4 # spring 配置 4 # spring 配置
5 spring: 5 spring:
@@ -22,9 +22,9 @@ spring: @@ -22,9 +22,9 @@ spring:
22 # redis设置 22 # redis设置
23 redis: 23 redis:
24 database: 0 # Redis 数据库索引(默认为 0) 24 database: 0 # Redis 数据库索引(默认为 0)
25 - host: 192.168.37.139 # Redis 服务器地址 25 + host: 192.168.1.53 # Redis 服务器地址
26 port: 6379 # Redis 服务器连接端口 26 port: 6379 # Redis 服务器连接端口
27 - password: 123456 # Redis 服务器连接密码(默认为空) 27 + password: # Redis 服务器连接密码(默认为空)
28 lettuce: 28 lettuce:
29 pool: 29 pool:
30 max-active: 8 # 连接池最大连接数(使用负值表示没有限制) 默认 8 30 max-active: 8 # 连接池最大连接数(使用负值表示没有限制) 默认 8
@@ -34,10 +34,10 @@ spring: @@ -34,10 +34,10 @@ spring:
34 34
35 # rabbitmq配置 35 # rabbitmq配置
36 rabbitmq: 36 rabbitmq:
37 - host: 192.168.37.139 37 + host: 192.168.1.63
38 port: 5672 38 port: 5672
39 - username: rabbit  
40 - password: 123456 39 + username: mrz
  40 + password: vmvnv1v2
41 virtual-host: / 41 virtual-host: /
42 42
43 # 多环境配置 43 # 多环境配置
@@ -50,18 +50,12 @@ spring: @@ -50,18 +50,12 @@ spring:
50 time-zone: GMT+8 50 time-zone: GMT+8
51 date-format: yyyy-MM-dd HH:mm:ss 51 date-format: yyyy-MM-dd HH:mm:ss
52 52
53 - # zipkin 链路追踪配置  
54 - zipkin:  
55 - base-url: http://192.168.1.63:9411  
56 - sleuth:  
57 - sampler:  
58 - probability: 1  
59 53
60 # mybatis 配置 54 # mybatis 配置
61 mybatis: 55 mybatis:
62 mapper-locations: classpath:mapper/*.xml 56 mapper-locations: classpath:mapper/*.xml
63 type-aliases-package: com.sunyo.wlpt.message.bus.service.domain 57 type-aliases-package: com.sunyo.wlpt.message.bus.service.domain
64 - 58 +debug: false
65 # 日志配置 59 # 日志配置
66 logging: 60 logging:
67 config: config/logback-dev.xml 61 config: config/logback-dev.xml
@@ -19,8 +19,9 @@ @@ -19,8 +19,9 @@
19 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 19 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
20 <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> 20 <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
21 <spring-cloud.version>Hoxton.RELEASE</spring-cloud.version> 21 <spring-cloud.version>Hoxton.RELEASE</spring-cloud.version>
  22 + <swagger2_version>2.9.2</swagger2_version>
22 <!-- springboot 2.2.1默认的es版本是6.8.4,下面的es的版本要和ES的版本一致 --> 23 <!-- springboot 2.2.1默认的es版本是6.8.4,下面的es的版本要和ES的版本一致 -->
23 - <elasticsearch.version>7.4.0</elasticsearch.version> 24 + <elasticsearch.version>7.5.1</elasticsearch.version>
24 </properties> 25 </properties>
25 26
26 <dependencies> 27 <dependencies>
@@ -31,6 +32,17 @@ @@ -31,6 +32,17 @@
31 </dependency> 32 </dependency>
32 33
33 <dependency> 34 <dependency>
  35 + <groupId>org.springframework.kafka</groupId>
  36 + <artifactId>spring-kafka</artifactId>
  37 + </dependency>
  38 + <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
  39 + <dependency>
  40 + <groupId>org.apache.kafka</groupId>
  41 + <artifactId>kafka_2.13</artifactId>
  42 + <version>2.6.0</version>
  43 + </dependency>
  44 +
  45 + <dependency>
34 <groupId>org.springframework.boot</groupId> 46 <groupId>org.springframework.boot</groupId>
35 <artifactId>spring-boot-starter</artifactId> 47 <artifactId>spring-boot-starter</artifactId>
36 </dependency> 48 </dependency>
@@ -59,6 +71,41 @@ @@ -59,6 +71,41 @@
59 <dependency> 71 <dependency>
60 <groupId>org.springframework.boot</groupId> 72 <groupId>org.springframework.boot</groupId>
61 <artifactId>spring-boot-starter-data-elasticsearch</artifactId> 73 <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
  74 + <exclusions>
  75 + <exclusion>
  76 + <groupId>org.elasticsearch</groupId>
  77 + <artifactId>elasticsearch</artifactId>
  78 + </exclusion>
  79 + <exclusion>
  80 + <groupId>org.elasticsearch.client</groupId>
  81 + <artifactId>elasticsearch-rest-high-level-client</artifactId>
  82 + </exclusion>
  83 + </exclusions>
  84 + </dependency>
  85 + <dependency>
  86 + <groupId>org.elasticsearch.client</groupId>
  87 + <artifactId>elasticsearch-rest-high-level-client</artifactId>
  88 + <version>${elasticsearch.version}</version>
  89 + <exclusions>
  90 + <exclusion>
  91 + <groupId>org.elasticsearch</groupId>
  92 + <artifactId>elasticsearch</artifactId>
  93 + </exclusion>
  94 + <exclusion>
  95 + <groupId>org.elasticsearch.client</groupId>
  96 + <artifactId>elasticsearch-rest-client</artifactId>
  97 + </exclusion>
  98 + </exclusions>
  99 + </dependency>
  100 + <dependency>
  101 + <groupId>org.elasticsearch.client</groupId>
  102 + <artifactId>elasticsearch-rest-client</artifactId>
  103 + <version>${elasticsearch.version}</version>
  104 + </dependency>
  105 + <dependency>
  106 + <groupId>org.elasticsearch</groupId>
  107 + <artifactId>elasticsearch</artifactId>
  108 + <version>${elasticsearch.version}</version>
62 </dependency> 109 </dependency>
63 <!-- SpringBoot end --> 110 <!-- SpringBoot end -->
64 111
@@ -79,11 +126,6 @@ @@ -79,11 +126,6 @@
79 </dependency> 126 </dependency>
80 127
81 <dependency> 128 <dependency>
82 - <groupId>org.springframework.cloud</groupId>  
83 - <artifactId>spring-cloud-starter-zipkin</artifactId>  
84 - </dependency>  
85 -  
86 - <dependency>  
87 <groupId>de.codecentric</groupId> 129 <groupId>de.codecentric</groupId>
88 <artifactId>spring-boot-admin-starter-client</artifactId> 130 <artifactId>spring-boot-admin-starter-client</artifactId>
89 <version>2.2.0</version> 131 <version>2.2.0</version>
@@ -148,16 +190,21 @@ @@ -148,16 +190,21 @@
148 <artifactId>log4j</artifactId> 190 <artifactId>log4j</artifactId>
149 <version>1.2.17</version> 191 <version>1.2.17</version>
150 </dependency> 192 </dependency>
151 - 193 + <!-- https://mvnrepository.com/artifact/com.github.xiaoymin/swagger-bootstrap-ui -->
  194 + <dependency>
  195 + <groupId>com.github.xiaoymin</groupId>
  196 + <artifactId>swagger-bootstrap-ui</artifactId>
  197 + <version>1.9.6</version>
  198 + </dependency>
152 <dependency> 199 <dependency>
153 <groupId>io.springfox</groupId> 200 <groupId>io.springfox</groupId>
154 <artifactId>springfox-swagger2</artifactId> 201 <artifactId>springfox-swagger2</artifactId>
155 <version>2.9.2</version> 202 <version>2.9.2</version>
156 </dependency> 203 </dependency>
157 <dependency> 204 <dependency>
158 - <groupId>com.github.xiaoymin</groupId>  
159 - <artifactId>swagger-bootstrap-ui</artifactId>  
160 - <version>1.9.6</version> 205 + <groupId>io.springfox</groupId>
  206 + <artifactId>springfox-swagger-ui</artifactId>
  207 + <version>2.9.2</version>
161 </dependency> 208 </dependency>
162 <dependency> 209 <dependency>
163 <groupId>joda-time</groupId> 210 <groupId>joda-time</groupId>
@@ -225,6 +272,17 @@ @@ -225,6 +272,17 @@
225 <groupId>org.springframework.boot</groupId> 272 <groupId>org.springframework.boot</groupId>
226 <artifactId>spring-boot-maven-plugin</artifactId> 273 <artifactId>spring-boot-maven-plugin</artifactId>
227 </plugin> 274 </plugin>
  275 + <!-- mybatis generator 自动生成代码插件 -->
  276 + <plugin>
  277 + <groupId>org.mybatis.generator</groupId>
  278 + <artifactId>mybatis-generator-maven-plugin</artifactId>
  279 + <version>1.3.2</version>
  280 + <configuration>
  281 + <configurationFile>${basedir}/src/main/resources/generator/generatorConfig.xml</configurationFile>
  282 + <overwrite>true</overwrite>
  283 + <verbose>true</verbose>
  284 + </configuration>
  285 + </plugin>
228 </plugins> 286 </plugins>
229 </build> 287 </build>
230 <repositories> 288 <repositories>
@@ -2,7 +2,9 @@ package com.sunyo.wlpt.message.bus.service; @@ -2,7 +2,9 @@ package com.sunyo.wlpt.message.bus.service;
2 2
3 import org.mybatis.spring.annotation.MapperScan; 3 import org.mybatis.spring.annotation.MapperScan;
4 import org.springframework.boot.SpringApplication; 4 import org.springframework.boot.SpringApplication;
  5 +import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
5 import org.springframework.boot.autoconfigure.SpringBootApplication; 6 import org.springframework.boot.autoconfigure.SpringBootApplication;
  7 +import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchAutoConfiguration;
6 import org.springframework.cache.annotation.EnableCaching; 8 import org.springframework.cache.annotation.EnableCaching;
7 import org.springframework.cloud.netflix.eureka.EnableEurekaClient; 9 import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
8 import org.springframework.cloud.openfeign.EnableFeignClients; 10 import org.springframework.cloud.openfeign.EnableFeignClients;
@@ -21,6 +23,7 @@ import org.springframework.transaction.annotation.EnableTransactionManagement; @@ -21,6 +23,7 @@ import org.springframework.transaction.annotation.EnableTransactionManagement;
21 @EnableTransactionManagement 23 @EnableTransactionManagement
22 @EnableScheduling 24 @EnableScheduling
23 @EnableAsync 25 @EnableAsync
  26 +
24 public class MessageBusServiceApplication { 27 public class MessageBusServiceApplication {
25 28
26 public static void main(String[] args) { 29 public static void main(String[] args) {
  1 +package com.sunyo.wlpt.message.bus.service;
  2 +
  3 +import com.google.common.base.Predicates;
  4 +import org.springframework.context.annotation.Bean;
  5 +import org.springframework.context.annotation.Configuration;
  6 +import springfox.documentation.builders.ApiInfoBuilder;
  7 +import springfox.documentation.builders.PathSelectors;
  8 +import springfox.documentation.builders.RequestHandlerSelectors;
  9 +import springfox.documentation.service.ApiInfo;
  10 +import springfox.documentation.service.Contact;
  11 +import springfox.documentation.spi.DocumentationType;
  12 +import springfox.documentation.spring.web.plugins.Docket;
  13 +import springfox.documentation.swagger2.annotations.EnableSwagger2;
  14 +
  15 +@Configuration
  16 +@EnableSwagger2 // Swagger的开关,表示已经启用Swagger
  17 +public class SwaggerConfig {
  18 + @Bean
  19 + public Docket api() {
  20 + Docket docket = new Docket(DocumentationType.SWAGGER_2)
  21 + .apiInfo(apiInfo())
  22 + .pathMapping("/")
  23 + .select() // 选择哪些路径和api会生成document
  24 + .apis(RequestHandlerSelectors.any())// 对所有api进行监控
  25 +// .apis(RequestHandlerSelectors.basePackage("com.hanstrovsky.controller"))// 选择监控的package
  26 +// .apis(RequestHandlerSelectors.withMethodAnnotation(ApiOperation.class))// 只监控有ApiOperation注解的接口
  27 + //不显示错误的接口地址
  28 + .paths(Predicates.not(PathSelectors.regex("/error.*")))//错误路径不监控
  29 + .paths(PathSelectors.regex("/.*"))// 对根下所有路径进行监控
  30 + .build();
  31 + return docket;
  32 + }
  33 +
  34 + private ApiInfo apiInfo() {
  35 + return new ApiInfoBuilder().title("消息总线项目接口文档")
  36 + .contact(new Contact("Hanstrovsky", "www.hanstrovsky.com", "Hanstrovsky@gmail.com"))
  37 + .description("这是用Swagger动态生成的接口文档")
  38 + .termsOfServiceUrl("NO terms of service")
  39 + .license("The Apache License, Version 2.0")
  40 + .licenseUrl("http://www.apache.org/licenses/LICENSE-2.0.html")
  41 + .version("1.0")
  42 + .build();
  43 + }
  44 +}
  1 +package com.sunyo.wlpt.message.bus.service.bean;
  2 +
  3 +
  4 +import org.elasticsearch.client.RestClientBuilder;
  5 +import org.elasticsearch.client.RestHighLevelClient;
  6 +import org.springframework.beans.factory.annotation.Autowired;
  7 +import org.springframework.context.annotation.Bean;
  8 +import org.springframework.context.annotation.Configuration;
  9 +import org.springframework.data.elasticsearch.client.ClientConfiguration;
  10 +import org.springframework.data.elasticsearch.client.RestClients;
  11 +
  12 +@Configuration
  13 +public class EsRestClientConfig {
  14 +
  15 + @Bean(name = "EsHighLevelClient")
  16 + public RestHighLevelClient highLevelClient() {
  17 + final ClientConfiguration clientConfiguration= ClientConfiguration.builder().connectedTo("192.168.1.73:9200").build();
  18 + return RestClients.create(clientConfiguration).rest();
  19 + }
  20 +}
  1 +package com.sunyo.wlpt.message.bus.service.bean;
  2 +
  3 +import com.sunyo.wlpt.message.bus.service.domain.BusServer;
  4 +import com.sunyo.wlpt.message.bus.service.mapper.BusServerMapper;
  5 +import org.apache.kafka.clients.admin.AdminClient;
  6 +import org.apache.kafka.clients.admin.AdminClientConfig;
  7 +import org.springframework.context.annotation.Bean;
  8 +import org.springframework.context.annotation.Configuration;
  9 +import org.springframework.kafka.core.KafkaAdmin;
  10 +
  11 +import javax.annotation.Resource;
  12 +import java.util.HashMap;
  13 +import java.util.List;
  14 +import java.util.Map;
  15 +import java.util.stream.Collectors;
  16 +
  17 +//@Configuration
  18 +public class KafkaInitialConfiguration {
  19 +
  20 +// @Resource
  21 + private BusServerMapper busServerMapper;
  22 +
  23 +// @Bean
  24 + public AdminClient adminClient() {
  25 + Map<String, Object> configs = new HashMap<>();
  26 + String serverMap = ServerListForMap();
  27 + configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
  28 + serverMap);
  29 + KafkaAdmin admin = new KafkaAdmin(configs);
  30 + return AdminClient.create(admin.getConfig());
  31 + }
  32 +
  33 + public String ServerListForMap(){
  34 + List<BusServer> serverList = busServerMapper.selectBusServerList(new BusServer());
  35 +
  36 + String KAFKA_SERVERS = serverList.stream().map(item -> {
  37 + return item.getServerIp()+":"+item.getServerPort();
  38 + })
  39 + .collect(Collectors.joining(","));
  40 +
  41 + return KAFKA_SERVERS;
  42 + }
  43 +}
@@ -13,6 +13,7 @@ import org.elasticsearch.client.RestClient; @@ -13,6 +13,7 @@ import org.elasticsearch.client.RestClient;
13 import org.elasticsearch.client.RestClientBuilder; 13 import org.elasticsearch.client.RestClientBuilder;
14 import org.elasticsearch.client.RestHighLevelClient; 14 import org.elasticsearch.client.RestHighLevelClient;
15 import org.springframework.beans.factory.annotation.Autowired; 15 import org.springframework.beans.factory.annotation.Autowired;
  16 +import org.springframework.beans.factory.annotation.Qualifier;
16 import org.springframework.context.annotation.Bean; 17 import org.springframework.context.annotation.Bean;
17 import org.springframework.context.annotation.Configuration; 18 import org.springframework.context.annotation.Configuration;
18 import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration; 19 import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;
@@ -56,7 +57,7 @@ public class ElasticSearchConfig extends AbstractElasticsearchConfiguration { @@ -56,7 +57,7 @@ public class ElasticSearchConfig extends AbstractElasticsearchConfiguration {
56 57
57 58
58 @Override 59 @Override
59 - @Bean 60 +// @Bean
60 public RestHighLevelClient elasticsearchClient() 61 public RestHighLevelClient elasticsearchClient()
61 { 62 {
62 final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); 63 final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
1 -package com.sunyo.wlpt.message.bus.service.config;  
2 -  
3 -import org.springframework.context.annotation.Bean;  
4 -import org.springframework.context.annotation.Configuration;  
5 -import springfox.documentation.builders.ApiInfoBuilder;  
6 -import springfox.documentation.builders.PathSelectors;  
7 -import springfox.documentation.builders.RequestHandlerSelectors;  
8 -import springfox.documentation.service.ApiInfo;  
9 -import springfox.documentation.spi.DocumentationType;  
10 -import springfox.documentation.spring.web.plugins.Docket;  
11 -import springfox.documentation.swagger2.annotations.EnableSwagger2;  
12 -  
13 -/**  
14 - * @author 子诚  
15 - * Description:swagger-knife4j 的配置文件  
16 - * 时间:2020/7/10 11:48  
17 - */  
18 -@Configuration  
19 -@EnableSwagger2  
20 -public class SwaggerConfig {  
21 - @Bean  
22 - public Docket createRestApi()  
23 - {  
24 - return new Docket(DocumentationType.SWAGGER_2)  
25 - .apiInfo(apiInfo())  
26 - .select()  
27 - .apis(RequestHandlerSelectors.basePackage("com.sunyo.wlpt.message.bus.service.controller"))  
28 - .paths(PathSelectors.any())  
29 - .build();  
30 - }  
31 -  
32 - private ApiInfo apiInfo()  
33 - {  
34 - return new ApiInfoBuilder()  
35 - .title("消息总线平台--后台管理服务")  
36 - .description("消息总线平台--后台管理服务")  
37 - .termsOfServiceUrl("http://localhost:9030/")  
38 - .contact("子诚")  
39 - .version("1.0.0")  
40 - .build();  
41 - }  
42 -}  
@@ -2,9 +2,14 @@ package com.sunyo.wlpt.message.bus.service.controller; @@ -2,9 +2,14 @@ package com.sunyo.wlpt.message.bus.service.controller;
2 2
3 import com.github.pagehelper.PageInfo; 3 import com.github.pagehelper.PageInfo;
4 import com.sunyo.wlpt.message.bus.service.domain.BusQueue; 4 import com.sunyo.wlpt.message.bus.service.domain.BusQueue;
  5 +import com.sunyo.wlpt.message.bus.service.model.ConsumerGroupOffsets;
5 import com.sunyo.wlpt.message.bus.service.response.ResultJson; 6 import com.sunyo.wlpt.message.bus.service.response.ResultJson;
6 import com.sunyo.wlpt.message.bus.service.service.BusQueueService; 7 import com.sunyo.wlpt.message.bus.service.service.BusQueueService;
  8 +import com.sunyo.wlpt.message.bus.service.service.KafkaService;
7 import com.sunyo.wlpt.message.bus.service.service.UserInfoService; 9 import com.sunyo.wlpt.message.bus.service.service.UserInfoService;
  10 +import org.apache.kafka.clients.admin.AdminClient;
  11 +import org.springframework.beans.factory.annotation.Autowired;
  12 +import org.springframework.stereotype.Service;
8 import org.springframework.web.bind.annotation.*; 13 import org.springframework.web.bind.annotation.*;
9 14
10 import javax.annotation.Resource; 15 import javax.annotation.Resource;
@@ -30,6 +35,10 @@ public class BusQueueController { @@ -30,6 +35,10 @@ public class BusQueueController {
30 @Resource 35 @Resource
31 private BusQueueService busQueueService; 36 private BusQueueService busQueueService;
32 37
  38 + @Resource
  39 + private KafkaService kafkaService;
  40 +
  41 +
33 /** 42 /**
34 * 分页查询,消息队列-列表 43 * 分页查询,消息队列-列表
35 * 44 *
@@ -182,4 +191,12 @@ public class BusQueueController { @@ -182,4 +191,12 @@ public class BusQueueController {
182 } 191 }
183 } 192 }
184 193
  194 + @GetMapping("monitor")
  195 + public ResultJson<List<ConsumerGroupOffsets>> queueMonitor(){
  196 +
  197 + List<ConsumerGroupOffsets> result = kafkaService.queueMonitor();
  198 +
  199 + return new ResultJson<List<ConsumerGroupOffsets>>("200","success",result);
  200 + }
  201 +
185 } 202 }
  1 +package com.sunyo.wlpt.message.bus.service.controller;
  2 +
  3 +
  4 +import com.github.pagehelper.PageInfo;
  5 +import com.sunyo.wlpt.message.bus.service.domain.BusQueue;
  6 +import com.sunyo.wlpt.message.bus.service.model.MessageType;
  7 +import com.sunyo.wlpt.message.bus.service.response.ResultJson;
  8 +import com.sunyo.wlpt.message.bus.service.service.MessageTypeService;
  9 +import org.springframework.beans.factory.annotation.Autowired;
  10 +import org.springframework.stereotype.Service;
  11 +import org.springframework.web.bind.annotation.*;
  12 +
  13 +import javax.annotation.Resource;
  14 +import java.util.List;
  15 +
  16 +@RestController
  17 +@RequestMapping("bus/message/type")
  18 +public class MessageTypesController {
  19 +
  20 + @Autowired
  21 + MessageTypeService messageTypeService;
  22 +
  23 + @GetMapping("list")
  24 + public ResultJson<List<MessageType>> typeList()
  25 + {
  26 + MessageType messageType = new MessageType();
  27 + messageType.setPageNum(0);
  28 + messageType.setPageSize(1000000);
  29 + List<MessageType> list = messageTypeService.list(messageType);
  30 + return new ResultJson("200","success",list);
  31 + }
  32 +
  33 + @PostMapping("list")
  34 + public ResultJson<PageInfo<MessageType>> typeListPage(@RequestBody MessageType messageType)
  35 + {
  36 + PageInfo<MessageType> list = messageTypeService.pageList(messageType);
  37 + return new ResultJson("200","success",list);
  38 + }
  39 +}
  1 +package com.sunyo.wlpt.message.bus.service.controller;
  2 +
  3 +
  4 +import io.swagger.annotations.ApiOperation;
  5 +import org.springframework.web.bind.annotation.PostMapping;
  6 +import org.springframework.web.bind.annotation.RequestMapping;
  7 +import org.springframework.web.bind.annotation.RestController;
  8 +
  9 +@RequestMapping("router")
  10 +@RestController
  11 +public class RouterController {
  12 +
  13 + @ApiOperation(value = "批量添加消息路由", notes = "超级管理修改其他用户密码")
  14 + @PostMapping
  15 + public void batchAddRouter(){
  16 +
  17 +
  18 +
  19 + }
  20 +}
1 package com.sunyo.wlpt.message.bus.service.controller.es; 1 package com.sunyo.wlpt.message.bus.service.controller.es;
2 2
3 import com.sunyo.wlpt.message.bus.service.domain.es.ElasticSearchInfo; 3 import com.sunyo.wlpt.message.bus.service.domain.es.ElasticSearchInfo;
  4 +import com.sunyo.wlpt.message.bus.service.exception.CustomException;
  5 +import com.sunyo.wlpt.message.bus.service.exception.CustomExceptionType;
  6 +import com.sunyo.wlpt.message.bus.service.model.ESPage;
  7 +import com.sunyo.wlpt.message.bus.service.model.MessageBusMsg;
4 import com.sunyo.wlpt.message.bus.service.response.ResultJson; 8 import com.sunyo.wlpt.message.bus.service.response.ResultJson;
5 import com.sunyo.wlpt.message.bus.service.service.ElasticSearchInfoService; 9 import com.sunyo.wlpt.message.bus.service.service.ElasticSearchInfoService;
  10 +import lombok.extern.slf4j.Slf4j;
  11 +import org.apache.commons.lang.StringUtils;
  12 +import org.elasticsearch.index.query.BoolQueryBuilder;
  13 +import org.elasticsearch.index.query.MatchQueryBuilder;
  14 +import org.elasticsearch.index.query.QueryBuilders;
  15 +import org.elasticsearch.index.query.WildcardQueryBuilder;
  16 +import org.elasticsearch.search.builder.SearchSourceBuilder;
  17 +import org.springframework.data.domain.Page;
  18 +import org.springframework.data.domain.Pageable;
  19 +import org.springframework.data.domain.Sort;
  20 +import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
  21 +import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
6 import org.springframework.web.bind.annotation.*; 22 import org.springframework.web.bind.annotation.*;
7 23
8 import javax.annotation.Resource; 24 import javax.annotation.Resource;
  25 +import java.util.Date;
  26 +import java.util.List;
9 27
10 /** 28 /**
11 * @author 子诚 29 * @author 子诚
12 * Description: 30 * Description:
13 * 时间:2020/9/10 17:09 31 * 时间:2020/9/10 17:09
14 */ 32 */
  33 +@Slf4j
15 @CrossOrigin 34 @CrossOrigin
16 @RequestMapping("bus/es") 35 @RequestMapping("bus/es")
17 @RestController 36 @RestController
@@ -19,6 +38,7 @@ public class ElasticSearchInfoController { @@ -19,6 +38,7 @@ public class ElasticSearchInfoController {
19 @Resource 38 @Resource
20 private ElasticSearchInfoService elasticSearchInfoService; 39 private ElasticSearchInfoService elasticSearchInfoService;
21 40
  41 +
22 /** 42 /**
23 * 分页查询,ES信息列表 43 * 分页查询,ES信息列表
24 * 44 *
@@ -82,4 +102,90 @@ public class ElasticSearchInfoController { @@ -82,4 +102,90 @@ public class ElasticSearchInfoController {
82 { 102 {
83 return elasticSearchInfoService.updateByPrimaryKeySelective(elasticSearchInfo); 103 return elasticSearchInfoService.updateByPrimaryKeySelective(elasticSearchInfo);
84 } 104 }
  105 +
  106 +
  107 + @RequestMapping("/search/wildmsg")
  108 + public ResultJson searchMsg(@RequestBody MessageBusMsg messageBusMsg)
  109 + {
  110 + Sort sort = Sort.by(Sort.Direction.DESC, "ddtm","creatime");
  111 + //前端提交的起始页从1开始,ES是从0开始
  112 + ESPage page = ESPage.of(messageBusMsg.getPageNum()-1,messageBusMsg.getPageSize(),sort);
  113 + WildcardQueryBuilder queryBuilders=null;
  114 + if(StringUtils.isNotBlank(messageBusMsg.getAlias_sendContent())){
  115 + queryBuilders = QueryBuilders.wildcardQuery("msg", "*"+messageBusMsg.getAlias_sendContent().toLowerCase()+"*");
  116 + }else {
  117 + return ResultJson.error(new CustomException(CustomExceptionType.SEARCH_EXCEPTION));
  118 + }
  119 +
  120 + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  121 + searchSourceBuilder.query(queryBuilders);
  122 + Page<MessageBusMsg> data = elasticSearchInfoService.search(searchSourceBuilder,MessageBusMsg.class,page);
  123 + ResultJson resultJson = new ResultJson("200","success",data);
  124 + return resultJson;
  125 + }
  126 +
  127 + @PostMapping("/search/term")
  128 + public ResultJson search(@RequestBody MessageBusMsg messageBusMsg)
  129 + {
  130 +
  131 + Sort sort = Sort.by(Sort.Direction.DESC, "ddtm","creatime");
  132 + ESPage page = ESPage.of(messageBusMsg.getPageNum()-1,messageBusMsg.getPageSize(),sort);
  133 + /**
  134 + * term精确字段检索不要与matchQuery检索混用
  135 + */
  136 + BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
  137 +
  138 + queryBuilder = esQueryFilter(queryBuilder,messageBusMsg);
  139 + log.info(new Date().toString());
  140 + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  141 + searchSourceBuilder.query(queryBuilder);
  142 + Page<MessageBusMsg> data = elasticSearchInfoService.search(searchSourceBuilder,MessageBusMsg.class,page);
  143 + ResultJson resultJson = new ResultJson("200","success",data);
  144 + return resultJson;
  145 + }
  146 +
  147 + public BoolQueryBuilder esQueryFilter(BoolQueryBuilder queryBuilder, MessageBusMsg messageBusMsg){
  148 +
  149 + //and条件
  150 + if (StringUtils.isNotBlank(messageBusMsg.getSndr())){
  151 + queryBuilder = queryBuilder.must(QueryBuilders.termQuery("sndr",messageBusMsg.getSndr()));
  152 + }
  153 + if (StringUtils.isNotBlank(messageBusMsg.getBtype())){
  154 + queryBuilder = queryBuilder.must(QueryBuilders.termQuery("btype",messageBusMsg.getBtype()));
  155 + }
  156 + if (StringUtils.isNotBlank(messageBusMsg.getStype())){
  157 + queryBuilder = queryBuilder.must(QueryBuilders.termQuery("stype",messageBusMsg.getStype()));
  158 + }
  159 +
  160 + if (StringUtils.isNotBlank(messageBusMsg.getSeqn())){
  161 + queryBuilder = queryBuilder.must(QueryBuilders.termQuery("seqn",messageBusMsg.getSeqn()));
  162 + }
  163 + if (StringUtils.isNotBlank(messageBusMsg.getMsgid())){
  164 + queryBuilder = queryBuilder.must(QueryBuilders.termQuery("msgid",messageBusMsg.getMsgid()));
  165 + }
  166 +
  167 + if (StringUtils.isNotBlank(messageBusMsg.getRcvrsUserName())){
  168 + queryBuilder = queryBuilder.should(QueryBuilders.termQuery("rcvrs.username",messageBusMsg.getRcvrsUserName()));
  169 + }
  170 +
  171 + if (StringUtils.isNotBlank(messageBusMsg.getRcvlogUsername())){
  172 + queryBuilder = queryBuilder.should(QueryBuilders.termQuery("rcvlog.username",messageBusMsg.getRcvlogUsername()));
  173 + }
  174 + if (messageBusMsg.getCreatimeSearch() !=null && !messageBusMsg.getCreatimeSearch().isEmpty() && StringUtils.isNotBlank(messageBusMsg.getCreatimeSearch().get(0)) && StringUtils.isNotBlank(messageBusMsg.getCreatimeSearch().get(1))){
  175 + queryBuilder = queryBuilder.must(QueryBuilders.rangeQuery("ddtm").from(messageBusMsg.getCreatimeSearch().get(0)).to(messageBusMsg.getCreatimeSearch().get(1)));
  176 + //or条件
  177 + queryBuilder = queryBuilder.should(QueryBuilders.rangeQuery("creatime").from(messageBusMsg.getCreatimeSearch().get(0)).to(messageBusMsg.getCreatimeSearch().get(1)));
  178 + }
  179 +
  180 + if (StringUtils.isNotBlank(messageBusMsg.getMsgid())){
  181 + queryBuilder = queryBuilder.should(QueryBuilders.termQuery("_id",messageBusMsg.getMsgid()));
  182 + }
  183 + //wild搜索大写搜不到小写也搜不到大写的包含,要转成小写
  184 + if (StringUtils.isNotBlank(messageBusMsg.getAlias_sendContent())){
  185 + queryBuilder = queryBuilder.must(QueryBuilders.wildcardQuery("msg", "*"+messageBusMsg.getAlias_sendContent().toLowerCase()+"*"));
  186 + }
  187 + log.info("bool查询语句为:{}",queryBuilder);
  188 + return queryBuilder;
  189 + }
  190 +
85 } 191 }
@@ -86,4 +86,20 @@ public class BusQueue implements Serializable { @@ -86,4 +86,20 @@ public class BusQueue implements Serializable {
86 * 一个队列对应一个虚拟机 86 * 一个队列对应一个虚拟机
87 */ 87 */
88 private VirtualHost virtualHost; 88 private VirtualHost virtualHost;
  89 +
  90 + /**
  91 + * topic partition数量,默认3
  92 + */
  93 + private int partitionCount;
  94 +
  95 + /**
  96 + * 队列类型,默认1 kafka
  97 + */
  98 + private int queueType;
  99 + private int serverType;
  100 +
  101 + /**
  102 + * 消费者组名
  103 + */
  104 + private String consumerGroupName;
89 } 105 }
@@ -90,4 +90,8 @@ public class BusServer implements Serializable { @@ -90,4 +90,8 @@ public class BusServer implements Serializable {
90 */ 90 */
91 private String aliasName; 91 private String aliasName;
92 92
  93 + /**
  94 + * 服务类型 1kafka 2rabbit
  95 + */
  96 + private String serverType;
93 } 97 }
@@ -14,6 +14,8 @@ public enum CustomExceptionType { @@ -14,6 +14,8 @@ public enum CustomExceptionType {
14 SERVER_EXCEPTION("10500", "服务器异常,发送消息失败!"), 14 SERVER_EXCEPTION("10500", "服务器异常,发送消息失败!"),
15 CLIENT_EXCEPTION("10400", "报文格式错误,请检查报文格式!"), 15 CLIENT_EXCEPTION("10400", "报文格式错误,请检查报文格式!"),
16 16
  17 + SEARCH_EXCEPTION("10600", "搜索内容不能为空"),
  18 +
17 BINDING_ERROR("10501", "配置信息,未进行绑定!"), 19 BINDING_ERROR("10501", "配置信息,未进行绑定!"),
18 SENDER_ERROR("10401", "报文格式错误,发送者不能为空!"), 20 SENDER_ERROR("10401", "报文格式错误,发送者不能为空!"),
19 CONTENT_ERROR("10402", "报文格式错误,消息内容不能为空!"), 21 CONTENT_ERROR("10402", "报文格式错误,消息内容不能为空!"),
@@ -45,6 +47,8 @@ public enum CustomExceptionType { @@ -45,6 +47,8 @@ public enum CustomExceptionType {
45 RECEIVE_HOST_NO_EXIST("30414", "参数错误,虚拟主机名称不能为空!"), 47 RECEIVE_HOST_NO_EXIST("30414", "参数错误,虚拟主机名称不能为空!"),
46 RECEIVE_QUEUE_NO_EXIST("30415", "参数错误,队列名称不能为空!"), 48 RECEIVE_QUEUE_NO_EXIST("30415", "参数错误,队列名称不能为空!"),
47 49
  50 + KAFKA_QUEUE_ADD_ERR("40000","添加kafka队列失败"),
  51 +
48 52
49 CLIENT_ERROR("400", "客户端异常"), 53 CLIENT_ERROR("400", "客户端异常"),
50 SYSTEM_ERROR("500", "系统服务异常"), 54 SYSTEM_ERROR("500", "系统服务异常"),
  1 +package com.sunyo.wlpt.message.bus.service.mapper;
  2 +
  3 +import com.sunyo.wlpt.message.bus.service.model.Btype;
  4 +
  5 +public interface BtypeMapper {
  6 + int deleteByPrimaryKey(String id);
  7 +
  8 + int insert(Btype record);
  9 +
  10 + int insertSelective(Btype record);
  11 +
  12 + Btype selectByPrimaryKey(String id);
  13 +
  14 + int updateByPrimaryKeySelective(Btype record);
  15 +
  16 + int updateByPrimaryKey(Btype record);
  17 +}
  1 +package com.sunyo.wlpt.message.bus.service.mapper;
  2 +
  3 +import com.sunyo.wlpt.message.bus.service.model.ConsumerGroup;
  4 +
  5 +import java.util.List;
  6 +
  7 +public interface ConsumerGroupMapper {
  8 + int deleteByPrimaryKey(String id);
  9 +
  10 + int insert(ConsumerGroup record);
  11 +
  12 + int insertSelective(ConsumerGroup record);
  13 +
  14 + ConsumerGroup selectByPrimaryKey(String id);
  15 +
  16 + List<String> groups();
  17 +
  18 + int updateByPrimaryKeySelective(ConsumerGroup record);
  19 +
  20 + int updateByPrimaryKey(ConsumerGroup record);
  21 +}
  1 +package com.sunyo.wlpt.message.bus.service.mapper;
  2 +
  3 +import com.sunyo.wlpt.message.bus.service.model.MessageRouter;
  4 +
  5 +public interface MessageRouterMapper {
  6 + int deleteByPrimaryKey(String id);
  7 +
  8 + int insert(MessageRouter record);
  9 +
  10 + int insertSelective(MessageRouter record);
  11 +
  12 + MessageRouter selectByPrimaryKey(String id);
  13 +
  14 + int updateByPrimaryKeySelective(MessageRouter record);
  15 +
  16 + int updateByPrimaryKey(MessageRouter record);
  17 +}
  1 +package com.sunyo.wlpt.message.bus.service.mapper;
  2 +
  3 +import com.sunyo.wlpt.message.bus.service.model.MessageRouterReciverFilter;
  4 +
  5 +public interface MessageRouterReciverFilterMapper {
  6 + int deleteByPrimaryKey(String id);
  7 +
  8 + int insert(MessageRouterReciverFilter record);
  9 +
  10 + int insertSelective(MessageRouterReciverFilter record);
  11 +
  12 + MessageRouterReciverFilter selectByPrimaryKey(String id);
  13 +
  14 + int updateByPrimaryKeySelective(MessageRouterReciverFilter record);
  15 +
  16 + int updateByPrimaryKey(MessageRouterReciverFilter record);
  17 +}
  1 +package com.sunyo.wlpt.message.bus.service.mapper;
  2 +
  3 +import com.sunyo.wlpt.message.bus.service.model.MessageRouterReciver;
  4 +
  5 +public interface MessageRouterReciverMapper {
  6 + int deleteByPrimaryKey(String id);
  7 +
  8 + int insert(MessageRouterReciver record);
  9 +
  10 + int insertSelective(MessageRouterReciver record);
  11 +
  12 + MessageRouterReciver selectByPrimaryKey(String id);
  13 +
  14 + int updateByPrimaryKeySelective(MessageRouterReciver record);
  15 +
  16 + int updateByPrimaryKey(MessageRouterReciver record);
  17 +}
  1 +package com.sunyo.wlpt.message.bus.service.mapper;
  2 +
  3 +import com.sunyo.wlpt.message.bus.service.model.MessageType;
  4 +
  5 +import java.util.List;
  6 +
  7 +public interface MessageTypeMapper {
  8 + int deleteByPrimaryKey(String id);
  9 +
  10 + int insert(MessageType record);
  11 +
  12 + int insertSelective(MessageType record);
  13 +
  14 + MessageType selectByPrimaryKey(String id);
  15 +
  16 + List<MessageType> selectByParentId(String id);
  17 +
  18 + List<MessageType> selectAll(MessageType record);
  19 +
  20 + int updateByPrimaryKeySelective(MessageType record);
  21 +
  22 + int updateByPrimaryKey(MessageType record);
  23 +}
  1 +package com.sunyo.wlpt.message.bus.service.mapper;
  2 +
  3 +import com.sunyo.wlpt.message.bus.service.model.Stype;
  4 +
  5 +public interface StypeMapper {
  6 + int deleteByPrimaryKey(String id);
  7 +
  8 + int insert(Stype record);
  9 +
  10 + int insertSelective(Stype record);
  11 +
  12 + Stype selectByPrimaryKey(String id);
  13 +
  14 + int updateByPrimaryKeySelective(Stype record);
  15 +
  16 + int updateByPrimaryKey(Stype record);
  17 +}
  1 +package com.sunyo.wlpt.message.bus.service.mapper;
  2 +
  3 +import com.sunyo.wlpt.message.bus.service.model.UserTopic;
  4 +
  5 +public interface UserTopicMapper {
  6 + int deleteByPrimaryKey(String id);
  7 +
  8 + int insert(UserTopic record);
  9 +
  10 + int insertSelective(UserTopic record);
  11 +
  12 + UserTopic selectByPrimaryKey(String id);
  13 +
  14 + int updateByPrimaryKeySelective(UserTopic record);
  15 +
  16 + int updateByPrimaryKey(UserTopic record);
  17 +}
  1 +package com.sunyo.wlpt.message.bus.service.model;
  2 +
  3 +import lombok.Data;
  4 +
  5 +@Data
  6 +public class BasePage {
  7 + /**
  8 + * //开始页
  9 + */
  10 + private int pageNum =0;
  11 +
  12 + /**
  13 + * 每页多少条数据
  14 + */
  15 + private int pageSize=10;
  16 +}
  1 +package com.sunyo.wlpt.message.bus.service.model;
  2 +
  3 +import java.util.List;
  4 +
  5 +public class Btype {
  6 + private String id;
  7 +
  8 + private String name;
  9 +
  10 + private String parentId;
  11 +
  12 + private String des;
  13 +
  14 + private List<Stype> children;
  15 +
  16 + public String getId() {
  17 + return id;
  18 + }
  19 +
  20 + public void setId(String id) {
  21 + this.id = id == null ? null : id.trim();
  22 + }
  23 +
  24 + public String getName() {
  25 + return name;
  26 + }
  27 +
  28 + public void setName(String name) {
  29 + this.name = name == null ? null : name.trim();
  30 + }
  31 +
  32 + public String getParentId() {
  33 + return parentId;
  34 + }
  35 +
  36 + public void setParentId(String parentId) {
  37 + this.parentId = parentId == null ? null : parentId.trim();
  38 + }
  39 +
  40 + public String getDes() {
  41 + return des;
  42 + }
  43 +
  44 + public void setDes(String des) {
  45 + this.des = des == null ? null : des.trim();
  46 + }
  47 +}
  1 +package com.sunyo.wlpt.message.bus.service.model;
  2 +
  3 +public class ConsumerGroup {
  4 + private String id;
  5 +
  6 + private String name;
  7 +
  8 + private String des;
  9 +
  10 + private String userId;
  11 +
  12 + private String username;
  13 +
  14 + public String getId() {
  15 + return id;
  16 + }
  17 +
  18 + public void setId(String id) {
  19 + this.id = id == null ? null : id.trim();
  20 + }
  21 +
  22 + public String getName() {
  23 + return name;
  24 + }
  25 +
  26 + public void setName(String name) {
  27 + this.name = name == null ? null : name.trim();
  28 + }
  29 +
  30 + public String getDes() {
  31 + return des;
  32 + }
  33 +
  34 + public void setDes(String des) {
  35 + this.des = des == null ? null : des.trim();
  36 + }
  37 +
  38 + public String getUserId() {
  39 + return userId;
  40 + }
  41 +
  42 + public void setUserId(String userId) {
  43 + this.userId = userId == null ? null : userId.trim();
  44 + }
  45 +
  46 + public String getUsername() {
  47 + return username;
  48 + }
  49 +
  50 + public void setUsername(String username) {
  51 + this.username = username == null ? null : username.trim();
  52 + }
  53 +}
  1 +package com.sunyo.wlpt.message.bus.service.model;
  2 +
  3 +import lombok.Data;
  4 +
  5 +/**
  6 + * @author mrz
  7 + */
  8 +@Data
  9 +public class ConsumerGroupOffsets {
  10 + private String groupName;
  11 + private String topic;
  12 + private int partition;
  13 + private long offset;
  14 + private long endoffset;
  15 + private long lag;
  16 +
  17 + public long getLag() {
  18 + return this.lag;
  19 + }
  20 + public void setLag(){
  21 + this.lag = endoffset - offset;
  22 + }
  23 +
  24 + public ConsumerGroupOffsets(String consumerGroupName, String topic, int partition, long offset, long endoffset) {
  25 + this.groupName = consumerGroupName;
  26 + this.topic = topic;
  27 + this.partition = partition;
  28 + this.offset = offset;
  29 + this.endoffset = endoffset;
  30 + }
  31 +}
  1 +package com.sunyo.wlpt.message.bus.service.model;
  2 +
  3 +public class ESMessageModel {
  4 + private String _index;
  5 + private String _type;
  6 + private String _id;
  7 + private int _score;
  8 + private Object _source;
  9 +
  10 +}
  1 +package com.sunyo.wlpt.message.bus.service.model;
  2 +
  3 +import lombok.Data;
  4 +import org.springframework.data.domain.PageRequest;
  5 +import org.springframework.data.domain.Pageable;
  6 +import org.springframework.data.domain.Sort;
  7 +import org.springframework.util.Assert;
  8 +
  9 +
  10 +public class ESPage implements Pageable {
  11 + private static final long serialVersionUID = -4541509938956089563L;
  12 +
  13 + /**
  14 + * //开始页
  15 + */
  16 + private int pageNum;
  17 +
  18 + /**
  19 + * 每页多少条数据
  20 + */
  21 + private int pageSize;
  22 +
  23 + /**
  24 + * 排序字段
  25 + */
  26 + final Sort sort;
  27 +
  28 + protected ESPage(int pageNumber, int pageSize, Sort sort) {
  29 + if (pageNumber < 0) {
  30 + pageNumber = 0;
  31 + } else if (pageSize < 1) {
  32 + pageSize = 10;
  33 + }
  34 +
  35 + this.pageNum = pageNumber;
  36 + this.pageSize = pageSize;
  37 +
  38 + Assert.notNull(sort, "Sort must not be null!");
  39 + this.sort = sort;
  40 +
  41 + }
  42 +
  43 + public ESPage(Sort sort) {
  44 + this.sort=sort;
  45 + }
  46 +
  47 + public static ESPage of(int page, int size) {
  48 + return of(page, size, Sort.unsorted());
  49 + }
  50 +
  51 + public static ESPage of(int page, int size, Sort sort) {
  52 + return new ESPage(page, size, sort);
  53 + }
  54 +
  55 + public static ESPage of(int page, int size, Sort.Direction direction, String... properties) {
  56 + return of(page, size, Sort.by(direction, properties));
  57 + }
  58 +
  59 + @Override
  60 + public int getPageNumber() {
  61 + return this.pageNum;
  62 + }
  63 +
  64 + @Override
  65 + public long getOffset() {
  66 + return (long)this.pageNum * (long)this.pageSize;
  67 + }
  68 +
  69 + @Override
  70 + public Pageable next() {
  71 + return new ESPage(this.getPageNumber() + 1, this.getPageSize(), this.getSort());
  72 + }
  73 +
  74 + @Override
  75 + public Pageable previousOrFirst() {
  76 + return this.hasPrevious() ? this.previous() : this.first();
  77 + }
  78 +
  79 + @Override
  80 + public Pageable first() {
  81 + return new ESPage(0, this.getPageSize(), this.getSort());
  82 + }
  83 +
  84 + @Override
  85 + public boolean hasPrevious() {
  86 + return this.pageNum > 0;
  87 + }
  88 +
  89 + @Override
  90 + public int getPageSize() {
  91 + return this.pageSize;
  92 + }
  93 +
  94 + @Override
  95 + public Sort getSort() {
  96 + return this.sort;
  97 + }
  98 +
  99 + @Override
  100 + public String toString() {
  101 + return String.format("Page request [number: %d, size %d, sort: %s]", this.getPageNumber(), this.getPageSize(), this.sort);
  102 + }
  103 +
  104 + public Pageable previous() {
  105 + return this.getPageNumber() == 0 ? this : new ESPage(this.getPageNumber() - 1, this.getPageSize(), this.getSort());
  106 + }
  107 +
  108 +
  109 +}
  1 +package com.sunyo.wlpt.message.bus.service.model;
  2 +
  3 +import lombok.Data;
  4 +import org.springframework.data.domain.Sort;
  5 +
  6 +import java.util.Date;
  7 +import java.util.List;
  8 +
  9 +@Data
  10 +public class MessageBusMsg{
  11 + private String sndr;
  12 + private String rcvr;
  13 + private String btype;
  14 + private String stype;
  15 + private Date ddtm;
  16 + private String seqn;
  17 + private String msgid;
  18 + private String msg;
  19 + private Date creatime;
  20 +
  21 + /**
  22 + * 查询搜索字段
  23 + */
  24 + private List<String> creatimeSearch;
  25 + //消息接收者
  26 + private String rcvrsUserName;
  27 + //路由接收者
  28 + private String rcvlogUsername;
  29 + //搜索内容
  30 + private String alias_sendContent;
  31 + /**
  32 + * //开始页
  33 + */
  34 + private int pageNum;
  35 +
  36 + /**
  37 + * 每页多少条数据
  38 + */
  39 + private int pageSize;
  40 +
  41 +
  42 + private List<MessageBusMsgTarget> target;
  43 + private List<MessageBusMsgRcvrs> rcvrs;
  44 + private List<MessageBusMsgRcvlog> rcvlog;
  45 +
  46 +
  47 + public void setAlias_sendContent(String alias_sendContent) {
  48 + this.msg = alias_sendContent;
  49 + this.alias_sendContent = alias_sendContent;
  50 + }
  51 +
  52 +
  53 + public void setMsg(String msg) {
  54 + this.msg = msg;
  55 + this.alias_sendContent = msg;
  56 + }
  57 +}
  1 +package com.sunyo.wlpt.message.bus.service.model;
  2 +
  3 +import lombok.Data;
  4 +
  5 +import java.util.Date;
  6 +
  7 +@Data
  8 +public class MessageBusMsgRcvlog {
  9 + private String username;
  10 + private Date rvtm;
  11 +}
  1 +package com.sunyo.wlpt.message.bus.service.model;
  2 +
  3 +import lombok.Data;
  4 +
  5 +@Data
  6 +public class MessageBusMsgRcvrs {
  7 +
  8 + private String username;
  9 +}
  1 +package com.sunyo.wlpt.message.bus.service.model;
  2 +
  3 +import lombok.Data;
  4 +
  5 +@Data
  6 +public class MessageBusMsgTarget {
  7 + private String filter;
  8 + private String value;
  9 +}
  1 +package com.sunyo.wlpt.message.bus.service.model;
  2 +
  3 +import java.util.Date;
  4 +
  5 +public class MessageRouter {
  6 + private String id;
  7 +
  8 + private String sndr;
  9 +
  10 + private String btype;
  11 +
  12 + private String stype;
  13 +
  14 + private String optype;
  15 +
  16 + private Byte msgLimit;
  17 +
  18 + private String character;
  19 +
  20 + private Boolean status;
  21 +
  22 + private Boolean usage;
  23 +
  24 + private String des;
  25 +
  26 + private String ver;
  27 +
  28 + private Date creatTime;
  29 +
  30 + private Date updateTime;
  31 +
  32 + public String getId() {
  33 + return id;
  34 + }
  35 +
  36 + public void setId(String id) {
  37 + this.id = id == null ? null : id.trim();
  38 + }
  39 +
  40 + public String getSndr() {
  41 + return sndr;
  42 + }
  43 +
  44 + public void setSndr(String sndr) {
  45 + this.sndr = sndr == null ? null : sndr.trim();
  46 + }
  47 +
  48 + public String getBtype() {
  49 + return btype;
  50 + }
  51 +
  52 + public void setBtype(String btype) {
  53 + this.btype = btype == null ? null : btype.trim();
  54 + }
  55 +
  56 + public String getStype() {
  57 + return stype;
  58 + }
  59 +
  60 + public void setStype(String stype) {
  61 + this.stype = stype == null ? null : stype.trim();
  62 + }
  63 +
  64 + public String getOptype() {
  65 + return optype;
  66 + }
  67 +
  68 + public void setOptype(String optype) {
  69 + this.optype = optype == null ? null : optype.trim();
  70 + }
  71 +
  72 + public Byte getMsgLimit() {
  73 + return msgLimit;
  74 + }
  75 +
  76 + public void setMsgLimit(Byte msgLimit) {
  77 + this.msgLimit = msgLimit;
  78 + }
  79 +
  80 + public String getCharacter() {
  81 + return character;
  82 + }
  83 +
  84 + public void setCharacter(String character) {
  85 + this.character = character == null ? null : character.trim();
  86 + }
  87 +
  88 + public Boolean getStatus() {
  89 + return status;
  90 + }
  91 +
  92 + public void setStatus(Boolean status) {
  93 + this.status = status;
  94 + }
  95 +
  96 + public Boolean getUsage() {
  97 + return usage;
  98 + }
  99 +
  100 + public void setUsage(Boolean usage) {
  101 + this.usage = usage;
  102 + }
  103 +
  104 + public String getDes() {
  105 + return des;
  106 + }
  107 +
  108 + public void setDes(String des) {
  109 + this.des = des == null ? null : des.trim();
  110 + }
  111 +
  112 + public String getVer() {
  113 + return ver;
  114 + }
  115 +
  116 + public void setVer(String ver) {
  117 + this.ver = ver == null ? null : ver.trim();
  118 + }
  119 +
  120 + public Date getCreatTime() {
  121 + return creatTime;
  122 + }
  123 +
  124 + public void setCreatTime(Date creatTime) {
  125 + this.creatTime = creatTime;
  126 + }
  127 +
  128 + public Date getUpdateTime() {
  129 + return updateTime;
  130 + }
  131 +
  132 + public void setUpdateTime(Date updateTime) {
  133 + this.updateTime = updateTime;
  134 + }
  135 +}
  1 +package com.sunyo.wlpt.message.bus.service.model;
  2 +
  3 +import java.util.Date;
  4 +
  5 +public class MessageRouterReciver {
  6 + private String id;
  7 +
  8 + private String rcvrTopic;
  9 +
  10 + private String messageRouterId;
  11 +
  12 + private Date creatTime;
  13 +
  14 + private Date updateTime;
  15 +
  16 + public String getId() {
  17 + return id;
  18 + }
  19 +
  20 + public void setId(String id) {
  21 + this.id = id == null ? null : id.trim();
  22 + }
  23 +
  24 + public String getRcvrTopic() {
  25 + return rcvrTopic;
  26 + }
  27 +
  28 + public void setRcvrTopic(String rcvrTopic) {
  29 + this.rcvrTopic = rcvrTopic == null ? null : rcvrTopic.trim();
  30 + }
  31 +
  32 + public String getMessageRouterId() {
  33 + return messageRouterId;
  34 + }
  35 +
  36 + public void setMessageRouterId(String messageRouterId) {
  37 + this.messageRouterId = messageRouterId == null ? null : messageRouterId.trim();
  38 + }
  39 +
  40 + public Date getCreatTime() {
  41 + return creatTime;
  42 + }
  43 +
  44 + public void setCreatTime(Date creatTime) {
  45 + this.creatTime = creatTime;
  46 + }
  47 +
  48 + public Date getUpdateTime() {
  49 + return updateTime;
  50 + }
  51 +
  52 + public void setUpdateTime(Date updateTime) {
  53 + this.updateTime = updateTime;
  54 + }
  55 +}
  1 +package com.sunyo.wlpt.message.bus.service.model;
  2 +
  3 +import java.util.Date;
  4 +
  5 +public class MessageRouterReciverFilter {
  6 + private String id;
  7 +
  8 + private String filter;
  9 +
  10 + private String filterValue;
  11 +
  12 + private String type;
  13 +
  14 + private Boolean status;
  15 +
  16 + private String messageRouterReciverId;
  17 +
  18 + private Date creatTime;
  19 +
  20 + private Date updateTime;
  21 +
  22 + public String getId() {
  23 + return id;
  24 + }
  25 +
  26 + public void setId(String id) {
  27 + this.id = id == null ? null : id.trim();
  28 + }
  29 +
  30 + public String getFilter() {
  31 + return filter;
  32 + }
  33 +
  34 + public void setFilter(String filter) {
  35 + this.filter = filter == null ? null : filter.trim();
  36 + }
  37 +
  38 + public String getFilterValue() {
  39 + return filterValue;
  40 + }
  41 +
  42 + public void setFilterValue(String filterValue) {
  43 + this.filterValue = filterValue == null ? null : filterValue.trim();
  44 + }
  45 +
  46 + public String getType() {
  47 + return type;
  48 + }
  49 +
  50 + public void setType(String type) {
  51 + this.type = type == null ? null : type.trim();
  52 + }
  53 +
  54 + public Boolean getStatus() {
  55 + return status;
  56 + }
  57 +
  58 + public void setStatus(Boolean status) {
  59 + this.status = status;
  60 + }
  61 +
  62 + public String getMessageRouterReciverId() {
  63 + return messageRouterReciverId;
  64 + }
  65 +
  66 + public void setMessageRouterReciverId(String messageRouterReciverId) {
  67 + this.messageRouterReciverId = messageRouterReciverId == null ? null : messageRouterReciverId.trim();
  68 + }
  69 +
  70 + public Date getCreatTime() {
  71 + return creatTime;
  72 + }
  73 +
  74 + public void setCreatTime(Date creatTime) {
  75 + this.creatTime = creatTime;
  76 + }
  77 +
  78 + public Date getUpdateTime() {
  79 + return updateTime;
  80 + }
  81 +
  82 + public void setUpdateTime(Date updateTime) {
  83 + this.updateTime = updateTime;
  84 + }
  85 +}
  1 +package com.sunyo.wlpt.message.bus.service.model;
  2 +
  3 +import java.util.List;
  4 +
  5 +/**
  6 + * @author mrz
  7 + */
  8 +public class MessageType extends BasePage{
  9 + private String id;
  10 +
  11 + private String name;
  12 +
  13 + private String parentId;
  14 +
  15 + private String des;
  16 +
  17 + private Integer type;
  18 +
  19 + private List<MessageType> children;
  20 +
  21 + public String getId() {
  22 + return id;
  23 + }
  24 +
  25 + public void setId(String id) {
  26 + this.id = id == null ? null : id.trim();
  27 + }
  28 +
  29 + public String getName() {
  30 + return name;
  31 + }
  32 +
  33 + public void setName(String name) {
  34 + this.name = name == null ? null : name.trim();
  35 + }
  36 +
  37 + public String getParentId() {
  38 + return parentId;
  39 + }
  40 +
  41 + public void setParentId(String parentId) {
  42 + this.parentId = parentId == null ? null : parentId.trim();
  43 + }
  44 +
  45 + public String getDes() {
  46 + return des;
  47 + }
  48 +
  49 + public void setDes(String des) {
  50 + this.des = des == null ? null : des.trim();
  51 + }
  52 +
  53 + public Integer getType() {
  54 + return type;
  55 + }
  56 +
  57 + public void setType(Integer type) {
  58 + this.type = type;
  59 + }
  60 +
  61 + public List<MessageType> getChildren() {
  62 + return children;
  63 + }
  64 +
  65 + public void setChildren(List<MessageType> children) {
  66 + this.children = children;
  67 + }
  68 +}
  1 +package com.sunyo.wlpt.message.bus.service.model;
  2 +
  3 +public class Stype {
  4 + private String id;
  5 +
  6 + private String name;
  7 +
  8 + private String des;
  9 +
  10 + private String parentId;
  11 +
  12 + public String getId() {
  13 + return id;
  14 + }
  15 +
  16 + public void setId(String id) {
  17 + this.id = id == null ? null : id.trim();
  18 + }
  19 +
  20 + public String getName() {
  21 + return name;
  22 + }
  23 +
  24 + public void setName(String name) {
  25 + this.name = name == null ? null : name.trim();
  26 + }
  27 +
  28 + public String getDes() {
  29 + return des;
  30 + }
  31 +
  32 + public void setDes(String des) {
  33 + this.des = des == null ? null : des.trim();
  34 + }
  35 +
  36 + public String getParentId() {
  37 + return parentId;
  38 + }
  39 +
  40 + public void setParentId(String parentId) {
  41 + this.parentId = parentId == null ? null : parentId.trim();
  42 + }
  43 +}
  1 +package com.sunyo.wlpt.message.bus.service.model;
  2 +
  3 +public class UserTopic {
  4 + private String id;
  5 +
  6 + private String userId;
  7 +
  8 + private String busQueueId;
  9 +
  10 + private String username;
  11 +
  12 + private String topic;
  13 +
  14 + public String getId() {
  15 + return id;
  16 + }
  17 +
  18 + public void setId(String id) {
  19 + this.id = id == null ? null : id.trim();
  20 + }
  21 +
  22 + public String getUserId() {
  23 + return userId;
  24 + }
  25 +
  26 + public void setUserId(String userId) {
  27 + this.userId = userId == null ? null : userId.trim();
  28 + }
  29 +
  30 + public String getBusQueueId() {
  31 + return busQueueId;
  32 + }
  33 +
  34 + public void setBusQueueId(String busQueueId) {
  35 + this.busQueueId = busQueueId == null ? null : busQueueId.trim();
  36 + }
  37 +
  38 + public String getUsername() {
  39 + return username;
  40 + }
  41 +
  42 + public void setUsername(String username) {
  43 + this.username = username == null ? null : username.trim();
  44 + }
  45 +
  46 + public String getTopic() {
  47 + return topic;
  48 + }
  49 +
  50 + public void setTopic(String topic) {
  51 + this.topic = topic == null ? null : topic.trim();
  52 + }
  53 +
  54 + public UserTopic(String id, String userId, String busQueueId, String username, String topic) {
  55 + this.id = id;
  56 + this.userId = userId;
  57 + this.busQueueId = busQueueId;
  58 + this.username = username;
  59 + this.topic = topic;
  60 + }
  61 + public UserTopic(){
  62 +
  63 + }
  64 +}
@@ -150,7 +150,7 @@ public class ResultJson<T> implements Serializable { @@ -150,7 +150,7 @@ public class ResultJson<T> implements Serializable {
150 } else if (e.getCode() == CustomExceptionType.SYSTEM_ERROR.getCode()) { 150 } else if (e.getCode() == CustomExceptionType.SYSTEM_ERROR.getCode()) {
151 result.setMsg(e.getMessage() + ";请将该异常发送给管理员"); 151 result.setMsg(e.getMessage() + ";请将该异常发送给管理员");
152 } else { 152 } else {
153 - result.setMsg("系统出现未知异常,请联系管理员!"); 153 + result.setMsg(e.getMessage());
154 } 154 }
155 // 可以尝试着做异常信息持久化 155 // 可以尝试着做异常信息持久化
156 return result; 156 return result;
1 package com.sunyo.wlpt.message.bus.service.service; 1 package com.sunyo.wlpt.message.bus.service.service;
2 2
3 import com.sunyo.wlpt.message.bus.service.domain.es.ElasticSearchInfo; 3 import com.sunyo.wlpt.message.bus.service.domain.es.ElasticSearchInfo;
  4 +import com.sunyo.wlpt.message.bus.service.model.ESPage;
4 import com.sunyo.wlpt.message.bus.service.response.ResultJson; 5 import com.sunyo.wlpt.message.bus.service.response.ResultJson;
  6 +import org.elasticsearch.search.builder.SearchSourceBuilder;
  7 +import org.springframework.data.domain.Page;
5 8
6 import java.util.List; 9 import java.util.List;
7 10
@@ -84,6 +87,8 @@ public interface ElasticSearchInfoService { @@ -84,6 +87,8 @@ public interface ElasticSearchInfoService {
84 * @return 87 * @return
85 */ 88 */
86 ResultJson batchRemoveByIds(String ids); 89 ResultJson batchRemoveByIds(String ids);
  90 +
  91 + <T> Page<T> search(SearchSourceBuilder builder, Class<T> c, ESPage page);
87 } 92 }
88 93
89 94
  1 +package com.sunyo.wlpt.message.bus.service.service;
  2 +
  3 +import com.sunyo.wlpt.message.bus.service.model.ConsumerGroupOffsets;
  4 +
  5 +import java.util.List;
  6 +
  7 +public interface KafkaService {
  8 +
  9 + boolean addTopic(String TopicName,int partitionNum);
  10 +
  11 + public void updateAdminclient();
  12 +
  13 + List<ConsumerGroupOffsets> queueMonitor();
  14 +}
  1 +package com.sunyo.wlpt.message.bus.service.service;
  2 +
  3 +import com.github.pagehelper.PageHelper;
  4 +import com.github.pagehelper.PageInfo;
  5 +import com.sunyo.wlpt.message.bus.service.model.MessageType;
  6 +
  7 +import java.util.List;
  8 +
  9 +public interface MessageTypeService {
  10 +
  11 + PageInfo<MessageType> pageList(MessageType record);
  12 +
  13 + List<MessageType> list(MessageType messageType);
  14 +
  15 +}
@@ -2,15 +2,19 @@ package com.sunyo.wlpt.message.bus.service.service.impl; @@ -2,15 +2,19 @@ package com.sunyo.wlpt.message.bus.service.service.impl;
2 2
3 import com.github.pagehelper.PageHelper; 3 import com.github.pagehelper.PageHelper;
4 import com.github.pagehelper.PageInfo; 4 import com.github.pagehelper.PageInfo;
  5 +import com.sunyo.wlpt.message.bus.service.exception.CustomException;
  6 +import com.sunyo.wlpt.message.bus.service.exception.CustomExceptionType;
  7 +import com.sunyo.wlpt.message.bus.service.mapper.*;
5 import com.sunyo.wlpt.message.bus.service.domain.BusQueue; 8 import com.sunyo.wlpt.message.bus.service.domain.BusQueue;
6 -import com.sunyo.wlpt.message.bus.service.mapper.BusQueueMapper;  
7 -import com.sunyo.wlpt.message.bus.service.mapper.UserInfoMapper;  
8 -import com.sunyo.wlpt.message.bus.service.mapper.UserMessageBindingMapper; 9 +import com.sunyo.wlpt.message.bus.service.model.ConsumerGroup;
  10 +import com.sunyo.wlpt.message.bus.service.model.UserTopic;
9 import com.sunyo.wlpt.message.bus.service.rabbit.utils.RabbitUtils; 11 import com.sunyo.wlpt.message.bus.service.rabbit.utils.RabbitUtils;
10 import com.sunyo.wlpt.message.bus.service.response.ResultJson; 12 import com.sunyo.wlpt.message.bus.service.response.ResultJson;
11 import com.sunyo.wlpt.message.bus.service.service.BusQueueService; 13 import com.sunyo.wlpt.message.bus.service.service.BusQueueService;
  14 +import com.sunyo.wlpt.message.bus.service.service.KafkaService;
12 import com.sunyo.wlpt.message.bus.service.utils.IdUtils; 15 import com.sunyo.wlpt.message.bus.service.utils.IdUtils;
13 import io.netty.util.internal.StringUtil; 16 import io.netty.util.internal.StringUtil;
  17 +import org.springframework.beans.factory.annotation.Autowired;
14 import org.springframework.stereotype.Service; 18 import org.springframework.stereotype.Service;
15 import org.springframework.transaction.annotation.Propagation; 19 import org.springframework.transaction.annotation.Propagation;
16 import org.springframework.transaction.annotation.Transactional; 20 import org.springframework.transaction.annotation.Transactional;
@@ -38,10 +42,19 @@ public class BusQueueServiceImpl implements BusQueueService { @@ -38,10 +42,19 @@ public class BusQueueServiceImpl implements BusQueueService {
38 @Resource 42 @Resource
39 private UserInfoMapper userInfoMapper; 43 private UserInfoMapper userInfoMapper;
40 44
  45 + @Resource
  46 + UserTopicMapper userTopicMapper;
  47 +
41 48
42 @Resource 49 @Resource
43 private UserMessageBindingMapper userMessageBindingMapper; 50 private UserMessageBindingMapper userMessageBindingMapper;
44 51
  52 + @Resource
  53 + private ConsumerGroupMapper consumerGroupMapper;
  54 +
  55 + @Autowired
  56 + KafkaService kafkaService;
  57 +
45 58
46 @Override 59 @Override
47 @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED) 60 @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
@@ -119,19 +132,34 @@ public class BusQueueServiceImpl implements BusQueueService { @@ -119,19 +132,34 @@ public class BusQueueServiceImpl implements BusQueueService {
119 * <p> 132 * <p>
120 * 存储创建的队列信息于数据库 133 * 存储创建的队列信息于数据库
121 */ 134 */
  135 + @Transactional(rollbackFor = Exception.class)
122 @Override 136 @Override
123 - public ResultJson insertSelective(BusQueue record) throws IOException, TimeoutException  
124 - {  
125 - if (userInfoMapper.selectUserExist(record.getUsername()).size() == 0) {  
126 - return new ResultJson<>("400", "该MQ用户信息,不存在");  
127 - }  
128 - if (userInfoMapper.selectByBusQueue(record).size() == 0) {  
129 - return new ResultJson<>("400", "该MQ用户关系,不存在");  
130 - } 137 + public ResultJson insertSelective(BusQueue record) {
  138 +
  139 + String id = IdUtils.generateId();
  140 + ConsumerGroup consumerGroup =new ConsumerGroup();
  141 + consumerGroup.setId(id);
  142 + consumerGroup.setName(record.getConsumerGroupName());
  143 + consumerGroup.setUserId(record.getUserId());
  144 + consumerGroup.setUsername(record.getUsername());
  145 + int i =consumerGroupMapper.insertSelective(consumerGroup);
  146 +
  147 + UserTopic userTopic = new UserTopic(id,record.getUserId(),id,record.getUsername(),record.getQueueName());
  148 + int iii = userTopicMapper.insertSelective(userTopic);
  149 +// if (true){
  150 +// throw new RuntimeException("test");
  151 +// }
131 // 设置id 152 // 设置id
132 - record.setId(IdUtils.generateId());  
133 - rabbitUtils.toCreateQueue(record);  
134 - return busQueueMapper.insertSelective(record) > 0 153 + record.setId(id);
  154 + int ii = busQueueMapper.insertSelective(record);
  155 +
  156 +
  157 + //插入到kafka服务topic
  158 + boolean addResult = kafkaService.addTopic(record.getQueueName(),record.getPartitionCount());
  159 + if (!addResult){
  160 + throw new CustomException(CustomExceptionType.KAFKA_QUEUE_ADD_ERR);
  161 + }
  162 + return ii > 0
135 ? new ResultJson<>("200", "添加消息队列,成功") 163 ? new ResultJson<>("200", "添加消息队列,成功")
136 : new ResultJson<>("500", "添加消息队列,失败"); 164 : new ResultJson<>("500", "添加消息队列,失败");
137 } 165 }
@@ -8,6 +8,7 @@ import com.sunyo.wlpt.message.bus.service.mapper.UserInfoMapper; @@ -8,6 +8,7 @@ import com.sunyo.wlpt.message.bus.service.mapper.UserInfoMapper;
8 import com.sunyo.wlpt.message.bus.service.rabbit.utils.RabbitUtils; 8 import com.sunyo.wlpt.message.bus.service.rabbit.utils.RabbitUtils;
9 import com.sunyo.wlpt.message.bus.service.service.*; 9 import com.sunyo.wlpt.message.bus.service.service.*;
10 import com.sunyo.wlpt.message.bus.service.utils.AESUtils; 10 import com.sunyo.wlpt.message.bus.service.utils.AESUtils;
  11 +import org.springframework.beans.factory.annotation.Autowired;
11 import org.springframework.stereotype.Service; 12 import org.springframework.stereotype.Service;
12 import org.springframework.transaction.annotation.Propagation; 13 import org.springframework.transaction.annotation.Propagation;
13 import org.springframework.transaction.annotation.Transactional; 14 import org.springframework.transaction.annotation.Transactional;
@@ -50,6 +51,7 @@ public class BusServerServiceImpl implements BusServerService { @@ -50,6 +51,7 @@ public class BusServerServiceImpl implements BusServerService {
50 private BusServerMapper busServerMapper; 51 private BusServerMapper busServerMapper;
51 52
52 53
  54 +
53 @Override 55 @Override
54 public PageInfo selectBusServerList(BusServer busServer, Integer pageNum, Integer pageSize) 56 public PageInfo selectBusServerList(BusServer busServer, Integer pageNum, Integer pageSize)
55 { 57 {
@@ -110,6 +112,7 @@ public class BusServerServiceImpl implements BusServerService { @@ -110,6 +112,7 @@ public class BusServerServiceImpl implements BusServerService {
110 // 超级用户的密码,使用AES加密 112 // 超级用户的密码,使用AES加密
111 String superPassword = server.getSuperPassword(); 113 String superPassword = server.getSuperPassword();
112 server.setSuperPassword(AESUtils.encrypt(superPassword)); 114 server.setSuperPassword(AESUtils.encrypt(superPassword));
  115 +
113 return busServerMapper.insertSelective(server); 116 return busServerMapper.insertSelective(server);
114 } 117 }
115 118
1 package com.sunyo.wlpt.message.bus.service.service.impl; 1 package com.sunyo.wlpt.message.bus.service.service.impl;
2 2
  3 +import com.alibaba.fastjson.JSON;
3 import com.github.pagehelper.PageHelper; 4 import com.github.pagehelper.PageHelper;
4 import com.github.pagehelper.PageInfo; 5 import com.github.pagehelper.PageInfo;
5 import com.sunyo.wlpt.message.bus.service.domain.es.ElasticSearchInfo; 6 import com.sunyo.wlpt.message.bus.service.domain.es.ElasticSearchInfo;
6 import com.sunyo.wlpt.message.bus.service.mapper.ElasticSearchInfoMapper; 7 import com.sunyo.wlpt.message.bus.service.mapper.ElasticSearchInfoMapper;
  8 +
  9 +import com.sunyo.wlpt.message.bus.service.model.ESPage;
  10 +
7 import com.sunyo.wlpt.message.bus.service.response.ResultJson; 11 import com.sunyo.wlpt.message.bus.service.response.ResultJson;
8 import com.sunyo.wlpt.message.bus.service.service.ElasticSearchInfoService; 12 import com.sunyo.wlpt.message.bus.service.service.ElasticSearchInfoService;
9 import com.sunyo.wlpt.message.bus.service.utils.IdUtils; 13 import com.sunyo.wlpt.message.bus.service.utils.IdUtils;
10 import io.netty.util.internal.StringUtil; 14 import io.netty.util.internal.StringUtil;
  15 +import lombok.extern.slf4j.Slf4j;
  16 +import org.elasticsearch.action.search.SearchRequest;
  17 +import org.elasticsearch.action.search.SearchResponse;
  18 +import org.elasticsearch.client.RequestOptions;
  19 +import org.elasticsearch.client.RestHighLevelClient;
  20 +import org.elasticsearch.search.SearchHit;
  21 +import org.elasticsearch.search.SearchHits;
  22 +import org.elasticsearch.search.builder.SearchSourceBuilder;
  23 +import org.elasticsearch.search.sort.SortOrder;
  24 +import org.springframework.beans.factory.annotation.Autowired;
  25 +import org.springframework.beans.factory.annotation.Qualifier;
  26 +import org.springframework.data.domain.Page;
  27 +import org.springframework.data.domain.PageImpl;
  28 +import org.springframework.data.domain.Sort;
11 import org.springframework.stereotype.Service; 29 import org.springframework.stereotype.Service;
12 30
13 import javax.annotation.Resource; 31 import javax.annotation.Resource;
  32 +import java.util.ArrayList;
  33 +import java.util.Iterator;
14 import java.util.List; 34 import java.util.List;
15 35
16 import static com.sunyo.wlpt.message.bus.service.common.Constant.RESULT_SUCCESS; 36 import static com.sunyo.wlpt.message.bus.service.common.Constant.RESULT_SUCCESS;
@@ -20,12 +40,23 @@ import static com.sunyo.wlpt.message.bus.service.common.Constant.RESULT_SUCCESS; @@ -20,12 +40,23 @@ import static com.sunyo.wlpt.message.bus.service.common.Constant.RESULT_SUCCESS;
20 * Description: 40 * Description:
21 * 时间:2020/9/8 15:49 41 * 时间:2020/9/8 15:49
22 */ 42 */
  43 +@Slf4j
23 @Service 44 @Service
24 public class ElasticSearchInfoServiceImpl implements ElasticSearchInfoService { 45 public class ElasticSearchInfoServiceImpl implements ElasticSearchInfoService {
25 46
26 @Resource 47 @Resource
27 private ElasticSearchInfoMapper elasticSearchInfoMapper; 48 private ElasticSearchInfoMapper elasticSearchInfoMapper;
28 49
  50 +
  51 + @Qualifier("EsHighLevelClient")
  52 + @Resource
  53 + private RestHighLevelClient restHighLevelClient;
  54 +
  55 + /**
  56 + * 索引名称
  57 + */
  58 + private static final String INDEX_NAME = "messagebus";
  59 +
29 @Override 60 @Override
30 public ResultJson deleteByPrimaryKey(String id) 61 public ResultJson deleteByPrimaryKey(String id)
31 { 62 {
@@ -199,6 +230,58 @@ public class ElasticSearchInfoServiceImpl implements ElasticSearchInfoService { @@ -199,6 +230,58 @@ public class ElasticSearchInfoServiceImpl implements ElasticSearchInfoService {
199 : ResultJson.success("新增ES信息,通过检验!"); 230 : ResultJson.success("新增ES信息,通过检验!");
200 } 231 }
201 232
  233 + /**
  234 + * @author WCNGS@QQ.COM
  235 + * @See
  236 + * @date 2019/10/17 17:14
  237 + * @param builder 查询参数
  238 + * @param c 结果类对象
  239 + * @return java.util.List<T>
  240 + * @throws
  241 + * @since
  242 + */
  243 + @Override
  244 + public <T> Page<T> search( SearchSourceBuilder builder, Class<T> c,ESPage page) {
  245 + SearchRequest request = new SearchRequest(INDEX_NAME);
  246 + builder = wrapperBuilder(builder,page);
  247 + request.source(builder);
  248 + try {
  249 + SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
  250 + SearchHits searchHits = response.getHits();
  251 + SearchHit[] hits = searchHits.getHits();
  252 + long total = searchHits.getTotalHits().value;
  253 + log.info("搜索总条数:{}",total);
  254 + List<T> res = new ArrayList<>(hits.length);
  255 +
  256 + String s = "";
  257 + for (SearchHit hit : hits) {
  258 + s += hit.getSourceAsString()+"\n";
  259 + res.add(JSON.parseObject(hit.getSourceAsString(), c));
  260 + }
  261 +
  262 + //返回分页实体
  263 + Page<T> ResponsePage =new PageImpl<T>(res,page,total);
  264 + log.info("查询到{}条记录,分别是s:{}",hits.length,s);
  265 + return ResponsePage;
  266 + } catch (Exception e) {
  267 + throw new RuntimeException(e);
  268 + }
  269 + }
  270 +
  271 + private SearchSourceBuilder wrapperBuilder(SearchSourceBuilder builder, ESPage pageable) {
  272 + builder.from(pageable.getPageNumber() * pageable.getPageSize());
  273 + builder.size(pageable.getPageSize());
  274 + Sort sort = pageable.getSort();
  275 + Iterator iterator = sort.iterator();
  276 + while (iterator.hasNext()) {
  277 + Sort.Order order = (Sort.Order) iterator.next();
  278 + //用keyword字段来排序,所以在建立索引的时候,就必须同步建立keyword字段
  279 + builder.sort(order.getProperty(), order.getDirection() == Sort.Direction.ASC ? SortOrder.ASC : SortOrder.DESC);
  280 + }
  281 + return builder;
  282 + }
  283 +
  284 +
202 } 285 }
203 286
204 287
@@ -51,7 +51,7 @@ public class ElasticsearchService { @@ -51,7 +51,7 @@ public class ElasticsearchService {
51 public ResultJson selectMessageNoteList(MessageNote messageNote, Integer pageNum, Integer pageSize) throws IOException 51 public ResultJson selectMessageNoteList(MessageNote messageNote, Integer pageNum, Integer pageSize) throws IOException
52 { 52 {
53 // 条件搜索 53 // 条件搜索
54 - SearchRequest searchRequest = new SearchRequest("message"); 54 + SearchRequest searchRequest = new SearchRequest("es01");
55 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); 55 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
56 56
57 // 高亮设置 57 // 高亮设置
@@ -110,7 +110,7 @@ public class ElasticsearchService { @@ -110,7 +110,7 @@ public class ElasticsearchService {
110 } 110 }
111 if (!StringUtil.isNullOrEmpty(messageNote.getAlias_sendContent())) { 111 if (!StringUtil.isNullOrEmpty(messageNote.getAlias_sendContent())) {
112 String content = messageNote.getAlias_sendContent(); 112 String content = messageNote.getAlias_sendContent();
113 - WildcardQueryBuilder sendContentQuery = QueryBuilders.wildcardQuery("alias_sendContent", content.matches(".*[a-zA-z].*") ? content.toLowerCase() : content); 113 + WildcardQueryBuilder sendContentQuery = QueryBuilders.wildcardQuery("name", content.matches(".*[a-zA-z].*") ? content.toLowerCase() : content);
114 boolQueryBuilder.must(sendContentQuery); 114 boolQueryBuilder.must(sendContentQuery);
115 } 115 }
116 if (messageNote.getSendTimeBegin() != null && messageNote.getSendTimeEnd() != null) { 116 if (messageNote.getSendTimeBegin() != null && messageNote.getSendTimeEnd() != null) {
  1 +package com.sunyo.wlpt.message.bus.service.service.impl;
  2 +
  3 +import com.github.pagehelper.PageHelper;
  4 +import com.github.pagehelper.PageInfo;
  5 +import com.sunyo.wlpt.message.bus.service.domain.BusQueue;
  6 +import com.sunyo.wlpt.message.bus.service.mapper.MessageTypeMapper;
  7 +import com.sunyo.wlpt.message.bus.service.model.MessageType;
  8 +import com.sunyo.wlpt.message.bus.service.service.MessageTypeService;
  9 +import org.springframework.stereotype.Service;
  10 +
  11 +import javax.annotation.Resource;
  12 +import java.util.List;
  13 +
  14 +@Service
  15 +public class MessageTypeServiceImp implements MessageTypeService {
  16 +
  17 + @Resource
  18 + MessageTypeMapper messageTypeMapper;
  19 +
  20 + @Override
  21 + public List<MessageType> list(MessageType messageType) {
  22 + List<MessageType> list = messageTypeMapper.selectAll(messageType);
  23 + return list;
  24 + }
  25 +
  26 + @Override
  27 + public PageInfo<MessageType> pageList(MessageType messageType) {
  28 + PageHelper.startPage(messageType.getPageNum(), messageType.getPageSize());
  29 + List<MessageType> list = messageTypeMapper.selectAll(messageType);
  30 + PageInfo<MessageType> pageInfo = new PageInfo<>(list);
  31 + return pageInfo;
  32 + }
  33 +}
  1 +package com.sunyo.wlpt.message.bus.service.service.kafka;
  2 +
  3 +
  4 +import com.sunyo.wlpt.message.bus.service.domain.BusServer;
  5 +import com.sunyo.wlpt.message.bus.service.mapper.BusServerMapper;
  6 +import com.sunyo.wlpt.message.bus.service.mapper.ConsumerGroupMapper;
  7 +import com.sunyo.wlpt.message.bus.service.model.ConsumerGroup;
  8 +import com.sunyo.wlpt.message.bus.service.model.ConsumerGroupOffsets;
  9 +import com.sunyo.wlpt.message.bus.service.service.KafkaService;
  10 +import lombok.extern.slf4j.Slf4j;
  11 +import org.apache.kafka.clients.admin.*;
  12 +import org.apache.kafka.clients.consumer.KafkaConsumer;
  13 +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
  14 +import org.apache.kafka.common.KafkaFuture;
  15 +import org.apache.kafka.common.TopicPartition;
  16 +import org.apache.kafka.common.TopicPartitionReplica;
  17 +import org.springframework.kafka.core.KafkaAdmin;
  18 +import org.springframework.stereotype.Service;
  19 +import javax.annotation.Resource;
  20 +import java.util.*;
  21 +import java.util.concurrent.ExecutionException;
  22 +import java.util.stream.Collectors;
  23 +
  24 +/**
  25 + * @author mrz
  26 + */
  27 +@Slf4j
  28 +@Service
  29 +public class KafkaServiceImp implements KafkaService {
  30 +
  31 + @Resource
  32 + private BusServerMapper busServerMapper;
  33 +
  34 + private static AdminClient KAFKA_ADMIN_CLIENT ;
  35 +
  36 + private static KafkaConsumer KAFKA_CONSUMER;
  37 +
  38 + @Resource
  39 + ConsumerGroupMapper consumerGroupMapper;
  40 +
  41 + /**
  42 + * 根据用户名列表批量添加topic
  43 + */
  44 + @Override
  45 + public boolean addTopic(String TopicName,int partitionNum){
  46 +
  47 + intAdminClient();
  48 + NewTopic newTopic = new NewTopic(TopicName, partitionNum, (short) 1);
  49 + List<NewTopic> topicList = Arrays.asList(newTopic);
  50 + KAFKA_ADMIN_CLIENT.createTopics(topicList);
  51 + return true;
  52 + }
  53 +
  54 + public void intAdminClient(){
  55 + if (KAFKA_ADMIN_CLIENT!=null){
  56 + return;
  57 + }else {
  58 + updateAdminclient();
  59 + }
  60 + }
  61 +
  62 + public void intConsumer(){
  63 + if (KAFKA_CONSUMER!=null){
  64 + return;
  65 + }else {
  66 + consumer();
  67 + }
  68 + }
  69 +
  70 + @Override
  71 + public void updateAdminclient(){
  72 + Map<String, Object> configs = new HashMap<>();
  73 + String serverMap = ServerListForMap();
  74 + configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
  75 + serverMap);
  76 + configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  77 + configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  78 + configs.put(AdminClientConfig.CLIENT_ID_CONFIG,"MessageBusManager");
  79 + KafkaAdmin admin = new KafkaAdmin(configs);
  80 + KAFKA_ADMIN_CLIENT = AdminClient.create(admin.getConfig());
  81 + }
  82 + /**
  83 + * 获取topicList
  84 + */
  85 + public void topicList(){
  86 +
  87 + }
  88 +
  89 +
  90 + /**
  91 + * 删除kafka中的topic
  92 + */
  93 + public void delTopic(){
  94 +
  95 + }
  96 +
  97 + @Override
  98 + public List<ConsumerGroupOffsets> queueMonitor() {
  99 + List<ConsumerGroupOffsets> result = new ArrayList<ConsumerGroupOffsets>();
  100 + try{
  101 + intAdminClient();
  102 + intConsumer();
  103 +
  104 + /**
  105 + * 1. 获取consumerGroup 列表
  106 + */
  107 + List<String> consumerGroups = consumerGroupMapper.groups();
  108 +
  109 + for (String groupname :consumerGroups) {
  110 +
  111 + /**
  112 + * 2. 获取获取consumerGroup消费信息
  113 + */
  114 + ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult= KAFKA_ADMIN_CLIENT.listConsumerGroupOffsets(groupname);
  115 + listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata().get().forEach(
  116 + (TopicPartition k, OffsetAndMetadata v)-> {
  117 + /**
  118 + * 3.获取consumerGroup消费的每个topic的partion的lastoffset的值
  119 + */
  120 + TopicPartition topicPartition = new TopicPartition(k.topic(),k.partition());
  121 + long endoffset = getLogEndOffset(topicPartition);
  122 +
  123 + ConsumerGroupOffsets consumerGroupOffsets = new ConsumerGroupOffsets(groupname,k.topic(),k.partition(),v.offset(),endoffset);
  124 + consumerGroupOffsets.setLag();
  125 + result.add(consumerGroupOffsets);
  126 + log.info("cconsumerGroup:{},[topic]:{},[partition]:{},[offset]:{},[endoffset] = {} \n",
  127 + groupname,k.topic(),k.partition(),v.offset(),endoffset);
  128 +
  129 +
  130 + }
  131 + );
  132 + }
  133 + return result;
  134 +
  135 + }catch (ExecutionException e){
  136 +
  137 + }catch (InterruptedException e){
  138 +
  139 + }finally {
  140 +
  141 + }
  142 + return null;
  143 + }
  144 +
  145 + public long getLogEndOffset(TopicPartition topicPartition){
  146 + KAFKA_CONSUMER.assign(Arrays.asList(topicPartition));
  147 + KAFKA_CONSUMER.seekToEnd(Arrays.asList(topicPartition));
  148 + long endOffset = KAFKA_CONSUMER.position(topicPartition);
  149 + return endOffset;
  150 + }
  151 +
  152 + public void consumer(){
  153 + Properties props = new Properties();
  154 + props.put("bootstrap.servers", ServerListForMap());
  155 + props.put("group.id", "test");
  156 + props.put("enable.auto.commit", "true");
  157 + props.put("auto.offset.reset", "earliest");
  158 + props.put("auto.commit.interval.ms", "1000");
  159 + props.put("auto.commit.interval.ms", "1000");
  160 + props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  161 + props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  162 + KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  163 + KAFKA_CONSUMER= consumer;
  164 + }
  165 +
  166 + public String ServerListForMap(){
  167 + List<BusServer> serverList = busServerMapper.selectBusServerList(new BusServer());
  168 +
  169 + String KAFKA_SERVERS = serverList.stream().map(item -> {
  170 + return item.getServerIp()+":"+item.getServerPort();
  171 + })
  172 + .collect(Collectors.joining(","));
  173 + return KAFKA_SERVERS;
  174 + }
  175 +
  176 +}
  1 +<?xml version="1.0" encoding="UTF-8"?>
  2 +<!DOCTYPE generatorConfiguration
  3 + PUBLIC "-//mybatis.org//DTD MyBatis Generator Configuration 1.0//EN"
  4 + "http://mybatis.org/dtd/mybatis-generator-config_1_0.dtd">
  5 +<generatorConfiguration>
  6 + <!-- 数据库驱动:选择你的本地硬盘上面的数据库驱动包-->
  7 + <classPathEntry location="/Users/mrz/Documents/maven/mysql-connector-java-5.1.7-bin.jar"/>
  8 + <!--<classPathEntry location="/Users/mrz/Documents/maven/ojdbc6.jar"/>-->
  9 + <context id="DB2Tables" targetRuntime="MyBatis3">
  10 + <commentGenerator>
  11 + <property name="suppressDate" value="true"/>
  12 + <!-- 是否去除自动生成的注释 true:是 : false:否 -->
  13 + <property name="suppressAllComments" value="true"/>
  14 + </commentGenerator>
  15 + <!--数据库链接URL,用户名、密码 -->
  16 + <jdbcConnection driverClass="com.mysql.jdbc.Driver"
  17 + connectionURL="jdbc:mysql://118.31.66.166:3306/bus_service"
  18 + userId="110"
  19 + password="QAHqCJf2kFYCLirM">
  20 + </jdbcConnection>
  21 + <!--<jdbcConnection driverClass="oracle.jdbc.driver.OracleDriver"-->
  22 + <!--connectionURL="jdbc:oracle:thin:@10.50.3.68:1521:CGODW"-->
  23 + <!--userId="CGOETL"-->
  24 + <!--password="1q2w3e4r">-->
  25 + <!--</jdbcConnection>-->
  26 + <!-- 默认false,把JDBC DECIMAL 和 NUMERIC 类型解析为 Integer,为 true时把JDBC DECIMAL 和
  27 + NUMERIC 类型解析为java.math.BigDecimal -->
  28 + <!--<javaTypeResolver>-->
  29 + <!--<property name="forceBigDecimals" value="true" />-->
  30 + <!--</javaTypeResolver>-->
  31 + <javaTypeResolver>
  32 + <property name="forceBigDecimals" value="false"/>
  33 + </javaTypeResolver>
  34 + <!-- 生成模型的包名和位置-->
  35 + <javaModelGenerator targetPackage="com.sunyo.wlpt.message.bus.service.model" targetProject="src/main/java">
  36 + <property name="enableSubPackages" value="true"/>
  37 + <property name="trimStrings" value="true"/>
  38 + </javaModelGenerator>
  39 + <!-- 生成映射文件的包名和位置-->
  40 + <sqlMapGenerator targetPackage="mapper" targetProject="src/main/resources">
  41 + <property name="enableSubPackages" value="true"/>
  42 + </sqlMapGenerator>
  43 + <!-- 生成DAO的包名和位置-->
  44 + <javaClientGenerator type="XMLMAPPER" targetPackage="com.sunyo.wlpt.message.bus.service.mapper" targetProject="src/main/java">
  45 + <property name="enableSubPackages" value="true"/>
  46 + </javaClientGenerator>
  47 + <!-- 要生成的表 tableName是数据库中的表名或视图名 domainObjectName是实体类名-->
  48 + <table tableName="message_router_reciver_filter" domainObjectName="MessageRouterReciverFilter" enableCountByExample="false" enableUpdateByExample="false" enableDeleteByExample="false" enableSelectByExample="false" selectByExampleQueryId="false"></table>
  49 + </context>
  50 +</generatorConfiguration>
  1 +<?xml version="1.0" encoding="UTF-8" ?>
  2 +<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
  3 +<mapper namespace="com.sunyo.wlpt.message.bus.service.mapper.BtypeMapper" >
  4 + <resultMap id="BaseResultMap" type="com.sunyo.wlpt.message.bus.service.model.Btype" >
  5 + <id column="id" property="id" jdbcType="VARCHAR" />
  6 + <result column="name" property="name" jdbcType="VARCHAR" />
  7 + <result column="parent_id" property="parentId" jdbcType="VARCHAR" />
  8 + <result column="des" property="des" jdbcType="VARCHAR" />
  9 + </resultMap>
  10 + <sql id="Base_Column_List" >
  11 + id, name, parent_id, des
  12 + </sql>
  13 + <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.String" >
  14 + select
  15 + <include refid="Base_Column_List" />
  16 + from btype
  17 + where id = #{id,jdbcType=VARCHAR}
  18 + </select>
  19 + <delete id="deleteByPrimaryKey" parameterType="java.lang.String" >
  20 + delete from btype
  21 + where id = #{id,jdbcType=VARCHAR}
  22 + </delete>
  23 + <insert id="insert" parameterType="com.sunyo.wlpt.message.bus.service.model.Btype" >
  24 + insert into btype (id, name, parent_id,
  25 + des)
  26 + values (#{id,jdbcType=VARCHAR}, #{name,jdbcType=VARCHAR}, #{parentId,jdbcType=VARCHAR},
  27 + #{des,jdbcType=VARCHAR})
  28 + </insert>
  29 + <insert id="insertSelective" parameterType="com.sunyo.wlpt.message.bus.service.model.Btype" >
  30 + insert into btype
  31 + <trim prefix="(" suffix=")" suffixOverrides="," >
  32 + <if test="id != null" >
  33 + id,
  34 + </if>
  35 + <if test="name != null" >
  36 + name,
  37 + </if>
  38 + <if test="parentId != null" >
  39 + parent_id,
  40 + </if>
  41 + <if test="des != null" >
  42 + des,
  43 + </if>
  44 + </trim>
  45 + <trim prefix="values (" suffix=")" suffixOverrides="," >
  46 + <if test="id != null" >
  47 + #{id,jdbcType=VARCHAR},
  48 + </if>
  49 + <if test="name != null" >
  50 + #{name,jdbcType=VARCHAR},
  51 + </if>
  52 + <if test="parentId != null" >
  53 + #{parentId,jdbcType=VARCHAR},
  54 + </if>
  55 + <if test="des != null" >
  56 + #{des,jdbcType=VARCHAR},
  57 + </if>
  58 + </trim>
  59 + </insert>
  60 + <update id="updateByPrimaryKeySelective" parameterType="com.sunyo.wlpt.message.bus.service.model.Btype" >
  61 + update btype
  62 + <set >
  63 + <if test="name != null" >
  64 + name = #{name,jdbcType=VARCHAR},
  65 + </if>
  66 + <if test="parentId != null" >
  67 + parent_id = #{parentId,jdbcType=VARCHAR},
  68 + </if>
  69 + <if test="des != null" >
  70 + des = #{des,jdbcType=VARCHAR},
  71 + </if>
  72 + </set>
  73 + where id = #{id,jdbcType=VARCHAR}
  74 + </update>
  75 + <update id="updateByPrimaryKey" parameterType="com.sunyo.wlpt.message.bus.service.model.Btype" >
  76 + update btype
  77 + set name = #{name,jdbcType=VARCHAR},
  78 + parent_id = #{parentId,jdbcType=VARCHAR},
  79 + des = #{des,jdbcType=VARCHAR}
  80 + where id = #{id,jdbcType=VARCHAR}
  81 + </update>
  82 +</mapper>
1 <?xml version="1.0" encoding="UTF-8"?> 1 <?xml version="1.0" encoding="UTF-8"?>
2 <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> 2 <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
3 <mapper namespace="com.sunyo.wlpt.message.bus.service.mapper.BusQueueMapper"> 3 <mapper namespace="com.sunyo.wlpt.message.bus.service.mapper.BusQueueMapper">
4 - <cache-ref namespace="com.sunyo.wlpt.message.bus.service.mapper.VirtualHostMapper"/>  
5 <resultMap id="BaseResultMap" type="com.sunyo.wlpt.message.bus.service.domain.BusQueue"> 4 <resultMap id="BaseResultMap" type="com.sunyo.wlpt.message.bus.service.domain.BusQueue">
6 <!--@mbg.generated--> 5 <!--@mbg.generated-->
7 <!--@Table bus_queue--> 6 <!--@Table bus_queue-->
@@ -16,6 +15,9 @@ @@ -16,6 +15,9 @@
16 <result column="description" jdbcType="VARCHAR" property="description"/> 15 <result column="description" jdbcType="VARCHAR" property="description"/>
17 <result column="gmt_create" jdbcType="TIMESTAMP" property="gmtCreate"/> 16 <result column="gmt_create" jdbcType="TIMESTAMP" property="gmtCreate"/>
18 <result column="gmt_modified" jdbcType="TIMESTAMP" property="gmtModified"/> 17 <result column="gmt_modified" jdbcType="TIMESTAMP" property="gmtModified"/>
  18 + <result column="queue_type" jdbcType="INTEGER" property="queueType"/>
  19 + <result column="server_type" jdbcType="INTEGER" property="serverType"/>
  20 + <result column="partition_count" jdbcType="INTEGER" property="partitionCount"/>
19 </resultMap> 21 </resultMap>
20 22
21 <!-- 该Mapper映射关系的作用,是队列与虚拟主机的1:1的关系映射 --> 23 <!-- 该Mapper映射关系的作用,是队列与虚拟主机的1:1的关系映射 -->
@@ -29,7 +31,7 @@ @@ -29,7 +31,7 @@
29 <sql id="Base_Column_List"> 31 <sql id="Base_Column_List">
30 <!--@mbg.generated--> 32 <!--@mbg.generated-->
31 id, queue_name, user_id, username, virtual_host_id, durability, auto_delete, arguments, 33 id, queue_name, user_id, username, virtual_host_id, durability, auto_delete, arguments,
32 - description, gmt_create, gmt_modified 34 + description, gmt_create, gmt_modified, queue_type, partition_count, server_type
33 </sql> 35 </sql>
34 36
35 <select id="selectByPrimaryKey" parameterType="java.lang.String" resultMap="BaseResultMap"> 37 <select id="selectByPrimaryKey" parameterType="java.lang.String" resultMap="BaseResultMap">
@@ -109,6 +111,12 @@ @@ -109,6 +111,12 @@
109 <if test="gmtModified != null"> 111 <if test="gmtModified != null">
110 gmt_modified, 112 gmt_modified,
111 </if> 113 </if>
  114 + <if test="partitionCount >0">
  115 + partition_count,
  116 + </if>
  117 + <if test="queueType >0">
  118 + queue_type,
  119 + </if>
112 </trim> 120 </trim>
113 <trim prefix="values (" suffix=")" suffixOverrides=","> 121 <trim prefix="values (" suffix=")" suffixOverrides=",">
114 <if test="id != null"> 122 <if test="id != null">
@@ -144,6 +152,12 @@ @@ -144,6 +152,12 @@
144 <if test="gmtModified != null"> 152 <if test="gmtModified != null">
145 #{gmtModified,jdbcType=TIMESTAMP}, 153 #{gmtModified,jdbcType=TIMESTAMP},
146 </if> 154 </if>
  155 + <if test="partitionCount >0">
  156 + #{partitionCount,jdbcType=TIMESTAMP},
  157 + </if>
  158 + <if test="queueType >0">
  159 + #{partitionCount,jdbcType=TIMESTAMP},
  160 + </if>
147 </trim> 161 </trim>
148 </insert> 162 </insert>
149 <update id="updateByPrimaryKeySelective" parameterType="com.sunyo.wlpt.message.bus.service.domain.BusQueue"> 163 <update id="updateByPrimaryKeySelective" parameterType="com.sunyo.wlpt.message.bus.service.domain.BusQueue">
@@ -180,6 +194,9 @@ @@ -180,6 +194,9 @@
180 <if test="gmtModified != null"> 194 <if test="gmtModified != null">
181 gmt_modified = #{gmtModified,jdbcType=TIMESTAMP}, 195 gmt_modified = #{gmtModified,jdbcType=TIMESTAMP},
182 </if> 196 </if>
  197 + <if test="partitionCount >0">
  198 + partition_count = #{partitionCount,jdbcType=INTEGER},
  199 + </if>
183 </set> 200 </set>
184 where id = #{id,jdbcType=VARCHAR} 201 where id = #{id,jdbcType=VARCHAR}
185 </update> 202 </update>
@@ -213,14 +230,9 @@ @@ -213,14 +230,9 @@
213 q.username, 230 q.username,
214 q.gmt_create, 231 q.gmt_create,
215 q.gmt_modified, 232 q.gmt_modified,
216 - v.virtual_host_name  
217 - from bus_queue as q,  
218 - virtual_host v 233 + q.partition_count
  234 + from bus_queue as q
219 <where> 235 <where>
220 - <!-- 所属虚拟主机Id -->  
221 - <if test="virtualHostId != null and virtualHostId != ''">  
222 - virtual_host_id = #{virtualHostId,jdbcType=VARCHAR}  
223 - </if>  
224 <!-- 用户名称 --> 236 <!-- 用户名称 -->
225 <if test="username != null and username != ''"> 237 <if test="username != null and username != ''">
226 and username = #{username,jdbcType=VARCHAR} 238 and username = #{username,jdbcType=VARCHAR}
@@ -229,7 +241,7 @@ @@ -229,7 +241,7 @@
229 <if test="queueName != null and queueName != ''"> 241 <if test="queueName != null and queueName != ''">
230 and queue_name = #{queueName,jdbcType=VARCHAR} 242 and queue_name = #{queueName,jdbcType=VARCHAR}
231 </if> 243 </if>
232 - and v.id = q.virtual_host_id 244 + and server_type = 1
233 </where> 245 </where>
234 </select> 246 </select>
235 247
@@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
16 <result column="description" jdbcType="VARCHAR" property="description"/> 16 <result column="description" jdbcType="VARCHAR" property="description"/>
17 <result column="gmt_create" jdbcType="TIMESTAMP" property="gmtCreate"/> 17 <result column="gmt_create" jdbcType="TIMESTAMP" property="gmtCreate"/>
18 <result column="gmt_modified" jdbcType="TIMESTAMP" property="gmtModified"/> 18 <result column="gmt_modified" jdbcType="TIMESTAMP" property="gmtModified"/>
  19 + <result column="server_type" jdbcType="INTEGER" property="serverType"/>
19 </resultMap> 20 </resultMap>
20 21
21 <!-- 该Mapper映射关系的作用,是服务器与虚拟主机的1:n的关系映射 --> 22 <!-- 该Mapper映射关系的作用,是服务器与虚拟主机的1:n的关系映射 -->
@@ -46,7 +47,7 @@ @@ -46,7 +47,7 @@
46 <sql id="Base_Column_List"> 47 <sql id="Base_Column_List">
47 <!--@mbg.generated--> 48 <!--@mbg.generated-->
48 id, `server_name`, server_ip, server_port, client_port, super_username, super_password, 49 id, `server_name`, server_ip, server_port, client_port, super_username, super_password,
49 - description, gmt_create, gmt_modified 50 + description, gmt_create, gmt_modified, server_type
50 </sql> 51 </sql>
51 <select id="selectByPrimaryKey" parameterType="java.lang.String" resultMap="BaseResultMap"> 52 <select id="selectByPrimaryKey" parameterType="java.lang.String" resultMap="BaseResultMap">
52 <!--@mbg.generated--> 53 <!--@mbg.generated-->
@@ -106,6 +107,9 @@ @@ -106,6 +107,9 @@
106 <if test="gmtModified != null"> 107 <if test="gmtModified != null">
107 gmt_modified, 108 gmt_modified,
108 </if> 109 </if>
  110 + <if test="serverType >0">
  111 + server_type,
  112 + </if>
109 </trim> 113 </trim>
110 <trim prefix="values (" suffix=")" suffixOverrides=","> 114 <trim prefix="values (" suffix=")" suffixOverrides=",">
111 <if test="id != null"> 115 <if test="id != null">
@@ -138,6 +142,9 @@ @@ -138,6 +142,9 @@
138 <if test="gmtModified != null"> 142 <if test="gmtModified != null">
139 #{gmtModified,jdbcType=TIMESTAMP}, 143 #{gmtModified,jdbcType=TIMESTAMP},
140 </if> 144 </if>
  145 + <if test="serverType >0 ">
  146 + #{serverType,jdbcType=INTEGER},
  147 + </if>
141 </trim> 148 </trim>
142 </insert> 149 </insert>
143 <update id="updateByPrimaryKeySelective" parameterType="com.sunyo.wlpt.message.bus.service.domain.BusServer"> 150 <update id="updateByPrimaryKeySelective" parameterType="com.sunyo.wlpt.message.bus.service.domain.BusServer">
@@ -247,6 +254,7 @@ @@ -247,6 +254,7 @@
247 <if test="serverName != null and serverName != ''"> 254 <if test="serverName != null and serverName != ''">
248 server_name = #{serverName,jdbcType=VARCHAR} 255 server_name = #{serverName,jdbcType=VARCHAR}
249 </if> 256 </if>
  257 + and server_type =1
250 </where> 258 </where>
251 </select> 259 </select>
252 260
  1 +<?xml version="1.0" encoding="UTF-8" ?>
  2 +<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
  3 +<mapper namespace="com.sunyo.wlpt.message.bus.service.mapper.ConsumerGroupMapper" >
  4 + <resultMap id="BaseResultMap" type="com.sunyo.wlpt.message.bus.service.model.ConsumerGroup" >
  5 + <id column="id" property="id" jdbcType="VARCHAR" />
  6 + <result column="name" property="name" jdbcType="VARCHAR" />
  7 + <result column="des" property="des" jdbcType="VARCHAR" />
  8 + <result column="user_id" property="userId" jdbcType="VARCHAR" />
  9 + <result column="username" property="username" jdbcType="VARCHAR" />
  10 + </resultMap>
  11 + <sql id="Base_Column_List" >
  12 + id, name, des, user_id, username
  13 + </sql>
  14 + <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.String" >
  15 + select
  16 + <include refid="Base_Column_List" />
  17 + from consumer_group
  18 + where id = #{id,jdbcType=VARCHAR}
  19 + </select>
  20 + <select id="groups" resultType="java.lang.String" parameterType="java.lang.String" >
  21 + select
  22 + name
  23 + from consumer_group
  24 + </select>
  25 + <delete id="deleteByPrimaryKey" parameterType="java.lang.String" >
  26 + delete from consumer_group
  27 + where id = #{id,jdbcType=VARCHAR}
  28 + </delete>
  29 + <insert id="insert" parameterType="com.sunyo.wlpt.message.bus.service.model.ConsumerGroup" >
  30 + insert into consumer_group (id, name, des,
  31 + user_id, username)
  32 + values (#{id,jdbcType=VARCHAR}, #{name,jdbcType=VARCHAR}, #{des,jdbcType=VARCHAR},
  33 + #{userId,jdbcType=VARCHAR}, #{username,jdbcType=VARCHAR})
  34 + </insert>
  35 + <insert id="insertSelective" parameterType="com.sunyo.wlpt.message.bus.service.model.ConsumerGroup" >
  36 + insert into consumer_group
  37 + <trim prefix="(" suffix=")" suffixOverrides="," >
  38 + <if test="id != null" >
  39 + id,
  40 + </if>
  41 + <if test="name != null" >
  42 + name,
  43 + </if>
  44 + <if test="des != null" >
  45 + des,
  46 + </if>
  47 + <if test="userId != null" >
  48 + user_id,
  49 + </if>
  50 + <if test="username != null" >
  51 + username,
  52 + </if>
  53 + </trim>
  54 + <trim prefix="values (" suffix=")" suffixOverrides="," >
  55 + <if test="id != null" >
  56 + #{id,jdbcType=VARCHAR},
  57 + </if>
  58 + <if test="name != null" >
  59 + #{name,jdbcType=VARCHAR},
  60 + </if>
  61 + <if test="des != null" >
  62 + #{des,jdbcType=VARCHAR},
  63 + </if>
  64 + <if test="userId != null" >
  65 + #{userId,jdbcType=VARCHAR},
  66 + </if>
  67 + <if test="username != null" >
  68 + #{username,jdbcType=VARCHAR},
  69 + </if>
  70 + </trim>
  71 + </insert>
  72 + <update id="updateByPrimaryKeySelective" parameterType="com.sunyo.wlpt.message.bus.service.model.ConsumerGroup" >
  73 + update consumer_group
  74 + <set >
  75 + <if test="name != null" >
  76 + name = #{name,jdbcType=VARCHAR},
  77 + </if>
  78 + <if test="des != null" >
  79 + des = #{des,jdbcType=VARCHAR},
  80 + </if>
  81 + <if test="userId != null" >
  82 + user_id = #{userId,jdbcType=VARCHAR},
  83 + </if>
  84 + <if test="username != null" >
  85 + username = #{username,jdbcType=VARCHAR},
  86 + </if>
  87 + </set>
  88 + where id = #{id,jdbcType=VARCHAR}
  89 + </update>
  90 + <update id="updateByPrimaryKey" parameterType="com.sunyo.wlpt.message.bus.service.model.ConsumerGroup" >
  91 + update consumer_group
  92 + set name = #{name,jdbcType=VARCHAR},
  93 + des = #{des,jdbcType=VARCHAR},
  94 + user_id = #{userId,jdbcType=VARCHAR},
  95 + username = #{username,jdbcType=VARCHAR}
  96 + where id = #{id,jdbcType=VARCHAR}
  97 + </update>
  98 +</mapper>
  1 +<?xml version="1.0" encoding="UTF-8" ?>
  2 +<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
  3 +<mapper namespace="com.sunyo.wlpt.message.bus.service.mapper.MessageRouterMapper" >
  4 + <resultMap id="BaseResultMap" type="com.sunyo.wlpt.message.bus.service.model.MessageRouter" >
  5 + <id column="id" property="id" jdbcType="VARCHAR" />
  6 + <result column="sndr" property="sndr" jdbcType="VARCHAR" />
  7 + <result column="btype" property="btype" jdbcType="VARCHAR" />
  8 + <result column="stype" property="stype" jdbcType="VARCHAR" />
  9 + <result column="optype" property="optype" jdbcType="VARCHAR" />
  10 + <result column="msg_limit" property="msgLimit" jdbcType="TINYINT" />
  11 + <result column="character" property="character" jdbcType="VARCHAR" />
  12 + <result column="status" property="status" jdbcType="BIT" />
  13 + <result column="usage" property="usage" jdbcType="BIT" />
  14 + <result column="des" property="des" jdbcType="VARCHAR" />
  15 + <result column="ver" property="ver" jdbcType="VARCHAR" />
  16 + <result column="creat_time" property="creatTime" jdbcType="TIMESTAMP" />
  17 + <result column="update_time" property="updateTime" jdbcType="TIMESTAMP" />
  18 + </resultMap>
  19 + <sql id="Base_Column_List" >
  20 + id, sndr, btype, stype, optype, msg_limit, character, status, usage, des, ver, creat_time,
  21 + update_time
  22 + </sql>
  23 + <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.String" >
  24 + select
  25 + <include refid="Base_Column_List" />
  26 + from message_router
  27 + where id = #{id,jdbcType=VARCHAR}
  28 + </select>
  29 + <delete id="deleteByPrimaryKey" parameterType="java.lang.String" >
  30 + delete from message_router
  31 + where id = #{id,jdbcType=VARCHAR}
  32 + </delete>
  33 + <insert id="insert" parameterType="com.sunyo.wlpt.message.bus.service.model.MessageRouter" >
  34 + insert into message_router (id, sndr, btype,
  35 + stype, optype, msg_limit,
  36 + character, status, usage, des,
  37 + ver, creat_time, update_time
  38 + )
  39 + values (#{id,jdbcType=VARCHAR}, #{sndr,jdbcType=VARCHAR}, #{btype,jdbcType=VARCHAR},
  40 + #{stype,jdbcType=VARCHAR}, #{optype,jdbcType=VARCHAR}, #{msgLimit,jdbcType=TINYINT},
  41 + #{character,jdbcType=VARCHAR}, #{status,jdbcType=BIT}, #{usage,jdbcType=BIT}, #{des,jdbcType=VARCHAR},
  42 + #{ver,jdbcType=VARCHAR}, #{creatTime,jdbcType=TIMESTAMP}, #{updateTime,jdbcType=TIMESTAMP}
  43 + )
  44 + </insert>
  45 + <insert id="insertSelective" parameterType="com.sunyo.wlpt.message.bus.service.model.MessageRouter" >
  46 + insert into message_router
  47 + <trim prefix="(" suffix=")" suffixOverrides="," >
  48 + <if test="id != null" >
  49 + id,
  50 + </if>
  51 + <if test="sndr != null" >
  52 + sndr,
  53 + </if>
  54 + <if test="btype != null" >
  55 + btype,
  56 + </if>
  57 + <if test="stype != null" >
  58 + stype,
  59 + </if>
  60 + <if test="optype != null" >
  61 + optype,
  62 + </if>
  63 + <if test="msgLimit != null" >
  64 + msg_limit,
  65 + </if>
  66 + <if test="character != null" >
  67 + character,
  68 + </if>
  69 + <if test="status != null" >
  70 + status,
  71 + </if>
  72 + <if test="usage != null" >
  73 + usage,
  74 + </if>
  75 + <if test="des != null" >
  76 + des,
  77 + </if>
  78 + <if test="ver != null" >
  79 + ver,
  80 + </if>
  81 + <if test="creatTime != null" >
  82 + creat_time,
  83 + </if>
  84 + <if test="updateTime != null" >
  85 + update_time,
  86 + </if>
  87 + </trim>
  88 + <trim prefix="values (" suffix=")" suffixOverrides="," >
  89 + <if test="id != null" >
  90 + #{id,jdbcType=VARCHAR},
  91 + </if>
  92 + <if test="sndr != null" >
  93 + #{sndr,jdbcType=VARCHAR},
  94 + </if>
  95 + <if test="btype != null" >
  96 + #{btype,jdbcType=VARCHAR},
  97 + </if>
  98 + <if test="stype != null" >
  99 + #{stype,jdbcType=VARCHAR},
  100 + </if>
  101 + <if test="optype != null" >
  102 + #{optype,jdbcType=VARCHAR},
  103 + </if>
  104 + <if test="msgLimit != null" >
  105 + #{msgLimit,jdbcType=TINYINT},
  106 + </if>
  107 + <if test="character != null" >
  108 + #{character,jdbcType=VARCHAR},
  109 + </if>
  110 + <if test="status != null" >
  111 + #{status,jdbcType=BIT},
  112 + </if>
  113 + <if test="usage != null" >
  114 + #{usage,jdbcType=BIT},
  115 + </if>
  116 + <if test="des != null" >
  117 + #{des,jdbcType=VARCHAR},
  118 + </if>
  119 + <if test="ver != null" >
  120 + #{ver,jdbcType=VARCHAR},
  121 + </if>
  122 + <if test="creatTime != null" >
  123 + #{creatTime,jdbcType=TIMESTAMP},
  124 + </if>
  125 + <if test="updateTime != null" >
  126 + #{updateTime,jdbcType=TIMESTAMP},
  127 + </if>
  128 + </trim>
  129 + </insert>
  130 + <update id="updateByPrimaryKeySelective" parameterType="com.sunyo.wlpt.message.bus.service.model.MessageRouter" >
  131 + update message_router
  132 + <set >
  133 + <if test="sndr != null" >
  134 + sndr = #{sndr,jdbcType=VARCHAR},
  135 + </if>
  136 + <if test="btype != null" >
  137 + btype = #{btype,jdbcType=VARCHAR},
  138 + </if>
  139 + <if test="stype != null" >
  140 + stype = #{stype,jdbcType=VARCHAR},
  141 + </if>
  142 + <if test="optype != null" >
  143 + optype = #{optype,jdbcType=VARCHAR},
  144 + </if>
  145 + <if test="msgLimit != null" >
  146 + msg_limit = #{msgLimit,jdbcType=TINYINT},
  147 + </if>
  148 + <if test="character != null" >
  149 + character = #{character,jdbcType=VARCHAR},
  150 + </if>
  151 + <if test="status != null" >
  152 + status = #{status,jdbcType=BIT},
  153 + </if>
  154 + <if test="usage != null" >
  155 + usage = #{usage,jdbcType=BIT},
  156 + </if>
  157 + <if test="des != null" >
  158 + des = #{des,jdbcType=VARCHAR},
  159 + </if>
  160 + <if test="ver != null" >
  161 + ver = #{ver,jdbcType=VARCHAR},
  162 + </if>
  163 + <if test="creatTime != null" >
  164 + creat_time = #{creatTime,jdbcType=TIMESTAMP},
  165 + </if>
  166 + <if test="updateTime != null" >
  167 + update_time = #{updateTime,jdbcType=TIMESTAMP},
  168 + </if>
  169 + </set>
  170 + where id = #{id,jdbcType=VARCHAR}
  171 + </update>
  172 + <update id="updateByPrimaryKey" parameterType="com.sunyo.wlpt.message.bus.service.model.MessageRouter" >
  173 + update message_router
  174 + set sndr = #{sndr,jdbcType=VARCHAR},
  175 + btype = #{btype,jdbcType=VARCHAR},
  176 + stype = #{stype,jdbcType=VARCHAR},
  177 + optype = #{optype,jdbcType=VARCHAR},
  178 + msg_limit = #{msgLimit,jdbcType=TINYINT},
  179 + character = #{character,jdbcType=VARCHAR},
  180 + status = #{status,jdbcType=BIT},
  181 + usage = #{usage,jdbcType=BIT},
  182 + des = #{des,jdbcType=VARCHAR},
  183 + ver = #{ver,jdbcType=VARCHAR},
  184 + creat_time = #{creatTime,jdbcType=TIMESTAMP},
  185 + update_time = #{updateTime,jdbcType=TIMESTAMP}
  186 + where id = #{id,jdbcType=VARCHAR}
  187 + </update>
  188 +</mapper>
  1 +<?xml version="1.0" encoding="UTF-8" ?>
  2 +<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
  3 +<mapper namespace="com.sunyo.wlpt.message.bus.service.mapper.MessageRouterReciverFilterMapper" >
  4 + <resultMap id="BaseResultMap" type="com.sunyo.wlpt.message.bus.service.model.MessageRouterReciverFilter" >
  5 + <id column="id" property="id" jdbcType="VARCHAR" />
  6 + <result column="filter" property="filter" jdbcType="VARCHAR" />
  7 + <result column="filter_value" property="filterValue" jdbcType="VARCHAR" />
  8 + <result column="type" property="type" jdbcType="VARCHAR" />
  9 + <result column="status" property="status" jdbcType="BIT" />
  10 + <result column="message_router_reciver_id" property="messageRouterReciverId" jdbcType="VARCHAR" />
  11 + <result column="creat_time" property="creatTime" jdbcType="TIMESTAMP" />
  12 + <result column="update_time" property="updateTime" jdbcType="TIMESTAMP" />
  13 + </resultMap>
  14 + <sql id="Base_Column_List" >
  15 + id, filter, filter_value, type, status, message_router_reciver_id, creat_time, update_time
  16 + </sql>
  17 + <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.String" >
  18 + select
  19 + <include refid="Base_Column_List" />
  20 + from message_router_reciver_filter
  21 + where id = #{id,jdbcType=VARCHAR}
  22 + </select>
  23 + <delete id="deleteByPrimaryKey" parameterType="java.lang.String" >
  24 + delete from message_router_reciver_filter
  25 + where id = #{id,jdbcType=VARCHAR}
  26 + </delete>
  27 + <insert id="insert" parameterType="com.sunyo.wlpt.message.bus.service.model.MessageRouterReciverFilter" >
  28 + insert into message_router_reciver_filter (id, filter, filter_value,
  29 + type, status, message_router_reciver_id,
  30 + creat_time, update_time)
  31 + values (#{id,jdbcType=VARCHAR}, #{filter,jdbcType=VARCHAR}, #{filterValue,jdbcType=VARCHAR},
  32 + #{type,jdbcType=VARCHAR}, #{status,jdbcType=BIT}, #{messageRouterReciverId,jdbcType=VARCHAR},
  33 + #{creatTime,jdbcType=TIMESTAMP}, #{updateTime,jdbcType=TIMESTAMP})
  34 + </insert>
  35 + <insert id="insertSelective" parameterType="com.sunyo.wlpt.message.bus.service.model.MessageRouterReciverFilter" >
  36 + insert into message_router_reciver_filter
  37 + <trim prefix="(" suffix=")" suffixOverrides="," >
  38 + <if test="id != null" >
  39 + id,
  40 + </if>
  41 + <if test="filter != null" >
  42 + filter,
  43 + </if>
  44 + <if test="filterValue != null" >
  45 + filter_value,
  46 + </if>
  47 + <if test="type != null" >
  48 + type,
  49 + </if>
  50 + <if test="status != null" >
  51 + status,
  52 + </if>
  53 + <if test="messageRouterReciverId != null" >
  54 + message_router_reciver_id,
  55 + </if>
  56 + <if test="creatTime != null" >
  57 + creat_time,
  58 + </if>
  59 + <if test="updateTime != null" >
  60 + update_time,
  61 + </if>
  62 + </trim>
  63 + <trim prefix="values (" suffix=")" suffixOverrides="," >
  64 + <if test="id != null" >
  65 + #{id,jdbcType=VARCHAR},
  66 + </if>
  67 + <if test="filter != null" >
  68 + #{filter,jdbcType=VARCHAR},
  69 + </if>
  70 + <if test="filterValue != null" >
  71 + #{filterValue,jdbcType=VARCHAR},
  72 + </if>
  73 + <if test="type != null" >
  74 + #{type,jdbcType=VARCHAR},
  75 + </if>
  76 + <if test="status != null" >
  77 + #{status,jdbcType=BIT},
  78 + </if>
  79 + <if test="messageRouterReciverId != null" >
  80 + #{messageRouterReciverId,jdbcType=VARCHAR},
  81 + </if>
  82 + <if test="creatTime != null" >
  83 + #{creatTime,jdbcType=TIMESTAMP},
  84 + </if>
  85 + <if test="updateTime != null" >
  86 + #{updateTime,jdbcType=TIMESTAMP},
  87 + </if>
  88 + </trim>
  89 + </insert>
  90 + <update id="updateByPrimaryKeySelective" parameterType="com.sunyo.wlpt.message.bus.service.model.MessageRouterReciverFilter" >
  91 + update message_router_reciver_filter
  92 + <set >
  93 + <if test="filter != null" >
  94 + filter = #{filter,jdbcType=VARCHAR},
  95 + </if>
  96 + <if test="filterValue != null" >
  97 + filter_value = #{filterValue,jdbcType=VARCHAR},
  98 + </if>
  99 + <if test="type != null" >
  100 + type = #{type,jdbcType=VARCHAR},
  101 + </if>
  102 + <if test="status != null" >
  103 + status = #{status,jdbcType=BIT},
  104 + </if>
  105 + <if test="messageRouterReciverId != null" >
  106 + message_router_reciver_id = #{messageRouterReciverId,jdbcType=VARCHAR},
  107 + </if>
  108 + <if test="creatTime != null" >
  109 + creat_time = #{creatTime,jdbcType=TIMESTAMP},
  110 + </if>
  111 + <if test="updateTime != null" >
  112 + update_time = #{updateTime,jdbcType=TIMESTAMP},
  113 + </if>
  114 + </set>
  115 + where id = #{id,jdbcType=VARCHAR}
  116 + </update>
  117 + <update id="updateByPrimaryKey" parameterType="com.sunyo.wlpt.message.bus.service.model.MessageRouterReciverFilter" >
  118 + update message_router_reciver_filter
  119 + set filter = #{filter,jdbcType=VARCHAR},
  120 + filter_value = #{filterValue,jdbcType=VARCHAR},
  121 + type = #{type,jdbcType=VARCHAR},
  122 + status = #{status,jdbcType=BIT},
  123 + message_router_reciver_id = #{messageRouterReciverId,jdbcType=VARCHAR},
  124 + creat_time = #{creatTime,jdbcType=TIMESTAMP},
  125 + update_time = #{updateTime,jdbcType=TIMESTAMP}
  126 + where id = #{id,jdbcType=VARCHAR}
  127 + </update>
  128 +</mapper>
  1 +<?xml version="1.0" encoding="UTF-8" ?>
  2 +<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
  3 +<mapper namespace="com.sunyo.wlpt.message.bus.service.mapper.MessageRouterReciverMapper" >
  4 + <resultMap id="BaseResultMap" type="com.sunyo.wlpt.message.bus.service.model.MessageRouterReciver" >
  5 + <id column="id" property="id" jdbcType="VARCHAR" />
  6 + <result column="rcvr_topic" property="rcvrTopic" jdbcType="VARCHAR" />
  7 + <result column="message_router_id" property="messageRouterId" jdbcType="VARCHAR" />
  8 + <result column="creat_time" property="creatTime" jdbcType="TIMESTAMP" />
  9 + <result column="update_time" property="updateTime" jdbcType="TIMESTAMP" />
  10 + </resultMap>
  11 + <sql id="Base_Column_List" >
  12 + id, rcvr_topic, message_router_id, creat_time, update_time
  13 + </sql>
  14 + <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.String" >
  15 + select
  16 + <include refid="Base_Column_List" />
  17 + from message_router_reciver
  18 + where id = #{id,jdbcType=VARCHAR}
  19 + </select>
  20 + <delete id="deleteByPrimaryKey" parameterType="java.lang.String" >
  21 + delete from message_router_reciver
  22 + where id = #{id,jdbcType=VARCHAR}
  23 + </delete>
  24 + <insert id="insert" parameterType="com.sunyo.wlpt.message.bus.service.model.MessageRouterReciver" >
  25 + insert into message_router_reciver (id, rcvr_topic, message_router_id,
  26 + creat_time, update_time)
  27 + values (#{id,jdbcType=VARCHAR}, #{rcvrTopic,jdbcType=VARCHAR}, #{messageRouterId,jdbcType=VARCHAR},
  28 + #{creatTime,jdbcType=TIMESTAMP}, #{updateTime,jdbcType=TIMESTAMP})
  29 + </insert>
  30 + <insert id="insertSelective" parameterType="com.sunyo.wlpt.message.bus.service.model.MessageRouterReciver" >
  31 + insert into message_router_reciver
  32 + <trim prefix="(" suffix=")" suffixOverrides="," >
  33 + <if test="id != null" >
  34 + id,
  35 + </if>
  36 + <if test="rcvrTopic != null" >
  37 + rcvr_topic,
  38 + </if>
  39 + <if test="messageRouterId != null" >
  40 + message_router_id,
  41 + </if>
  42 + <if test="creatTime != null" >
  43 + creat_time,
  44 + </if>
  45 + <if test="updateTime != null" >
  46 + update_time,
  47 + </if>
  48 + </trim>
  49 + <trim prefix="values (" suffix=")" suffixOverrides="," >
  50 + <if test="id != null" >
  51 + #{id,jdbcType=VARCHAR},
  52 + </if>
  53 + <if test="rcvrTopic != null" >
  54 + #{rcvrTopic,jdbcType=VARCHAR},
  55 + </if>
  56 + <if test="messageRouterId != null" >
  57 + #{messageRouterId,jdbcType=VARCHAR},
  58 + </if>
  59 + <if test="creatTime != null" >
  60 + #{creatTime,jdbcType=TIMESTAMP},
  61 + </if>
  62 + <if test="updateTime != null" >
  63 + #{updateTime,jdbcType=TIMESTAMP},
  64 + </if>
  65 + </trim>
  66 + </insert>
  67 + <update id="updateByPrimaryKeySelective" parameterType="com.sunyo.wlpt.message.bus.service.model.MessageRouterReciver" >
  68 + update message_router_reciver
  69 + <set >
  70 + <if test="rcvrTopic != null" >
  71 + rcvr_topic = #{rcvrTopic,jdbcType=VARCHAR},
  72 + </if>
  73 + <if test="messageRouterId != null" >
  74 + message_router_id = #{messageRouterId,jdbcType=VARCHAR},
  75 + </if>
  76 + <if test="creatTime != null" >
  77 + creat_time = #{creatTime,jdbcType=TIMESTAMP},
  78 + </if>
  79 + <if test="updateTime != null" >
  80 + update_time = #{updateTime,jdbcType=TIMESTAMP},
  81 + </if>
  82 + </set>
  83 + where id = #{id,jdbcType=VARCHAR}
  84 + </update>
  85 + <update id="updateByPrimaryKey" parameterType="com.sunyo.wlpt.message.bus.service.model.MessageRouterReciver" >
  86 + update message_router_reciver
  87 + set rcvr_topic = #{rcvrTopic,jdbcType=VARCHAR},
  88 + message_router_id = #{messageRouterId,jdbcType=VARCHAR},
  89 + creat_time = #{creatTime,jdbcType=TIMESTAMP},
  90 + update_time = #{updateTime,jdbcType=TIMESTAMP}
  91 + where id = #{id,jdbcType=VARCHAR}
  92 + </update>
  93 +</mapper>
  1 +<?xml version="1.0" encoding="UTF-8" ?>
  2 +<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
  3 +<mapper namespace="com.sunyo.wlpt.message.bus.service.mapper.MessageTypeMapper" >
  4 + <resultMap id="BaseResultMap" type="com.sunyo.wlpt.message.bus.service.model.MessageType" >
  5 + <id column="ID" property="id" jdbcType="VARCHAR" />
  6 + <result column="name" property="name" jdbcType="VARCHAR" />
  7 + <result column="parent_id" property="parentId" jdbcType="VARCHAR" />
  8 + <result column="des" property="des" jdbcType="VARCHAR" />
  9 + <result column="type" property="type" jdbcType="INTEGER" />
  10 + <collection column="ID" property="children" select="selectByParentId" />
  11 + </resultMap>
  12 + <sql id="Base_Column_List" >
  13 + ID, name, parent_id, des, type
  14 + </sql>
  15 + <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.String" >
  16 + select
  17 + <include refid="Base_Column_List" />
  18 + from message_type
  19 + where ID = #{id,jdbcType=VARCHAR}
  20 + </select>
  21 + <select id="selectByParentId" resultMap="BaseResultMap" parameterType="java.lang.String" >
  22 + select
  23 + <include refid="Base_Column_List" />
  24 + from message_type
  25 + where parent_id = #{parentId,jdbcType=VARCHAR}
  26 + </select>
  27 + <select id="selectAll" parameterType="com.sunyo.wlpt.message.bus.service.model.MessageType" resultMap="BaseResultMap">
  28 + select
  29 + <include refid="Base_Column_List" />
  30 + from message_type
  31 + <where>
  32 + parent_id = 0
  33 + <if test="id != null and id != ''">
  34 + and ID = #{id,jdbcType=VARCHAR}
  35 + </if>
  36 + <if test="name != null and name != ''">
  37 + and name = #{name,jdbcType=VARCHAR}
  38 + </if>
  39 + <if test="type !=null and type > 0">
  40 + and type = #{type,jdbcType=INTEGER}
  41 + </if>
  42 + </where>
  43 + </select>
  44 + <delete id="deleteByPrimaryKey" parameterType="java.lang.String" >
  45 + delete from message_type
  46 + where ID = #{id,jdbcType=VARCHAR}
  47 + </delete>
  48 + <insert id="insert" parameterType="com.sunyo.wlpt.message.bus.service.model.MessageType" >
  49 + insert into message_type (ID, name, parent_id,
  50 + des, type)
  51 + values (#{id,jdbcType=VARCHAR}, #{name,jdbcType=VARCHAR}, #{parentId,jdbcType=VARCHAR},
  52 + #{des,jdbcType=VARCHAR}, #{type,jdbcType=INTEGER})
  53 + </insert>
  54 + <insert id="insertSelective" parameterType="com.sunyo.wlpt.message.bus.service.model.MessageType" >
  55 + insert into message_type
  56 + <trim prefix="(" suffix=")" suffixOverrides="," >
  57 + <if test="id != null" >
  58 + ID,
  59 + </if>
  60 + <if test="name != null" >
  61 + name,
  62 + </if>
  63 + <if test="parentId != null" >
  64 + parent_id,
  65 + </if>
  66 + <if test="des != null" >
  67 + des,
  68 + </if>
  69 + <if test="type != null" >
  70 + type,
  71 + </if>
  72 + </trim>
  73 + <trim prefix="values (" suffix=")" suffixOverrides="," >
  74 + <if test="id != null" >
  75 + #{id,jdbcType=VARCHAR},
  76 + </if>
  77 + <if test="name != null" >
  78 + #{name,jdbcType=VARCHAR},
  79 + </if>
  80 + <if test="parentId != null" >
  81 + #{parentId,jdbcType=VARCHAR},
  82 + </if>
  83 + <if test="des != null" >
  84 + #{des,jdbcType=VARCHAR},
  85 + </if>
  86 + <if test="type != null" >
  87 + #{type,jdbcType=INTEGER},
  88 + </if>
  89 + </trim>
  90 + </insert>
  91 + <update id="updateByPrimaryKeySelective" parameterType="com.sunyo.wlpt.message.bus.service.model.MessageType" >
  92 + update message_type
  93 + <set >
  94 + <if test="name != null" >
  95 + name = #{name,jdbcType=VARCHAR},
  96 + </if>
  97 + <if test="parentId != null" >
  98 + parent_id = #{parentId,jdbcType=VARCHAR},
  99 + </if>
  100 + <if test="des != null" >
  101 + des = #{des,jdbcType=VARCHAR},
  102 + </if>
  103 + <if test="type != null" >
  104 + type = #{type,jdbcType=INTEGER},
  105 + </if>
  106 + </set>
  107 + where ID = #{id,jdbcType=VARCHAR}
  108 + </update>
  109 + <update id="updateByPrimaryKey" parameterType="com.sunyo.wlpt.message.bus.service.model.MessageType" >
  110 + update message_type
  111 + set name = #{name,jdbcType=VARCHAR},
  112 + parent_id = #{parentId,jdbcType=VARCHAR},
  113 + des = #{des,jdbcType=VARCHAR},
  114 + type = #{type,jdbcType=INTEGER}
  115 + where ID = #{id,jdbcType=VARCHAR}
  116 + </update>
  117 +</mapper>
  1 +<?xml version="1.0" encoding="UTF-8" ?>
  2 +<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
  3 +<mapper namespace="com.sunyo.wlpt.message.bus.service.mapper.StypeMapper" >
  4 + <resultMap id="BaseResultMap" type="com.sunyo.wlpt.message.bus.service.model.Stype" >
  5 + <id column="id" property="id" jdbcType="VARCHAR" />
  6 + <result column="name" property="name" jdbcType="VARCHAR" />
  7 + <result column="des" property="des" jdbcType="VARCHAR" />
  8 + <result column="parent_id" property="parentId" jdbcType="VARCHAR" />
  9 + </resultMap>
  10 + <sql id="Base_Column_List" >
  11 + id, name, des, parent_id
  12 + </sql>
  13 + <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.String" >
  14 + select
  15 + <include refid="Base_Column_List" />
  16 + from stype
  17 + where id = #{id,jdbcType=VARCHAR}
  18 + </select>
  19 + <delete id="deleteByPrimaryKey" parameterType="java.lang.String" >
  20 + delete from stype
  21 + where id = #{id,jdbcType=VARCHAR}
  22 + </delete>
  23 + <insert id="insert" parameterType="com.sunyo.wlpt.message.bus.service.model.Stype" >
  24 + insert into stype (id, name, des,
  25 + parent_id)
  26 + values (#{id,jdbcType=VARCHAR}, #{name,jdbcType=VARCHAR}, #{des,jdbcType=VARCHAR},
  27 + #{parentId,jdbcType=VARCHAR})
  28 + </insert>
  29 + <insert id="insertSelective" parameterType="com.sunyo.wlpt.message.bus.service.model.Stype" >
  30 + insert into stype
  31 + <trim prefix="(" suffix=")" suffixOverrides="," >
  32 + <if test="id != null" >
  33 + id,
  34 + </if>
  35 + <if test="name != null" >
  36 + name,
  37 + </if>
  38 + <if test="des != null" >
  39 + des,
  40 + </if>
  41 + <if test="parentId != null" >
  42 + parent_id,
  43 + </if>
  44 + </trim>
  45 + <trim prefix="values (" suffix=")" suffixOverrides="," >
  46 + <if test="id != null" >
  47 + #{id,jdbcType=VARCHAR},
  48 + </if>
  49 + <if test="name != null" >
  50 + #{name,jdbcType=VARCHAR},
  51 + </if>
  52 + <if test="des != null" >
  53 + #{des,jdbcType=VARCHAR},
  54 + </if>
  55 + <if test="parentId != null" >
  56 + #{parentId,jdbcType=VARCHAR},
  57 + </if>
  58 + </trim>
  59 + </insert>
  60 + <update id="updateByPrimaryKeySelective" parameterType="com.sunyo.wlpt.message.bus.service.model.Stype" >
  61 + update stype
  62 + <set >
  63 + <if test="name != null" >
  64 + name = #{name,jdbcType=VARCHAR},
  65 + </if>
  66 + <if test="des != null" >
  67 + des = #{des,jdbcType=VARCHAR},
  68 + </if>
  69 + <if test="parentId != null" >
  70 + parent_id = #{parentId,jdbcType=VARCHAR},
  71 + </if>
  72 + </set>
  73 + where id = #{id,jdbcType=VARCHAR}
  74 + </update>
  75 + <update id="updateByPrimaryKey" parameterType="com.sunyo.wlpt.message.bus.service.model.Stype" >
  76 + update stype
  77 + set name = #{name,jdbcType=VARCHAR},
  78 + des = #{des,jdbcType=VARCHAR},
  79 + parent_id = #{parentId,jdbcType=VARCHAR}
  80 + where id = #{id,jdbcType=VARCHAR}
  81 + </update>
  82 +</mapper>
  1 +<?xml version="1.0" encoding="UTF-8" ?>
  2 +<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
  3 +<mapper namespace="com.sunyo.wlpt.message.bus.service.mapper.UserTopicMapper" >
  4 + <resultMap id="BaseResultMap" type="com.sunyo.wlpt.message.bus.service.model.UserTopic" >
  5 + <id column="id" property="id" jdbcType="VARCHAR" />
  6 + <result column="user_id" property="userId" jdbcType="VARCHAR" />
  7 + <result column="bus_queue_id" property="busQueueId" jdbcType="VARCHAR" />
  8 + <result column="username" property="username" jdbcType="VARCHAR" />
  9 + <result column="topic" property="topic" jdbcType="VARCHAR" />
  10 + </resultMap>
  11 + <sql id="Base_Column_List" >
  12 + id, user_id, bus_queue_id, username, topic
  13 + </sql>
  14 + <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.String" >
  15 + select
  16 + <include refid="Base_Column_List" />
  17 + from user_topic
  18 + where id = #{id,jdbcType=VARCHAR}
  19 + </select>
  20 + <delete id="deleteByPrimaryKey" parameterType="java.lang.String" >
  21 + delete from user_topic
  22 + where id = #{id,jdbcType=VARCHAR}
  23 + </delete>
  24 + <insert id="insert" parameterType="com.sunyo.wlpt.message.bus.service.model.UserTopic" >
  25 + insert into user_topic (id, user_id, bus_queue_id,
  26 + username, topic)
  27 + values (#{id,jdbcType=VARCHAR}, #{userId,jdbcType=VARCHAR}, #{busQueueId,jdbcType=VARCHAR},
  28 + #{username,jdbcType=VARCHAR}, #{topic,jdbcType=VARCHAR})
  29 + </insert>
  30 + <insert id="insertSelective" parameterType="com.sunyo.wlpt.message.bus.service.model.UserTopic" >
  31 + insert into user_topic
  32 + <trim prefix="(" suffix=")" suffixOverrides="," >
  33 + <if test="id != null" >
  34 + id,
  35 + </if>
  36 + <if test="userId != null" >
  37 + user_id,
  38 + </if>
  39 + <if test="busQueueId != null" >
  40 + bus_queue_id,
  41 + </if>
  42 + <if test="username != null" >
  43 + username,
  44 + </if>
  45 + <if test="topic != null" >
  46 + topic,
  47 + </if>
  48 + </trim>
  49 + <trim prefix="values (" suffix=")" suffixOverrides="," >
  50 + <if test="id != null" >
  51 + #{id,jdbcType=VARCHAR},
  52 + </if>
  53 + <if test="userId != null" >
  54 + #{userId,jdbcType=VARCHAR},
  55 + </if>
  56 + <if test="busQueueId != null" >
  57 + #{busQueueId,jdbcType=VARCHAR},
  58 + </if>
  59 + <if test="username != null" >
  60 + #{username,jdbcType=VARCHAR},
  61 + </if>
  62 + <if test="topic != null" >
  63 + #{topic,jdbcType=VARCHAR},
  64 + </if>
  65 + </trim>
  66 + </insert>
  67 + <update id="updateByPrimaryKeySelective" parameterType="com.sunyo.wlpt.message.bus.service.model.UserTopic" >
  68 + update user_topic
  69 + <set >
  70 + <if test="userId != null" >
  71 + user_id = #{userId,jdbcType=VARCHAR},
  72 + </if>
  73 + <if test="busQueueId != null" >
  74 + bus_queue_id = #{busQueueId,jdbcType=VARCHAR},
  75 + </if>
  76 + <if test="username != null" >
  77 + username = #{username,jdbcType=VARCHAR},
  78 + </if>
  79 + <if test="topic != null" >
  80 + topic = #{topic,jdbcType=VARCHAR},
  81 + </if>
  82 + </set>
  83 + where id = #{id,jdbcType=VARCHAR}
  84 + </update>
  85 + <update id="updateByPrimaryKey" parameterType="com.sunyo.wlpt.message.bus.service.model.UserTopic" >
  86 + update user_topic
  87 + set user_id = #{userId,jdbcType=VARCHAR},
  88 + bus_queue_id = #{busQueueId,jdbcType=VARCHAR},
  89 + username = #{username,jdbcType=VARCHAR},
  90 + topic = #{topic,jdbcType=VARCHAR}
  91 + where id = #{id,jdbcType=VARCHAR}
  92 + </update>
  93 +</mapper>