作者 王勇

拆分服务--接收消息服务

正在显示 48 个修改的文件 包含 4709 行增加0 行删除

要显示太多修改。

为保证性能只显示 48 of 48+ 个文件。

HELP.md
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
... ...
server:
port: 9032
# spring 配置
spring:
security:
user:
name: admin
password: 123456
application:
name: message-bus-receive
# 数据源配置
datasource:
type: com.alibaba.druid.pool.DruidDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://118.31.66.166:3306/bus_service?characterEncoding=utf8&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true
username: 110
password: QAHqCJf2kFYCLirM
# redis设置
redis:
database: 0 # Redis 数据库索引(默认为 0)
host: 192.168.37.139 # Redis 服务器地址
port: 6379 # Redis 服务器连接端口
password: 123456 # Redis 服务器连接密码(默认为空)
lettuce:
pool:
max-active: 8 # 连接池最大连接数(使用负值表示没有限制) 默认 8
max-wait: -1 # 连接池最大阻塞等待时间(使用负值表示没有限制) 默认 -1
max-idle: 8 # 连接池中的最大空闲连接 默认 8
min-idle: 0 # 连接池中的最小空闲连接 默认 0
# rabbitmq配置
rabbitmq:
host: 192.168.37.139
port: 5672
username: rabbit
password: 123456
virtual-host: /
# 多环境配置
profiles:
active: dev
# Jackson配置
jackson:
default-property-inclusion: ALWAYS
time-zone: GMT+8
date-format: yyyy-MM-dd HH:mm:ss
# zipkin 链路追踪配置
zipkin:
base-url: http://192.168.1.63:9411
sleuth:
sampler:
probability: 1
# mybatis 配置
mybatis:
mapper-locations: classpath:mapper/*.xml
type-aliases-package: com.sunyo.wlpt.message.bus.service.domain
# 日志配置
logging:
config: config/logback-dev.xml
level:
com.sunyo.wlpt.message.bus.service.mapper: debug
logback:
appname: message-bus-service
logdir: ./log
#eureka 配置
eureka:
instance:
status-page-url: http://${eureka.instance.hostname}:${server.port}/index
prefer-ip-address: true
instance-id: ${spring.cloud.client.ip-address}:${server.port}
hostname: ${spring.cloud.client.ip-address}
metadata-map:
user:
name: "admin"
password: "123456"
client:
healthcheck:
enabled: true
service-url:
defaultZone: http://192.168.1.53:12345/eureka/
# boot admin
management:
endpoints:
enabled-by-default: true
web:
exposure:
include: "*"
endpoint:
health:
show-details: always
shutdown:
enabled: true
# 基础信息配置
info:
version: 1.0
description: "消息总线平台——发送消息服务"
... ...
# 解决eureka的多网卡配置问题,指定网卡的IP地址的前缀
spring:
cloud:
inetutils:
preferred-networks: 192.168.1.
eureka:
instance:
prefer-ip-address: true
\ No newline at end of file
... ...
<?xml version="1.0" encoding="UTF-8"?>
<!--参考文档链接:https://blog.csdn.net/qq_34912478/article/details/80877132-->
<!-- 日志级别从低到高分为TRACE < DEBUG < INFO < WARN < ERROR < FATAL,如果设置为WARN,则低于WARN的信息都不会输出 -->
<!-- scan:当此属性设置为true时,配置文件如果发生改变,将会被重新加载,默认值为true -->
<!-- scanPeriod:设置监测配置文件是否有修改的时间间隔,如果没有给出时间单位,默认单位是毫秒。当scan为true时,此属性生效。默认的时间间隔为1分钟。 -->
<!-- debug:当此属性设置为true时,将打印出logback内部日志信息,实时查看logback运行状态。默认值为false。 -->
<configuration scan="true" scanPeriod="10 seconds">
<!--<include resource="org/springframework/boot/logging/logback/base.xml" />-->
<contextName>logback</contextName>
<!-- name的值是变量的名称,value的值时变量定义的值。通过定义的值会被插入到logger上下文中。定义变量后,可以使“${}”来使用变量。 -->
<property name="log.path" value="./logs" />
<!-- 彩色日志 -->
<!-- 彩色日志依赖的渲染类 -->
<conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter" />
<conversionRule conversionWord="wex" converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter" />
<conversionRule conversionWord="wEx" converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter" />
<!-- 彩色日志格式 -->
<property name="CONSOLE_LOG_PATTERN" value="${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/>
<!--输出到控制台-->
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<!--此日志appender是为开发使用,只配置最底级别,控制台输出的日志级别是大于或等于此级别的日志信息-->
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>debug</level>
</filter>
<encoder>
<Pattern>${CONSOLE_LOG_PATTERN}</Pattern>
<!-- 设置字符集 windows系统这里设置成GBK-->
<charset>UTF-8</charset>
</encoder>
</appender>
<!--输出到文件-->
<!-- 时间滚动输出 level为 DEBUG 日志 -->
<appender name="DEBUG_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- 正在记录的日志文件的路径及文件名 -->
<file>${log.path}/log_debug.log</file>
<!--日志文件输出格式-->
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
<charset>UTF-8</charset> <!-- 设置字符集 -->
</encoder>
<!-- 日志记录器的滚动策略,按日期,按大小记录 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 日志归档 -->
<fileNamePattern>${log.path}/debug/log-debug-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>100MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!--日志文件保留天数-->
<maxHistory>15</maxHistory>
</rollingPolicy>
<!-- 此日志文件只记录debug级别的 -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>debug</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<!-- 时间滚动输出 level为 INFO 日志 -->
<appender name="INFO_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- 正在记录的日志文件的路径及文件名 -->
<file>${log.path}/log_info.log</file>
<!--日志文件输出格式-->
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
<charset>UTF-8</charset>
</encoder>
<!-- 日志记录器的滚动策略,按日期,按大小记录 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 每天日志归档路径以及格式 -->
<fileNamePattern>${log.path}/info/log-info-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>100MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!--日志文件保留天数-->
<maxHistory>15</maxHistory>
</rollingPolicy>
<!-- 此日志文件只记录info级别的 -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>info</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<!-- 时间滚动输出 level为 WARN 日志 -->
<appender name="WARN_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- 正在记录的日志文件的路径及文件名 -->
<file>${log.path}/log_warn.log</file>
<!--日志文件输出格式-->
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
<charset>UTF-8</charset> <!-- 此处设置字符集 -->
</encoder>
<!-- 日志记录器的滚动策略,按日期,按大小记录 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${log.path}/warn/log-warn-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>100MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!--日志文件保留天数-->
<maxHistory>15</maxHistory>
</rollingPolicy>
<!-- 此日志文件只记录warn级别的 -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>warn</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<!-- 时间滚动输出 level为 ERROR 日志 -->
<appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- 正在记录的日志文件的路径及文件名 -->
<file>${log.path}/log_error.log</file>
<!--日志文件输出格式-->
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
<charset>UTF-8</charset> <!-- 此处设置字符集 -->
</encoder>
<!-- 日志记录器的滚动策略,按日期,按大小记录 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${log.path}/error/log-error-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>100MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!--日志文件保留天数-->
<maxHistory>15</maxHistory>
</rollingPolicy>
<!-- 此日志文件只记录ERROR级别的 -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>ERROR</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<!-- 时间滚动输出 level为 trace 日志 -->
<appender name="TRACE_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- 正在记录的日志文件的路径及文件名 -->
<file>${log.path}/log_trace.log</file>
<!--日志文件输出格式-->
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
<charset>UTF-8</charset> <!-- 此处设置字符集 -->
</encoder>
<!-- 日志记录器的滚动策略,按日期,按大小记录 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${log.path}/trace/log-trace-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>100MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!--日志文件保留天数-->
<maxHistory>15</maxHistory>
</rollingPolicy>
<!-- 此日志文件只记录trace级别的 -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>TRACE</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<!--
<logger>用来设置某一个包或者具体的某一个类的日志打印级别、
以及指定<appender>。<logger>仅有一个name属性,
一个可选的level和一个可选的addtivity属性。
name:用来指定受此logger约束的某一个包或者具体的某一个类。
level:用来设置打印级别,大小写无关:TRACE, DEBUG, INFO, WARN, ERROR, ALL 和 OFF,
还有一个特俗值INHERITED或者同义词NULL,代表强制执行上级的级别。
如果未设置此属性,那么当前logger将会继承上级的级别。
addtivity:是否向上级logger传递打印信息。默认是true。
-->
<!--<logger name="org.springframework.web" level="info"/>-->
<!--<logger name="org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor" level="INFO"/>-->
<!--
使用mybatis的时候,sql语句是debug下才会打印,而这里我们只配置了info,所以想要查看sql语句的话,有以下两种操作:
第一种把<root level="info">改成<root level="DEBUG">这样就会打印sql,不过这样日志那边会出现很多其他消息
第二种就是单独给dao下目录配置debug模式,代码如下,这样配置sql语句会打印,其他还是正常info级别:
-->
<!--
root节点是必选节点,用来指定最基础的日志输出级别,只有一个level属性
level:用来设置打印级别,大小写无关:TRACE, DEBUG, INFO, WARN, ERROR, ALL 和 OFF,
不能设置为INHERITED或者同义词NULL。默认是DEBUG
可以包含零个或多个元素,标识这个appender将会添加到这个logger。
-->
<!--<logger name="com.tianbo.analysis" level="trace">-->
<!--<appender-ref ref="CONSOLE" />-->
<!--<appender-ref ref="TRACE_FILE" />-->
<!--<appender-ref ref="DEBUG_FILE" />-->
<!--<appender-ref ref="INFO_FILE" />-->
<!--<appender-ref ref="WARN_FILE" />-->
<!--<appender-ref ref="ERROR_FILE" />-->
<!--</logger>-->
<!--开发环境:打印控制台-->
<springProfile name="dev">
<logger name="org.springframework" level="info"/>
<logger name="com.sunyo.wlpt.nmms.mapper" level="debug">
<appender-ref ref="CONSOLE" />
<appender-ref ref="DEBUG_FILE" />
</logger>
<logger name="org.apache.tomcat" level="info" />
<root level="info">
<appender-ref ref="CONSOLE" />
<appender-ref ref="TRACE_FILE" />
<appender-ref ref="DEBUG_FILE" />
<appender-ref ref="INFO_FILE" />
<appender-ref ref="WARN_FILE" />
<appender-ref ref="ERROR_FILE" />
</root>
</springProfile>
<!--生产环境:输出到文件-->
<springProfile name="pro">
<logger name="org.springframework" level="INFO"/>
<root level="info">
<appender-ref ref="CONSOLE" />
<appender-ref ref="DEBUG_FILE" />
<appender-ref ref="INFO_FILE" />
<appender-ref ref="ERROR_FILE" />
<appender-ref ref="WARN_FILE" />
<appender-ref ref="TRACE_FILE" />
</root>
</springProfile>
</configuration>
\ No newline at end of file
... ...
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="ture" scanPeriod="60 seconds" debug="false">
<springProperty scope="context" name="appname" source="logback.appname"/>
<springProperty scope="context" name="logdir" source="logback.logdir"/>
<!--文件名-->
<contextName>${appname}</contextName>
<!--输出到控制面板-->
<appender name="consoleLog1" class="ch.qos.logback.core.ConsoleAppender">
<!-- layout输出方式输出-->
<layout class="ch.qos.logback.classic.PatternLayout">
<pattern>%d [%thread] %-5level %logger{36} - %msg%n</pattern>
</layout>
</appender>
<!--输出到控制面板-->
<appender name="consoleLog" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<!--输出info级别日志-->
<appender name="fileInfoLog" class="ch.qos.logback.core.rolling.RollingFileAppender">
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<!--过滤 Error-->
<level>ERROR</level>
<!--匹配到就禁止-->
<onMatch>DENY</onMatch>
<!--没有匹配到就允许-->
<onMismatch>ACCEPT</onMismatch>
</filter>
<!--<File>../logs</File>-->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<FileNamePattern>${logdir}/info.${appname}.%d{yyyy-MM-dd}.log</FileNamePattern>
<maxHistory>100</maxHistory>
<totalSizeCap>100MB</totalSizeCap>
</rollingPolicy>
<encoder>
<charset>UTF-8</charset>
<pattern>%d [%thread] %-5level %logger{36} %line - %msg%n</pattern>
</encoder>
</appender>
<!--输出Error级别日志-->
<!--<appender name="fileErrorLog" class="ch.qos.logback.core.rolling.RollingFileAppender">-->
<!--<filter class="ch.qos.logback.classic.filter.LevelFilter">-->
<!--&lt;!&ndash;过滤 Error &ndash;&gt;-->
<!--<level>ERROR</level>-->
<!--</filter>-->
<!--&lt;!&ndash;<File>../logs</File>&ndash;&gt;-->
<!--<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">-->
<!--<FileNamePattern>${logdir}/error.${appname}.%d{yyyy-MM-dd}.log</FileNamePattern>-->
<!--<maxHistory>100</maxHistory>-->
<!--<totalSizeCap>100MB</totalSizeCap>-->
<!--</rollingPolicy>-->
<!--<encoder>-->
<!--<charset>UTF-8</charset>-->
<!--<pattern>%d [%thread] %-5level %logger{36} %line - %msg%n</pattern>-->
<!--</encoder>-->
<!--</appender>-->
<!--监控下列类的所有日志,定义输出级别-->
<logger name="java.sql.PreparedStatement" level="DEBUG" additivity="false">
<appender-ref ref="consoleLog"/>
</logger>    
<logger name="java.sql.Connection" level="DEBUG" additivity="false"> 
<appender-ref ref="consoleLog"/>
</logger>  
<logger name="java.sql.Statement" level="DEBUG" additivity="false">
<appender-ref ref="consoleLog"/>
</logger>    
<logger name="com.ibatis" level="DEBUG" additivity="false">
<appender-ref ref="consoleLog"/>
</logger>    
<logger name="com.ibatis.common.jdbc.SimpleDataSource" level="DEBUG" additivity="false">
<appender-ref ref="consoleLog"/>
</logger>    
<logger name="com.ibatis.common.jdbc.ScriptRunner" level="DEBUG" additivity="false">
<appender-ref ref="consoleLog"/>
</logger>    
<logger name="com.ibatis.sqlmap.engine.impl.SqlMapClientDelegate" level="DEBUG" additivity="false">
<appender-ref ref="consoleLog"/>
</logger> 
<!--输出-->
<root level="INFO">
<appender-ref ref="consoleLog"/>
<!--<appender-ref ref="consoleLog"/>-->
<appender-ref ref="fileInfoLog"/>
<!--<appender-ref ref="fileErrorLog"/>-->
</root>
</configuration>
... ...
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.5.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.sunyo.wlpt.message.bus.service</groupId>
<artifactId>message_bus_receive</artifactId>
<version>1.0.0</version>
<name>message_bus_receive</name>
<description>消息总线平台——接收消息服务</description>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-cloud.version>Hoxton.RELEASE</spring-cloud.version>
<!-- springboot 2.2.1默认的es版本是6.8.4,下面的es的版本要和ES的版本一致 -->
<elasticsearch.version>7.4.0</elasticsearch.version>
</properties>
<dependencies>
<!-- SpringBoot start -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>http-client</artifactId>
<version>3.7.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<!-- SpringBoot end -->
<!-- SpringCloud start -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>
<dependency>
<groupId>de.codecentric</groupId>
<artifactId>spring-boot-admin-starter-client</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- SpringCloud end -->
<!-- database start -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.github.pagehelper</groupId>
<artifactId>pagehelper-spring-boot-starter</artifactId>
<version>1.2.12</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.9</version>
</dependency>
<!-- database end -->
<!-- tools start -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.73</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>com.github.xiaoymin</groupId>
<artifactId>swagger-bootstrap-ui</artifactId>
<version>1.9.6</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.10.5</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.9</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<!-- dom4j解析xml -->
<dependency>
<groupId>dom4j</groupId>
<artifactId>dom4j</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- tools end -->
</dependencies>
<dependencyManagement>
<dependencies>
<!-- <dependency>-->
<!-- <groupId>org.springframework.boot</groupId>-->
<!-- <artifactId>spring-boot-dependencies</artifactId>-->
<!-- <version>${spring-boot.version}</version>-->
<!-- <type>pom</type>-->
<!-- <scope>import</scope>-->
<!-- </dependency>-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>spring-milestone</id>
<name>Spring Milestone</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>spring-snapshot</id>
<name>Spring Snapshot</name>
<url>https://repo.spring.io/snapshot</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>spring-milestone</id>
<name>Spring Milestone</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
<pluginRepository>
<id>spring-snapshot</id>
<name>Spring Snapshot</name>
<url>https://repo.spring.io/snapshot</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
</project>
... ...
package com.sunyo.wlpt.message.bus.service;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.transaction.annotation.EnableTransactionManagement;
/**
* @author 子诚
*/
@EnableCaching
@SpringBootApplication
@MapperScan("com.sunyo.wlpt.message.bus.service.mapper")
@EnableFeignClients
@EnableEurekaClient
@EnableTransactionManagement
@EnableScheduling
@EnableAsync
public class MessageBusReceiveApplication {
public static void main(String[] args)
{
SpringApplication.run(MessageBusReceiveApplication.class, args);
}
}
... ...
package com.sunyo.wlpt.message.bus.service.cache;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
/**
* @author 子诚
* Description:用来获取springboot创建好的工厂
* 时间:2020/8/6 9:31
*/
@Component
public class ApplicationContextUtils implements ApplicationContextAware {
/**
* 保留下来工厂
*/
private static ApplicationContext context;
/**
* 将创建好工厂以参数形式传递给这个类
*
* @param applicationContext 上下文
* @throws BeansException
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
{
context = applicationContext;
}
/**
* 提供在工厂中获取对象的方法 例如:RedisTemplate redisTemplate
*
* @param beanName bean的名称
* @return
*/
public static Object getBean(String beanName)
{
return context.getBean(beanName);
}
}
... ...
package com.sunyo.wlpt.message.bus.service.cache;
import org.apache.ibatis.cache.Cache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.util.DigestUtils;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* @author 子诚
* Description:自定义Redis作为mybatis二级缓存实现
* 问题描述:缓存穿透、缓存雪崩
* <p>
* 缓存穿透:就是说利用一些列措施,使得访问避开了缓存,直接访问数据库,使得数据库不堪重负引起的问题。比如(压测)访问数据库中不存在的数据
* 解决方案:读取数据库,不存在;依旧生成对应的key,放到缓存中,但是对应的value是null(mybatis的二级缓存是这样解决的)。
* 下次再次访问的话,就是读取缓存。
* <p>
* 缓存雪崩:是指在某一特殊时刻,缓存中的缓存全部失效,然后这一时刻又有大量的数据库访问,导致数据库不堪重负。
* 解决方案:根据业务的不同设置不同的缓存失效时间。
* 比如:这个项目,做了3个namespace的缓存,其中一个namespace,共有5个Mapper指向它。所以选择使用范围内的随机值,来做缓存失效时间
* <p>
* 时间:2020/8/6 9:37
*/
public class RedisCache implements Cache {
/**
* slf4j的日志记录器
*/
private static final Logger logger = LoggerFactory.getLogger(RedisCache.class);
/**
* 用于事务性缓存操作的读写锁
*/
private static final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
/**
* 当前放入缓存的mapper的namespace
*/
private final String id;
/**
* 必须存在构造方法
*/
public RedisCache(String id)
{
this.id = id;
}
/**
* 返回cache唯一标识
*/
@Override
public String getId()
{
return this.id;
}
/**
* 缓存放入值 redis RedisTemplate StringRedisTemplate
*
* @param key hash_key
* @param value hash_value
*/
@Override
public void putObject(Object key, Object value)
{
RedisTemplate redisTemplate = getRedisTemplate();
// 使用redis的hash类型作为缓存存储模型
redisTemplate.opsForHash().put(id.toString(), encryptionKey(key.toString()), value);
/**
* 根据业务的不同,设置不同的缓存时间,解决掉缓存雪崩
*/
if (id.equals("com.sunyo.wlpt.message.bus.service.mapper.VirtualHostMapper")) {
// 设置缓存时间
redisTemplate.expire(id.toString(), random(1, 3), TimeUnit.HOURS);
}
if (id.equals("com.sunyo.wlpt.message.bus.service.mapper.UserMessageBindingMapper")) {
// 设置缓存时间
redisTemplate.expire(id.toString(), random(60, 100), TimeUnit.MINUTES);
}
if (id.equals("com.sunyo.wlpt.message.bus.service.mapper.UserInfoMapper")) {
// 设置缓存时间
redisTemplate.expire(id.toString(), random(30, 50), TimeUnit.MINUTES);
}
}
/**
*
*/
public int random(int low, int high)
{
int num = ((int) (Math.random() * (high - low))) + low;
return num;
}
/**
* 缓存中中获取数据
*/
@Override
public Object getObject(Object key)
{
RedisTemplate redisTemplate = getRedisTemplate();
//根据key 从redis的hash类型中获取数据
return redisTemplate.opsForHash().get(id.toString(), encryptionKey(key.toString()));
}
/**
* 注意:这个方法为mybatis保留方法 默认没有实现 后续版本可能会实现
*
* @param key hash_key
* @return
*/
@Override
public Object removeObject(Object key)
{
RedisTemplate redisTemplate = getRedisTemplate();
redisTemplate.delete(key);
return null;
}
@Override
public void clear()
{
logger.info("清空->{}<-缓存", id);
RedisTemplate redisTemplate = getRedisTemplate();
// 清空 namespace
redisTemplate.delete(id.toString());
}
/**
* 用来计算缓存数量
*/
@Override
public int getSize()
{
RedisTemplate redisTemplate = getRedisTemplate();
// 获取hash中key value数量
return redisTemplate.opsForHash().size(id.toString()).intValue();
}
/**
* 封装redisTemplate
*
* @return RedisTemplate
*/
private RedisTemplate getRedisTemplate()
{
//通过application工具类获取redisTemplate
RedisTemplate redisTemplate = (RedisTemplate) ApplicationContextUtils.getBean("redisTemplate");
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
return redisTemplate;
}
/**
* 封装一个对key进行md5处理方法
*/
private String encryptionKey(String key)
{
return DigestUtils.md5DigestAsHex(key.getBytes());
}
@Override
public ReadWriteLock getReadWriteLock()
{
return readWriteLock;
}
}
... ...
package com.sunyo.wlpt.message.bus.service.common;
/**
* @author 子诚
* Description:常量公共类
* 时间:2020/7/17 9:49
*/
public class Constant {
public static final String EXIST_SERVERNAME = "MQ服务器名称已存在";
public static final String EXIST_SERVER_INFO = "该-MQ服务器(ServerIp以及ServerPort)-信息已存在,请谨慎输入";
public static final String EXIST_HOST_INFO = "该-虚拟主机(虚拟主机名称)-信息已存在,请谨慎输入";
public static final String EXIST_EXCHANGE_INFO = "该-交换机(交换机名称)-信息已存在,请谨慎输入";
public static final String EXIST_QUEUE_INFO = "该-消息队列-信息已存在,请谨慎输入";
public static final String EXIST_ROUTINGKEY_INFO = "该-路由键(路由键名称)-信息已存在,请谨慎输入";
public static final String EXIST_UMB = "该绑定关系已经存在了";
public static final String RESULT_SUCCESS = "200";
}
... ...
package com.sunyo.wlpt.message.bus.service.config;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
/**
* @author 子诚
* Description:ES的配置文件
* 时间:2020/8/5 10:23
*/
@Configuration
public class ElasticSearchConfig extends AbstractElasticsearchConfiguration {
private String hostname;
private Integer port;
private String scheme;
@Override
@Bean
public RestHighLevelClient elasticsearchClient()
{
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
// 天生契合集群,有几个es环境,就 new HttpHost 几个,用,相隔
new HttpHost("192.168.37.139", 9200, "http")
)
);
return client;
}
@Bean
public ElasticsearchRestTemplate elasticsearchRestTemplate()
{
return new ElasticsearchRestTemplate(elasticsearchClient());
}
}
... ...
package com.sunyo.wlpt.message.bus.service.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter;
/**
* @author 子诚
* Description:SpringSecurity 权限配置框架
* 时间:2020/7/5 13:38
*/
@EnableWebSecurity
@Configuration
public class SecurityConfig extends WebSecurityConfigurerAdapter {
@Override
protected void configure(HttpSecurity http) throws Exception {
http.csrf().disable();
}
}
... ...
package com.sunyo.wlpt.message.bus.service.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
/**
* @author 子诚
* Description:swagger-knife4j 的配置文件
* 时间:2020/7/10 11:48
*/
@Configuration
@EnableSwagger2
public class SwaggerConfig {
@Bean
public Docket createRestApi()
{
return new Docket(DocumentationType.SWAGGER_2)
.apiInfo(apiInfo())
.select()
.apis(RequestHandlerSelectors.basePackage("com.sunyo.wlpt.message.bus.service.controller"))
.paths(PathSelectors.any())
.build();
}
private ApiInfo apiInfo()
{
return new ApiInfoBuilder()
.title("消息总线平台--接收消息服务")
.description("消息总线平台--接收消息服务")
.termsOfServiceUrl("http://localhost:9032/")
.contact("子诚")
.version("1.0.0")
.build();
}
}
... ...
package com.sunyo.wlpt.message.bus.service.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author 子诚
* Description:为异步任务,创建线程池
* 时间:2020/8/20 14:15
*/
@Configuration
public class TaskConfiguration {
@Bean("taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数
executor.setCorePoolSize(20);
// 最大线程数
executor.setMaxPoolSize(100);
// 缓冲队列
executor.setQueueCapacity(200);
// 允许线程的空闲时间
executor.setKeepAliveSeconds(60);
// 线程池名的前缀
executor.setThreadNamePrefix("taskExecutor-");
// 优雅的关闭线程池,
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
// 线程池对拒绝任务的处理策略,CallerRunsPolicy策略,当线程池没有处理能力的时候,该策略会直接在execute方法的调用线程中运行被拒绝的任务;
// 如果执行程序已关闭,则会丢弃该任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
}
... ...
package com.sunyo.wlpt.message.bus.service.controller;
import com.sunyo.wlpt.message.bus.service.domain.XmlData;
import com.sunyo.wlpt.message.bus.service.exception.CustomExceptionType;
import com.sunyo.wlpt.message.bus.service.rabbit.utils.DirectUtils;
import com.sunyo.wlpt.message.bus.service.response.ResultJson;
import com.sunyo.wlpt.message.bus.service.service.UserMessageBindingService;
import com.sunyo.wlpt.message.bus.service.service.impl.AsyncTaskService;
import com.sunyo.wlpt.message.bus.service.utils.XmlUtils;
import lombok.extern.slf4j.Slf4j;
import org.dom4j.DocumentException;
import org.springframework.format.annotation.DateTimeFormat;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import static com.sunyo.wlpt.message.bus.service.common.Constant.RESULT_SUCCESS;
/**
* @author 子诚
* Description:
* 时间:2020/7/16 14:46
*/
@Slf4j
@CrossOrigin
@RequestMapping("bus/rabbit")
@RestController
public class RabbitController {
@Resource
private UserMessageBindingService userMessageBindingService;
@Resource
private XmlUtils xmlUtils;
@Resource
private DirectUtils directUtils;
@Resource
private AsyncTaskService asyncTaskService;
/**
* 消费消息,多个
*
* @param receiver 接收者
* @param SERV 服务器
* @param VSHT 虚拟主机
* @param RCVR 队列
* @return
*/
@GetMapping("/consumer_one")
public ResultJson consumerOne(
@RequestParam(value = "receiver") String receiver,
@RequestParam(value = "SERV") String SERV,
@RequestParam(value = "VSHT") String VSHT,
@RequestParam(value = "RCVR") String RCVR)
{
try {
XmlData xmlData = XmlData.builder()
.receiver(receiver)
.serverName(SERV)
.virtualHostName(VSHT)
.queueName(RCVR)
.build();
ResultJson validate = xmlUtils.validateReceiveParam(xmlData);
if (!RESULT_SUCCESS.equals(validate.getCode())) {
return validate;
}
ResultJson result = directUtils.directConsumerByPull(xmlData);
return result;
} catch (IOException | TimeoutException e) {
return ResultJson.error(CustomExceptionType.RECEIVE_SERVER_EXCEPTION);
}
}
/**
* 消费消息,多个
*
* @param receiver 接收者
* @param SERV 服务器
* @param VSHT 虚拟主机
* @param RCVR 队列
* @param count 消息数量
* @return
*/
@GetMapping("/consumer_more")
public ResultJson consumerMore(
@RequestParam(value = "receiver") String receiver,
@RequestParam(value = "SERV") String SERV,
@RequestParam(value = "VSHT") String VSHT,
@RequestParam(value = "RCVR") String RCVR,
@RequestParam(value = "count", required = false) Integer count)
{
try {
XmlData xmlData = XmlData.builder()
.receiver(receiver)
.serverName(SERV)
.virtualHostName(VSHT)
.queueName(RCVR)
.build();
ResultJson validate = xmlUtils.validateReceiveParam(xmlData);
if (!RESULT_SUCCESS.equals(validate.getCode())) {
return validate;
}
if (count == null || count < 1) {
count = 1;
}
ResultJson result = directUtils.directConsumerByPullMore(xmlData, count);
return result;
} catch (IOException | TimeoutException e) {
return ResultJson.error(CustomExceptionType.RECEIVE_SERVER_EXCEPTION);
}
}
}
... ...
package com.sunyo.wlpt.message.bus.service.domain;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.Date;
/**
* @author 子诚
* Description:MQ路由交换表
* 时间:2020/7/1 10:08
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class BusExchange implements Serializable {
private static final long serialVersionUID = -8533771450468596177L;
/**
* 交换机ID
*/
private String id;
/**
* 交换机名称
*/
private String exchangeName;
/**
* 所属虚拟主机ID
*/
private String virtualHostId;
/**
* 交换机类型:Direct、Fanout、Topic、Headers
*/
private String exchangeType;
/**
* 是否持久化,默认true:1是true;0是false
*/
private Boolean durability;
/**
* 是否自动删除,默认false:1是true;0是false
*/
private Boolean autoDelete;
/**
* 是否是RabbitMQ内部使用,默认false:1是true;0是false
*/
private Boolean internal;
/**
* 扩展参数,以JSON格式存储
*/
private String arguments;
/**
* 交换机相关描述
*/
private String description;
/**
* 交换机创建时间
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date gmtCreate;
/**
* 交换机修改时间
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date gmtModified;
private VirtualHost virtualHost;
/**
* 交换机名称的别名
*/
private String aliasName;
/**
* 定义有参构造器,封装 BusExchangeController 的查询参数
*
* @param exchangeName 交换机名称
* @param virtualHostId 所属虚拟主机ID
*/
public BusExchange(String exchangeName, String virtualHostId) {
this.exchangeName = exchangeName;
this.virtualHostId = virtualHostId;
}
}
... ...
package com.sunyo.wlpt.message.bus.service.domain;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.Date;
/**
* @author 子诚
* Description:队列表
* 时间:2020/7/23 17:16
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class BusQueue implements Serializable {
private static final long serialVersionUID = 6999154061907346103L;
/**
* 队列的ID
*/
private String id;
/**
* 队列名称
*/
private String queueName;
/**
* 所属用户id
*/
private String userId;
/**
* 所属用户登录名称
*/
private String username;
/**
* 所属虚拟主机ID
*/
private String virtualHostId;
/**
* 是否持久化,默认true:1是true;0是false
*/
private Boolean durability;
/**
* 是否自动删除,默认false:1是true;0是false
*/
private Boolean autoDelete;
/**
* 扩展参数,以JSON格式存储
*/
private String arguments;
/**
* 队列相关描述
*/
private String description;
/**
* 队列创建时间
*/
private Date gmtCreate;
/**
* 队列修改时间
*/
private Date gmtModified;
/**
* 队列名称的别名
*/
private String aliasName;
/**
* 一个队列对应一个虚拟机
*/
private VirtualHost virtualHost;
}
\ No newline at end of file
... ...
package com.sunyo.wlpt.message.bus.service.domain;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.Date;
import java.util.List;
/**
* @author 子诚
* Description:MQ服务器静态信息表
* 时间:2020/8/12 14:07
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class BusServer implements Serializable {
private static final long serialVersionUID = -5662106932282473768L;
/**
* 服务器ID
*/
private String id;
/**
* 服务器名称
*/
private String serverName;
/**
* 服务器IP地址
*/
private String serverIp;
/**
* 服务器端口号
*/
private Integer serverPort;
/**
* 客户端的端口号
*/
private Integer clientPort;
/**
* 备用字段,超级用户的用户名
*/
private String superUsername;
/**
* 备用字段,超级用户的密码
*/
private String superPassword;
/**
* 服务器相关描述
*/
private String description;
/**
* 服务器创建时间
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date gmtCreate;
/**
* 服务器修改时间
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date gmtModified;
/**
* 一个服务器对应多个虚拟主机
*/
private List<VirtualHost> virtualHosts;
/**
* 对应的多个虚拟主机的别名,前端特殊使用,误删
*/
private List<VirtualHost> aliasList;
/**
* 服务器名称的别名
*/
private String aliasName;
}
\ No newline at end of file
... ...
package com.sunyo.wlpt.message.bus.service.domain;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.Date;
/**
* @author 子诚
* Description:交换机、队列、routing_key绑定关系表
* 时间:2020/7/2 16:47
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class ExchangeQueueRouting implements Serializable {
private static final long serialVersionUID = 3765854915972798115L;
/**
* 交换机、队列、routing_key绑定关系的ID
*/
private String id;
/**
* 对应交换机ID
*/
private String exchangeId;
/**
* 对应队列的ID
*/
private String queueId;
/**
* 对应路由键的ID
*/
private String routingKeyId;
/**
* 对应路由键的名称
*/
private String routingKeyName;
/**
* 创建时间
*/
private Date gmtCreate;
/**
* 修改时间
*/
private Date gmtModified;
}
... ...
package com.sunyo.wlpt.message.bus.service.domain;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import java.io.Serializable;
import java.util.Date;
/**
* @author 子诚
* Description:消息收发记录表(默认存储七天)
* 时间:2020/7/15 10:45
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Document(indexName = "message_note")
public class MessageNote implements Serializable {
private static final long serialVersionUID = -2119333801860569470L;
/**
* 消息收发记录表的ID
*/
@Id
private String id;
/**
* 用户的ID
*/
private String userId;
/**
* 所属用户登陆名称
*/
@Field(type = FieldType.Text, analyzer = "ik_max_word")
private String username;
/**
* 所属服务器的ID
*/
@Field(type = FieldType.Text)
private String serverId;
/**
* 所属服务器名称
*/
@Field(type = FieldType.Text, analyzer = "ik_max_word")
private String serverName;
/**
* 所属虚拟主机的ID
*/
@Field(type = FieldType.Text)
private String virtualHostId;
/**
* 所属虚拟主机名称
*/
@Field(type = FieldType.Text, analyzer = "ik_max_word")
private String virtualHostName;
/**
* 所属交换机的ID
*/
@Field(type = FieldType.Text)
private String exchangeId;
/**
* 所属交换机名称
*/
@Field(type = FieldType.Text, analyzer = "ik_max_word")
private String exchangeName;
/**
* 所属队列的ID
*/
@Field(type = FieldType.Text)
private String queueId;
/**
* 所属队列名称
*/
@Field(type = FieldType.Text, analyzer = "ik_max_word")
private String queueName;
/**
* 所属路由键的ID
*/
@Field(type = FieldType.Text)
private String routingKeyId;
/**
* 所属路由键的名称
*/
@Field(type = FieldType.Text, analyzer = "ik_max_word")
private String routingKeyName;
/**
* 消息发送时间
*/
@Field(type = FieldType.Date)
@JsonFormat(timezone = "GMT+8")
private Date sendTime;
@Field(type = FieldType.Text)
private String alias_sendTime;
/**
* 查询时间段,开始时间
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date sendTimeBegin;
/**
* 查询时间段,结束时间
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date sendTimeEnd;
/**
* 消息获取时间
*/
private Date receiveTime;
/**
* 发送消息内容
*/
private byte[] sendContent;
/**
* 发送消息内容,别名
*/
@Field(type = FieldType.Text, analyzer = "ik_max_word")
private String alias_sendContent;
/**
* 相关描述
*/
@Field(type = FieldType.Text, analyzer = "ik_max_word")
private String description;
/**
* 创建时间
*/
@Field(type = FieldType.Date)
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date gmtCreate;
/**
* 修改时间
*/
private Date gmtModified;
/**
* 自定义有参构造器,封装 MessageNoteController 的查询参数
*
* @param username 用户登陆名称
* @param serverName MQ服务器名称
* @param virtualHostName 虚拟主机名称
* @param exchangeName 交换机名称
* @param queueName 队列名称
* @param routingKeyName 路由键名称
* @param sendTime 发送消息时间
* @param receiveTime 接收消息时间
*/
public MessageNote(String username, String serverName, String virtualHostName, String exchangeName, String queueName, String routingKeyName, Date sendTime, Date receiveTime)
{
this.username = username;
this.serverName = serverName;
this.virtualHostName = virtualHostName;
this.exchangeName = exchangeName;
this.queueName = queueName;
this.routingKeyName = routingKeyName;
this.sendTime = sendTime;
this.receiveTime = receiveTime;
}
}
\ No newline at end of file
... ...
package com.sunyo.wlpt.message.bus.service.domain;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.Date;
/**
* @author 子诚
* Description:Routing key静态配置表/二级类
* 时间:2020/6/29 16:45
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RoutingKey implements Serializable {
private static final long serialVersionUID = 4318583918719126325L;
/**
* routing_key的ID
*/
private String id;
/**
* routing_key的名称
*/
private String routingKeyName;
/**
* 所属交换机ID
*/
private String exchangeId;
/**
* routing_key相关描述
*/
private String description;
/**
* routing_key创建时间
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date gmtCreate;
/**
* routing_key修改时间
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date gmtModified;
/**
* 路由键名称的别名
*/
private String aliasName;
/**
* 一个路由键对应一个交换机
*/
private BusExchange busExchange;
/**
* 定义有参构造器,封装 RoutingKeyController 的查询参数
*
* @param routingKeyName 路由键名称
* @param exchangeId 所属交换机ID
*/
public RoutingKey(String routingKeyName, String exchangeId) {
this.routingKeyName = routingKeyName;
this.exchangeId = exchangeId;
}
}
... ...
package com.sunyo.wlpt.message.bus.service.domain;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* @author 子诚
* Description:自动定时删除的时间设置表
* 时间:2020/7/15 10:28
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class SchedulingDelete implements Serializable {
private static final long serialVersionUID = -4810544767961191508L;
/**
* 时间设置表的ID
*/
private String id;
/**
* 设置时间
*/
private Integer deleteTime;
/**
* 类型
*/
private String deleteType;
/**
* 相关描述
*/
private String description;
}
\ No newline at end of file
... ...
package com.sunyo.wlpt.message.bus.service.domain;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.Date;
/**
* @author 子诚
* Description:MQ账户信息表
* 时间:2020/8/10 17:44
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class UserInfo implements Serializable {
private static final long serialVersionUID = 8510385519421924997L;
/**
* 用户的ID
*/
private String id;
/**
* 用户登陆名称
*/
private String username;
/**
* 用户登陆密码
*/
private String password;
/**
* 所属服务器id
*/
private String serverId;
/**
* 所属服务器名称
*/
private String serverName;
/**
* 对应虚拟主机的id
*/
private String virtualHostId;
/**
* 对应虚拟主机的名称
*/
private String virtualHostName;
/**
* 用户的姓名(备用字段)
*/
private String realName;
/**
* 用户相关描述
*/
private String description;
/**
* 用户信息创建时间
*/
private Date gmtCreate;
/**
* 用户信息修改时间
*/
private Date gmtModified;
}
\ No newline at end of file
... ...
package com.sunyo.wlpt.message.bus.service.domain;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.Date;
/**
* @author 子诚
* Description:账户信息绑定配置表
* 时间:2020/7/15 10:37
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class UserMessageBinding implements Serializable {
private static final long serialVersionUID = 1036230195492305641L;
/**
* 账户信息绑定配置表的ID
*/
private String id;
/**
* 用户的ID
*/
private String userId;
/**
* 所属用户登陆名称
*/
private String username;
/**
* 所属服务器的ID
*/
private String serverId;
/**
* 所属服务器名称
*/
private String serverName;
/**
* 所属虚拟主机的ID
*/
private String virtualHostId;
/**
* 所属虚拟主机名称
*/
private String virtualHostName;
/**
* 所属交换机的ID
*/
private String exchangeId;
/**
* 所属交换机名称
*/
private String exchangeName;
/**
* 所属队列的ID
*/
private String queueId;
/**
* 所属队列名称
*/
private String queueName;
/**
* 所属路由键的ID
*/
private String routingKeyId;
/**
* 所属路由键的名称
*/
private String routingKeyName;
/**
* 订阅者
*/
private String subscriber;
/**
* 相关描述
*/
private String description;
/**
* 创建时间
*/
private Date gmtCreate;
/**
* 修改时间
*/
private Date gmtModified;
/**
* 自定义有参构造器,封装 UserMessageBindingController 的查询参数
*
* @param username 用户名称
* @param serverName 服务器名称
* @param virtualHostName 虚拟主机名称
* @param exchangeName 交换机名称
* @param queueName 队列名称
* @param routingKeyName 路由键名称
* @param subscriber 订阅者
*/
public UserMessageBinding(String username, String serverName, String virtualHostName, String exchangeName, String queueName, String routingKeyName, String subscriber) {
this.username = username;
this.serverName = serverName;
this.virtualHostName = virtualHostName;
this.exchangeName = exchangeName;
this.queueName = queueName;
this.routingKeyName = routingKeyName;
this.subscriber = subscriber;
}
}
\ No newline at end of file
... ...
package com.sunyo.wlpt.message.bus.service.domain;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.Date;
/**
* @author 子诚
* Description:用户、服务器、虚拟机信息绑定关系表
* 时间:2020/7/2 11:35
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class UserServerVirtualHost implements Serializable {
private static final long serialVersionUID = -7376053283857221145L;
/**
* 用户、服务器、虚拟机信息绑定关系的ID
*/
private String id;
/**
* 所属用户登陆名称
*/
private String username;
/**
* 所属服务器ID
*/
private String serverId;
/**
* 所属虚拟主机ID
*/
private String virtualHostId;
/**
* 创建时间
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date gmtCreate;
/**
* 修改时间
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date gmtModified;
}
... ...
package com.sunyo.wlpt.message.bus.service.domain;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.Date;
import java.util.List;
/**
* @author 子诚
* Description:MQ虚拟主机名称表
* 时间:2020/6/29 16:36
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class VirtualHost implements Serializable {
private static final long serialVersionUID = -3174914872904776153L;
/**
* 虚拟主机ID
*/
private String id;
/**
* 虚拟主机名称
*/
private String virtualHostName;
/**
* 所属服务器ID
*/
private String serverId;
/**
* 虚拟主机相关描述
*/
private String description;
/**
* 虚拟主机创建时间
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date gmtCreate;
/**
* 虚拟主机修改时间
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date gmtModified;
/**
* 虚拟主机名称的别名
*/
private String aliasName;
/**
* 一个虚拟主机对应一个服务器
*/
private BusServer busServer;
/**
* 一个虚拟主机对应多个交换机
*/
private List<BusExchange> busExchanges;
/**
* 对应的多个交换机的别名,前端特殊使用,误删
*/
private List<BusExchange> aliasList;
/**
* 定义有参构造器
* 封装 VirtualHostController 的查询参数
*
* @param serverId 所属服务器ID
* @param virtualHostName 虚拟机名称
*/
public VirtualHost(String virtualHostName, String serverId) {
this.virtualHostName = virtualHostName;
this.serverId = serverId;
}
}
... ...
package com.sunyo.wlpt.message.bus.service.domain;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.Date;
// 报文模板,如下
/*
<?xml version="1.0" encoding="UTF-8"?>
<MSG>
<META>
<SNDR> 消息发送者账号 </SNDR>
<RCVR> 指定接受者 </RCVR>
<DDTM> 发送报文时间 </DDTM>
<TYPE> 大类型--交换机名称 </TYPE>
<STYP> 子类型--路由键名称 </STYP>
<SEQN> 序列号(唯一) </SEQN>
<TOKN> token值 </TOKN>
<VSHT> 虚拟机名称 </VSHT>
<SERV> MQ服务器名称 </SERV>
</META>
<BODY>
发送内容
</BODY>
</MSG>
*/
/**
* @author 子诚
* Description:
* 时间:2020/7/28 10:30
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class XmlData implements Serializable {
private static final long serialVersionUID = -2539052741259114774L;
/**
* 对应:根标签 MSG ->报文
*/
private String message;
/**
* 对应:标签 BODY -> 发送内容
*/
private String sendContent;
/**
* 对应:标签 META -> 报文的头部信息
*/
private String meta;
/**
* 对应:标签 SNDR -> 报文发送者(用户)
*/
private String sender;
/**
* 接收者
*/
private String receiver;
/**
* 对应:标签 RCVR -> 指定接收者(队列名称)
*/
private String queueName;
/**
* 对应:标签 DDTM -> 发送报文时间
*/
private String sendTime;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date sendDateTime;
/**
* 对应:标签 TYPE -> (一级)交换机名称
*/
private String exchangeName;
/**
* 对应:标签 STYP -> (二级)路由键名称
*/
private String routingKeyName;
/**
* 对应:标签 SEQN -> 序列(唯一)
*/
private String sequence;
/**
* 对应:标签 TOKN -> JWT生成的 token 值
*/
private String token;
/**
* 对应:标签 VSHT -> 指定的虚拟主机
*/
private String virtualHostName;
/**
* 对应:标签 SERV -> 指定的服务器
*/
private String serverName;
/**
* 普通用户的密码
*/
private String password;
/**
* 服务器ip地址
*/
private String serverIp;
/**
* 服务器端口号
*/
private Integer serverPort;
/**
* 超级用户名称
*/
private String superUsername;
/**
* 超级用户的密码
*/
private String superPassword;
}
... ...
package com.sunyo.wlpt.message.bus.service.domain.view;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.rabbitmq.http.client.domain.ExchangeInfo;
import com.rabbitmq.http.client.domain.ExchangeMessageStats;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author 子诚
* Description:
* 时间:2020/8/31 14:38
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class TempExchangeInfo extends ExchangeInfo {
@JsonProperty("message_stats")
private ExchangeMessageStats messageStats;
}
\ No newline at end of file
... ...
package com.sunyo.wlpt.message.bus.service.domain.view;
import com.rabbitmq.http.client.domain.ExchangeMessageStats;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* @author 子诚
* Description:自定义MQ返回信息
* 时间:2020/8/27 16:11
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class ViewExchangeInfo implements Serializable {
private static final long serialVersionUID = -3353890396017709770L;
private String serverName;
private TempExchangeInfo tempExchangeInfo;
public long getPublishIn()
{
TempExchangeInfo tempExchangeInfo = getTempExchangeInfo();
ExchangeMessageStats messageStats = tempExchangeInfo.getMessageStats();
if (messageStats == null) {
return 0;
}
return messageStats.getPublishIn();
}
}
... ...
package com.sunyo.wlpt.message.bus.service.domain.view;
import com.rabbitmq.http.client.domain.QueueInfo;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* @author 子诚
* Description:
* 时间:2020/8/27 14:29
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class ViewQueueInfo implements Serializable {
private static final long serialVersionUID = -1016067862213381683L;
private String serverName;
private QueueInfo queueInfo;
/**
* 排序参数,消息挤压数量
*
* @return
*/
public long getMessageReady()
{
return queueInfo.getMessagesReady();
}
/**
* 排序参数,消息总数量
*
* @return
*/
public long getTotalMessages()
{
return queueInfo.getTotalMessages();
}
}
... ...
package com.sunyo.wlpt.message.bus.service.elasticsearch.dao;
import com.sunyo.wlpt.message.bus.service.domain.MessageNote;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
/**
* @author 子诚
* Description:
* 时间:2020/8/5 14:43
*/
public interface MessageNoteRepository extends ElasticsearchRepository<MessageNote, String> {
}
... ...
package com.sunyo.wlpt.message.bus.service.exception;
/**
* @author 子诚
* Description:
* 时间:2020/7/17 16:43
*/
public class CustomException extends RuntimeException {
private static final long serialVersionUID = 6098063244016154220L;
/**
* 异常错误编码
*/
private String code;
/**
* 异常信息
*/
private String message;
public CustomException(CustomExceptionType exceptionTypeEnum, String message) {
this.code = exceptionTypeEnum.getCode();
this.message = message;
}
public CustomException(CustomExceptionType exceptionTypeEnum) {
this.code = exceptionTypeEnum.getCode();
this.message = exceptionTypeEnum.getMsg();
}
public String getCode() {
return code;
}
@Override
public String getMessage() {
return message;
}
}
... ...
package com.sunyo.wlpt.message.bus.service.exception;
/**
* @author 子诚
* Description:枚举,定制异常类型
* 时间:2020/7/17 16:27
*/
public enum CustomExceptionType {
MESSAGE_SUCCESS("10200", "消息发送成功"),
RECEIVE_SUCCESS("20200", "接收消息,成功!"),
RECEIVE_SERVER_EXCEPTION("20500", "服务器异常,接收消息失败!请联系管理员处理"),
SERVER_EXCEPTION("10500", "服务器异常,发送消息失败!"),
CLIENT_EXCEPTION("10400", "报文格式错误,请检查报文格式!"),
BINDING_ERROR("10501", "配置信息,未进行绑定!"),
SENDER_ERROR("10401", "报文格式错误,发送者不能为空!"),
CONTENT_ERROR("10402", "报文格式错误,消息内容不能为空!"),
SERVER_ERROR("10403", "报文格式错误,服务器名称不能为空!"),
HOST_ERROR("10404", "报文格式错误,虚拟主机名称不能为空!"),
EXCHANGE_ERROR("10405", "报文格式错误,交换机名称不能为空!"),
ROUTING_KEY_ERROR("10406", "报文格式错误,路由键名称不能为空!"),
SEQUENCE_ERROR("10407", "报文格式错误,序列不能为空!"),
TOKEN_ERROR("10408", "报文格式错误,token不能为空!"),
SENDER_NO_EXIST("20401", "报文数据错误,发送者不存在!"),
SERVER_NO_EXIST("20403", "报文数据错误,服务器不存在!"),
HOST_NO_EXIST("20404", "报文数据错误,虚拟主机不存在!"),
EXCHANGE_NO_EXIST("20405", "报文数据错误,交换机不存在!"),
ROUTING_KEY_NO_EXIST("20406", "报文数据错误,路由键不存在!"),
RECEIVE_RECEIVER_ERROR("30401", "请仔细检查,接收者不存在!"),
RECEIVE_SERVER_ERROR("30402", "参数错误,服务器名称不存在!"),
RECEIVE_HOST_ERROR("30403", "参数错误,虚拟主机名称不存在!"),
RECEIVE_QUEUE_ERROR("30404", "参数错误,队列名称不存在!"),
RECEIVE_SERVER_HOST_ERROR("30405", "请仔细检查,该虚拟主机不属于该MQ服务器!"),
RECEIVE_HOST_QUEUE_ERROR("30406", "请仔细检查,该队列不属于该虚拟主机!"),
RECEIVE_USER_SERVER_HOST_ERROR("30407", "请仔细检查,该用户无权操作此虚拟主机"),
RECEIVE_USER_QUEUE_ERROR("30408", "请仔细检查,该用户无权操作此队列"),
RECEIVE_RECEIVER_NO_EXIST("30411", "请仔细检查,接收者不能为空!"),
RECEIVE_PASSWORD_NO_EXIST("30412", "请仔细检查,接收者不能为空!"),
RECEIVE_SERVER_NO_EXIST("30413", "参数错误,服务器名称不能为空!"),
RECEIVE_HOST_NO_EXIST("30414", "参数错误,虚拟主机名称不能为空!"),
RECEIVE_QUEUE_NO_EXIST("30415", "参数错误,队列名称不能为空!"),
CLIENT_ERROR("400", "客户端异常"),
SYSTEM_ERROR("500", "系统服务异常"),
OTHER_ERROR("999", "其他未知异常");
/**
* 响应业务状态
*/
private String code;
/**
* 响应消息
*/
private String msg;
CustomExceptionType(String code, String msg)
{
this.code = code;
this.msg = msg;
}
public String getCode()
{
return code;
}
public String getMsg()
{
return msg;
}
}
... ...
package com.sunyo.wlpt.message.bus.service.exception;
import com.sunyo.wlpt.message.bus.service.response.ResultJson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseBody;
/**
* @author 子诚
* Description:自定义全局异常处理类
* 时间:2020/7/17 17:44
*/
//@ControllerAdvice
public class GlobalExceptionHandler {
private static final Logger logger = LoggerFactory.getLogger(GlobalExceptionHandler.class);
//处理程序员主动转换的自定义异常
@ExceptionHandler(CustomException.class)
@ResponseBody
public ResultJson customerException(CustomException e) {
if(e.getCode() == CustomExceptionType.SYSTEM_ERROR.getCode()){
//400异常不需要持久化,将异常信息以友好的方式告知用户就可以
//TODO 将500异常信息持久化处理,方便运维人员处理
logger.error("");
}
return ResultJson.error(e);
}
}
... ...
package com.sunyo.wlpt.message.bus.service.mapper;
import com.sunyo.wlpt.message.bus.service.domain.BusExchange;
import org.apache.ibatis.annotations.Mapper;
import java.util.List;
/**
* @author 子诚
* Description:
* 时间:2020/7/1 10:08
*/
@Mapper
public interface BusExchangeMapper {
/**
* 删除,根据主键
*
* @param id primaryKey
* @return deleteCount
*/
int deleteByPrimaryKey(String id);
/**
* 根据虚拟主机删除交换机
*
* @param virtualHostId 虚拟主机id
* @return
*/
int deleteByVirtualHostId(String virtualHostId);
/**
* 新增
*
* @param record the record
* @return insert count
*/
int insert(BusExchange record);
/**
* 新增,选择性
*
* @param record the record
* @return insert count
*/
int insertSelective(BusExchange record);
/**
* 查询,根据主键
*
* @param id primary key
* @return object by primary key
*/
BusExchange selectByPrimaryKey(String id);
/**
* 查询,根据交换机名称
*
* @param exchangeName 交换机名称
* @return
*/
BusExchange selectByExchangeName(String exchangeName);
/**
* 更新,选择性,根据主键
*
* @param record the updated record
* @return update count
*/
int updateByPrimaryKeySelective(BusExchange record);
/**
* 更新,根据主键
*
* @param record the updated record
* @return update count
*/
int updateByPrimaryKey(BusExchange record);
/**
* 查询交换机列表,选择性
*
* @param busExchange 交换机以及参数
* @return 交换机列表
*/
List<BusExchange> selectBusExchangeList(BusExchange busExchange);
/**
* 校验该服务器信息是否存在
*
* @param busExchange {@link BusExchange}
* @return List<BusExchange>
*/
List<BusExchange> validateBusExchange(BusExchange busExchange);
/**
* 仅,查询交换机列表
*
* @param busExchange {@link BusExchange}
* @return List<BusExchange>
*/
List<BusExchange> getExchangeList(BusExchange busExchange);
/**
* 检验交换机是否存在
*
* @param exchange {@link BusExchange}
* @return List<BusExchange>
*/
List<BusExchange> selectExchangeExist(BusExchange exchange);
/**
* 查询交换机列表,根据虚拟主机id
*
* @param virtualHostId 虚拟主机
* @return
*/
List<BusExchange> selectByVirtualHostId(String virtualHostId);
}
... ...
package com.sunyo.wlpt.message.bus.service.mapper;
import com.sunyo.wlpt.message.bus.service.domain.BusQueue;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* @author 子诚
* Description:
* 时间:2020/7/23 17:16
*/
@Mapper
public interface BusQueueMapper {
/**
* delete by primary key
*
* @param id primaryKey
* @return deleteCount
*/
int deleteByPrimaryKey(String id);
/**
* 删除队列,根据虚拟主机id
*
* @param virtualHostId 虚拟主机id
* @return
*/
int deleteByVirtualHostId(String virtualHostId);
/**
* insert record to table
*
* @param record the record
* @return insert count
*/
int insert(BusQueue record);
/**
* insert record to table selective
*
* @param record the record
* @return insert count
*/
int insertSelective(BusQueue record);
/**
* select by primary key
*
* @param id primary key
* @return object by primary key
*/
BusQueue selectByPrimaryKey(String id);
/**
* 根据虚拟主机id,查询队列列表
*
* @param virtualHostId 虚拟主机id
* @return
*/
List<BusQueue> selectByVirtualHostId(String virtualHostId);
/**
* update record selective
*
* @param record the updated record
* @return update count
*/
int updateByPrimaryKeySelective(BusQueue record);
/**
* update record
*
* @param record the updated record
* @return update count
*/
int updateByPrimaryKey(BusQueue record);
/**
* 查询,消息队列列表,选择性
*
* @param busQueue 消息队列 {@link BusQueue}
* @return List<BusQueue>
*/
List<BusQueue> selectBusQueueList(BusQueue busQueue);
/**
* 校验-消息队列,是否已存在
*
* @param busQueue {@link BusQueue}
* @return List<BusQueue>
*/
List<BusQueue> validateBusQueue(BusQueue busQueue);
/**
* 校验队列是否存在,根据用户名以及队列名称
*
* @param username 用户名称
* @param queueName 队列名称
* @return
*/
List<BusQueue> validateByUserNameAndQueueName(@Param("username") String username, @Param("queueName") String queueName);
/**
* 仅,查询队列列表
*
* @param busQueue {@link BusQueue}
* @return
*/
List<BusQueue> getQueueList(BusQueue busQueue);
/**
* 根据用户名称查询队列信息
*
* @param username 用户名称
* @return
*/
List<BusQueue> selectByUsername(String username);
/**
* 根据用户名查询队列信息
*
* @param queueName 队列名称
* @return
*/
BusQueue selectByQueueName(@Param("queueName") String queueName);
/**
* 根据用户名称和虚拟主机Id查询队列信息
*
* @param username 用户名称
* @param virtualHostId 虚拟主机Id
* @return
*/
List<BusQueue> selectByUsernameAndHostId(@Param("username") String username, @Param("virtualHostId") String virtualHostId);
}
\ No newline at end of file
... ...
package com.sunyo.wlpt.message.bus.service.mapper;
import com.sunyo.wlpt.message.bus.service.domain.BusServer;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* @author 子诚
* Description:
* 时间:2020/8/12 14:07
*/
@Mapper
public interface BusServerMapper {
/**
* delete by primary key
*
* @param id primaryKey
* @return deleteCount
*/
int deleteByPrimaryKey(String id);
/**
* insert record to table
*
* @param record the record
* @return insert count
*/
int insert(BusServer record);
/**
* insert record to table selective
*
* @param record the record
* @return insert count
*/
int insertSelective(BusServer record);
/**
* select by primary key
*
* @param id primary key
* @return object by primary key
*/
BusServer selectByPrimaryKey(String id);
/**
* update record selective
*
* @param record the updated record
* @return update count
*/
int updateByPrimaryKeySelective(BusServer record);
/**
* update record
*
* @param record the updated record
* @return update count
*/
int updateByPrimaryKey(BusServer record);
/**
* 查询,根据服务器名称
*
* @param serverName 服务器名称
* @return
*/
BusServer selectByServerName(String serverName);
/**
* 查询服务器列表,选择性
*
* @param busServer 服务器以及参数
* @return 服务器列表
*/
List<BusServer> selectBusServerList(BusServer busServer);
/**
* 查询服务器列表(不包含密码)
*
* @return 服务器列表
*/
List<BusServer> selectServerList();
/**
* 先校验该服务器名称是否存在
*
* @param busServer {@link BusServer}
* @return List<BusServer>
*/
List<BusServer> validateServerName(BusServer busServer);
/**
* 校验该服务器名称是否存在
*
* @param serverName 服务器名称
* @return
*/
List<BusServer> validateByServerName(@Param("serverName") String serverName);
/**
* 校验该服务器信息是否存在
*
* @param busServer {@link BusServer}
* @return List<BusServer>
*/
List<BusServer> validateBusServer(BusServer busServer);
/**
* 查询服务器列表(包含密码)
*
* @return 服务器列表
*/
List<BusServer> getServerList();
/**
* 查询,服务器(1:n虚拟主机)的基本信息
*
* @return List<BusServer>
*/
List<BusServer> getServerAndHostList(BusServer busServer);
/**
* 服务器与虚拟主机是1:n的关系
* 虚拟主机与交换机是1: n的关系
* 查询,服务器列表(包含虚拟机、交换机)
*
* @return List<BusServer>
*/
List<BusServer> getServerAndHostAndExchangeList();
/**
* 检验用户名是否存在
*
* @param serverName 服务器名称
* @return List<BusServer>
*/
List<BusServer> selectServerExist(@Param("serverName") String serverName);
}
\ No newline at end of file
... ...
package com.sunyo.wlpt.message.bus.service.mapper;
import com.sunyo.wlpt.message.bus.service.domain.ExchangeQueueRouting;
import org.apache.ibatis.annotations.Mapper;
/**
* @author 子诚
* Description:
* 时间:2020/7/2 16:47
*/
@Mapper
public interface ExchangeQueueRoutingMapper {
/**
* 删除,根据主键
*
* @param id primaryKey
* @return deleteCount
*/
int deleteByPrimaryKey(String id);
/**
* 新增
*
* @param record the record
* @return insert count
*/
int insert(ExchangeQueueRouting record);
/**
* 新增,选择性
*
* @param record the record
* @return insert count
*/
int insertSelective(ExchangeQueueRouting record);
/**
* 查询,根据主键
*
* @param id primary key
* @return object by primary key
*/
ExchangeQueueRouting selectByPrimaryKey(String id);
/**
* 更新,根据主键,选择性
*
* @param record the updated record
* @return update count
*/
int updateByPrimaryKeySelective(ExchangeQueueRouting record);
/**
* 更新,根据主键
*
* @param record the updated record
* @return update count
*/
int updateByPrimaryKey(ExchangeQueueRouting record);
}
... ...
package com.sunyo.wlpt.message.bus.service.mapper;
import com.sunyo.wlpt.message.bus.service.domain.MessageNote;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* @author 子诚
* Description:
* 时间:2020/7/15 10:45
*/
@Mapper
public interface MessageNoteMapper {
/**
* delete by primary key
*
* @param id primaryKey
* @return deleteCount
*/
int deleteByPrimaryKey(String id);
/**
* insert record to table
*
* @param record the record
* @return insert count
*/
int insert(MessageNote record);
/**
* insert record to table selective
*
* @param record the record
* @return insert count
*/
int insertSelective(MessageNote record);
/**
* select by primary key
*
* @param id primary key
* @return object by primary key
*/
MessageNote selectByPrimaryKey(String id);
/**
* update record selective
*
* @param record the updated record
* @return update count
*/
int updateByPrimaryKeySelective(MessageNote record);
/**
* update record
*
* @param record the updated record
* @return update count
*/
int updateByPrimaryKey(MessageNote record);
/**
* 获取,消息收发记录-列表
*
* @param messageNote {@link MessageNote}
* @return List<MessageNote>
*/
List<MessageNote> selectMessageNoteList(MessageNote messageNote);
/**
* 自动删除消息收发记录
*
* @param deleteTime 删除的时间
*/
void autoDelete(@Param("deleteTime") Integer deleteTime);
}
\ No newline at end of file
... ...
package com.sunyo.wlpt.message.bus.service.mapper;
import com.sunyo.wlpt.message.bus.service.domain.RoutingKey;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* @author 子诚
* Description:
* 时间:2020/6/29 16:45
*/
@Mapper
public interface RoutingKeyMapper {
/**
* 删除,根据主键
*
* @param id primaryKey
* @return deleteCount
*/
int deleteByPrimaryKey(String id);
/**
* 新增
*
* @param record the record
* @return insert count
*/
int insert(RoutingKey record);
/**
* 新增,选择性
*
* @param record the record
* @return insert count
*/
int insertSelective(RoutingKey record);
/**
* 查询,根据主键
*
* @param id primary key
* @return object by primary key
*/
RoutingKey selectByPrimaryKey(String id);
/**
* 查询,根据路由键名称
*
* @param routingKeyName 路由键名称
* @return
*/
RoutingKey selectByRoutingKeyName(String routingKeyName);
/**
* 更新,选择性,根据主键
*
* @param record the updated record
* @return update count
*/
int updateByPrimaryKeySelective(RoutingKey record);
/**
* 更新,根据主键
*
* @param record the updated record
* @return update count
*/
int updateByPrimaryKey(RoutingKey record);
/**
* 查询,路由键列表,选择性
*
* @param routingKey 路由键参数类
* @return List<RoutingKey> {@link RoutingKey}
*/
List<RoutingKey> selectRoutingKeyList(RoutingKey routingKey);
/**
* 校验,路由键是否已存在
*
* @param routingKey 路由键 {@link RoutingKey}
* @return List<RoutingKey>
*/
List<RoutingKey> validateRoutingKey(RoutingKey routingKey);
/**
* 仅,查询路由键列表
*
* @param routingKey 路由键 {@link RoutingKey}
* @return List<RoutingKey>
*/
List<RoutingKey> getRoutingKeyList(RoutingKey routingKey);
/**
* 根据exchangeID查询路由键
*
* @param exchangeId 交换机id
* @return
*/
List<RoutingKey> selectByExchangeId(@Param("exchangeId") String exchangeId);
/**
* 检验路由键是否存在,根据交换机id和路由键名称
*
* @param routingKey 路由键
* @return List<RoutingKey>
*/
List<RoutingKey> selectRoutingKeyExist(RoutingKey routingKey);
/**
* 根据交换机id,删除路由键信息
*
* @param exchangeId 交换机id
* @return
*/
int deleteByExchangeId(String exchangeId);
}
... ...
package com.sunyo.wlpt.message.bus.service.mapper;
import com.sunyo.wlpt.message.bus.service.domain.SchedulingDelete;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
/**
* @author 子诚
* Description:
* 时间:2020/7/15 10:28
*/
@Mapper
public interface SchedulingDeleteMapper {
/**
* delete by primary key
*
* @param id primaryKey
* @return deleteCount
*/
int deleteByPrimaryKey(String id);
/**
* insert record to table
*
* @param record the record
* @return insert count
*/
int insert(SchedulingDelete record);
/**
* insert record to table selective
*
* @param record the record
* @return insert count
*/
int insertSelective(SchedulingDelete record);
/**
* select by primary key
*
* @param id primary key
* @return object by primary key
*/
SchedulingDelete selectByPrimaryKey(String id);
/**
* update record selective
*
* @param record the updated record
* @return update count
*/
int updateByPrimaryKeySelective(SchedulingDelete record);
/**
* update record
*
* @param record the updated record
* @return update count
*/
int updateByPrimaryKey(SchedulingDelete record);
/**
* 获取时间设置表
*
* @param deleteType 删除的类型
* @return {@link SchedulingDelete}
*/
SchedulingDelete selectByType(@Param("deleteType") String deleteType);
}
\ No newline at end of file
... ...
package com.sunyo.wlpt.message.bus.service.mapper;
import com.sunyo.wlpt.message.bus.service.domain.BusQueue;
import com.sunyo.wlpt.message.bus.service.domain.UserInfo;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* @author 子诚
* Description:
* 时间:2020/8/10 17:44
*/
@Mapper
public interface UserInfoMapper {
/**
* delete by primary key
*
* @param id primaryKey
* @return deleteCount
*/
int deleteByPrimaryKey(String id);
/**
* insert record to table
*
* @param record the record
* @return insert count
*/
int insert(UserInfo record);
/**
* insert record to table selective
*
* @param record the record
* @return insert count
*/
int insertSelective(UserInfo record);
/**
* select by primary key
*
* @param id primary key
* @return object by primary key
*/
UserInfo selectByPrimaryKey(String id);
/**
* update record selective
*
* @param record the updated record
* @return update count
*/
int updateByPrimaryKeySelective(UserInfo record);
/**
* update record
*
* @param record the updated record
* @return update count
*/
int updateByPrimaryKey(UserInfo record);
/**
* 查询,根据用户名称
*
* @param username 用户登录名称
* @return {@link UserInfo}
*/
List<UserInfo> selectByUsername(String username);
/**
* 仅,查询用户列表
*
* @return 用户信息集合
*/
List<UserInfo> getUserInfoList();
/**
* 判断该用户名是否存在
*
* @param username 用户登录名
* @return 用户列表
*/
List<UserInfo> selectUserExist(@Param("username") String username);
/**
* 分页查询用户信息
*
* @param userInfo 用户信息 {@link UserInfo}
* @return
*/
List<UserInfo> selectUserInfoList(UserInfo userInfo);
/**
* 校验用户信息
*
* @param userInfo 用户信息 {@link UserInfo}
* @return
*/
List<UserInfo> validateUserInfo(UserInfo userInfo);
/**
* 校验用户信息
*
* @param username 用户名称
* @param serverName 服务器名称
* @param virtualHostName 虚拟主机名称
* @return
*/
List<UserInfo> validateUserInfoByParams(@Param("username") String username,
@Param("serverName") String serverName,
@Param("virtualHostName") String virtualHostName);
/**
* 根据队列中的用户名,虚拟主机名称和虚拟主机id,查询用户关系
*
* @param busQueue {@link BusQueue}
* @return
*/
List<UserInfo> selectByBusQueue(BusQueue busQueue);
/**
* 根据用户名称删除用户信息
*
* @param username
* @return
*/
int deleteByUsername(String username);
/**
* 根据用户名称和服务器名称删除用户信息
*
* @param username 用户名称
* @param serverName 服务器名称
* @return
*/
int deleteByUsernameAndServerName(@Param("username") String username, @Param("serverName") String serverName);
/**
* 根据服务器名称,删除用户关系
*
* @param serverName 服务器名称
* @return
*/
int deleteByServerName(String serverName);
/**
* 根据虚拟主机名称,删除用户关系
*
* @param hostName 虚拟主机名称
* @return
*/
int deleteByHostName(String hostName);
/**
* 根据用户名称和服务器名称,查询用户信息
*
* @param username 用户名称
* @param serverName 服务器名称
* @return
*/
List<UserInfo> selectByUsernameAndServerName(@Param("username") String username, @Param("serverName") String serverName);
}
\ No newline at end of file
... ...
package com.sunyo.wlpt.message.bus.service.mapper;
import com.sunyo.wlpt.message.bus.service.domain.MessageNote;
import com.sunyo.wlpt.message.bus.service.domain.UserMessageBinding;
import com.sunyo.wlpt.message.bus.service.domain.XmlData;
import org.apache.ibatis.annotations.Mapper;
import java.util.List;
/**
* @author 子诚
* Description:
* 时间:2020/7/15 10:37
*/
@Mapper
public interface UserMessageBindingMapper {
/**
* delete by primary key
*
* @param id primaryKey
* @return deleteCount
*/
int deleteByPrimaryKey(String id);
/**
* 根据交换机id删除配置关系
*
* @param exchangeId 交换机id
* @return 删除成功
*/
int deleteByExchangeId(String exchangeId);
/**
* 根据队列Id,删除绑定关系
*
* @param queueId 队列id
* @return
*/
int deleteByQueueId(String queueId);
/**
* 根据路由键id删除配置关系
*
* @param routingKeyId 路由键id
* @return 删除成功
*/
int deleteByRoutingKeyId(String routingKeyId);
/**
* 删除配置关系
*
* @param virtualHostId 虚拟主机id
* @return
*/
int deleteByVirtualHostId(String virtualHostId);
/**
* 根据服务器id,删除绑定关系
*
* @param serverId 服务器id
* @return
*/
int deleteByServerId(String serverId);
/**
* insert record to table
*
* @param record the record
* @return insert count
*/
int insert(UserMessageBinding record);
/**
* insert record to table selective
*
* @param record the record
* @return insert count
*/
int insertSelective(UserMessageBinding record);
/**
* select by primary key
*
* @param id primary key
* @return object by primary key
*/
UserMessageBinding selectByPrimaryKey(String id);
/**
* 根据路由键id
*
* @param routingKeyId 路由键id
* @return
*/
List<UserMessageBinding> selectByRoutingKeyId(String routingKeyId);
/**
* 根据虚拟主机id,查询配置关系列表
*
* @param virtualHostId 虚拟主机id
* @return List<UserMessageBinding>
*/
List<UserMessageBinding> selectByVirtualHostId(String virtualHostId);
/**
* 根据服务器id,查询配置关系列表
*
* @param serverId 服务器id
* @return List<UserMessageBinding>
*/
List<UserMessageBinding> selectByServerId(String serverId);
/**
* update record selective
*
* @param record the updated record
* @return update count
*/
int updateByPrimaryKeySelective(UserMessageBinding record);
/**
* update record
*
* @param record the updated record
* @return update count
*/
int updateByPrimaryKey(UserMessageBinding record);
/**
* 分页查询,获取账户信息绑定配置列表
*
* @param userMessageBinding {@link UserMessageBinding}
* @return List<UserMessageBinding>
*/
List<UserMessageBinding> selectUserMessageBindingList(UserMessageBinding userMessageBinding);
/**
* 在执行填充和添加之前先校验一下是否存在?
*
* @param userMessageBinding {@link UserMessageBinding}
* @return 判断校验是否通过
*/
List<UserMessageBinding> validateBinding(UserMessageBinding userMessageBinding);
/**
* 发送信息之前的检验配置是否存在
*
* @param messageNote {@link MessageNote}
* @return 判断校验是否通过
*/
List<UserMessageBinding> validateNote(MessageNote messageNote);
/**
* 校验报文配置与否?
*
* @param xmlData {@link XmlData}
* @return 判断校验是否通过
*/
List<UserMessageBinding> validateXmlBinding(XmlData xmlData);
/**
* 进行校验该MQ上的绑定关系,是否存在
*
* @param userMessageBinding {@link UserMessageBinding}
* @return 配置关系,集合
*/
List<UserMessageBinding> validateRabbitBinding(UserMessageBinding userMessageBinding);
}
\ No newline at end of file
... ...
package com.sunyo.wlpt.message.bus.service.mapper;
import com.sunyo.wlpt.message.bus.service.domain.UserServerVirtualHost;
import org.apache.ibatis.annotations.Mapper;
/**
* @author 子诚
* Description:
* 时间:2020/7/2 11:35
*/
@Mapper
public interface UserServerVirtualHostMapper {
/**
* 删除,根据主键
*
* @param id primaryKey
* @return deleteCount
*/
int deleteByPrimaryKey(String id);
/**
* 新增
*
* @param record the record
* @return insert count
*/
int insert(UserServerVirtualHost record);
/**
* 新增,选择性
*
* @param record the record
* @return insert count
*/
int insertSelective(UserServerVirtualHost record);
/**
* 查询,根据主键
*
* @param id primary key
* @return object by primary key
*/
UserServerVirtualHost selectByPrimaryKey(String id);
/**
* 更新,选择性,根据主键
*
* @param record the updated record
* @return update count
*/
int updateByPrimaryKeySelective(UserServerVirtualHost record);
/**
* 更新,根据主键
*
* @param record the updated record
* @return update count
*/
int updateByPrimaryKey(UserServerVirtualHost record);
}
... ...
package com.sunyo.wlpt.message.bus.service.mapper;
import com.sunyo.wlpt.message.bus.service.domain.VirtualHost;
import org.apache.ibatis.annotations.Mapper;
import java.util.List;
/**
* @author 子诚
* Description:
* 时间:2020/6/29 16:36
*/
@Mapper
public interface VirtualHostMapper {
/**
* 删除,根据主键
*
* @param id primaryKey
* @return deleteCount
*/
int deleteByPrimaryKey(String id);
/**
* 根据服务器id,删除虚拟主机
*
* @param serverId 服务器id
* @return
*/
int deleteByServerId(String serverId);
/**
* 新增
*
* @param record the record
* @return insert count
*/
int insert(VirtualHost record);
/**
* 新增,选择性
*
* @param record the record
* @return insert count
*/
int insertSelective(VirtualHost record);
/**
* 查询,根据主键
*
* @param id primary key
* @return object by primary key
*/
VirtualHost selectByPrimaryKey(String id);
/**
* 查询,根据虚拟主机名称
*
* @param virtualHostName 虚拟主机名称
* @return
*/
VirtualHost selectByVirtualHostName(String virtualHostName);
/**
* 根据服务器id,查询虚拟主机列表
*
* @param serverId 服务器id
* @return
*/
List<VirtualHost> selectByServerId(String serverId);
/**
* 更新,选择性,根据主键
*
* @param record the updated record
* @return update count
*/
int updateByPrimaryKeySelective(VirtualHost record);
/**
* 更新,根据主键
*
* @param record the updated record
* @return update count
*/
int updateByPrimaryKey(VirtualHost record);
/**
* 查询虚拟主机列表,选择性
*
* @param virtualHost 虚拟主机,参数
* @return 虚拟主机列表
*/
List<VirtualHost> selectVirtualHostList(VirtualHost virtualHost);
/**
* 校验该服务器信息是否存在
*
* @param virtualHost {@link VirtualHost}
* @return List<VirtualHost>
*/
List<VirtualHost> validateVirtualHost(VirtualHost virtualHost);
/**
* 查询虚拟主机列表
*
* @param virtualHost {@link VirtualHost}
* @return 虚拟主机列表
*/
List<VirtualHost> getVirtualHostList(VirtualHost virtualHost);
/**
* 检验是否存在
* @param virtualHost {@link VirtualHost}
* @return List<VirtualHost>
*/
List<VirtualHost> selectVirtualHostExist(VirtualHost virtualHost);
}
... ...
package com.sunyo.wlpt.message.bus.service.rabbit.utils;
import com.sunyo.wlpt.message.bus.service.domain.BusExchange;
import com.sunyo.wlpt.message.bus.service.domain.BusQueue;
import com.sunyo.wlpt.message.bus.service.domain.UserMessageBinding;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @author 子诚
* Description:
* 时间:2020/7/16 16:32
*/
@Slf4j
@RefreshScope
@Component
public class BootRabbitUtils {
@Resource
private AmqpAdmin amqpAdmin;
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 创建交换机(交换机名称,是否持久化,是否删除)
*
* @param busExchange {@link BusExchange}
*/
public void createExchange(BusExchange busExchange) {
// 类型-直连路由
String type_direct = "direct";
// 类型-动态路由
String type_topic = "topic";
// 类型-广播
String type_fanout = "fanout";
// 类型-头部
String type_headers = "headers";
// 创建交换机,直连接类型
if (type_direct.equals(busExchange.getExchangeType())) {
amqpAdmin.declareExchange(
new DirectExchange(busExchange.getExchangeName(), busExchange.getDurability(), busExchange.getAutoDelete())
);
log.info("创建了交换机:{};类型:{};", busExchange.getExchangeType(), type_direct);
}
// 创建交换机,扇形交换机
if (type_topic.equals(busExchange.getExchangeType())) {
amqpAdmin.declareExchange(
new TopicExchange(busExchange.getExchangeName(), busExchange.getDurability(), busExchange.getAutoDelete())
);
log.info("创建了交换机:{};类型:{};", busExchange.getExchangeType(), type_topic);
}
// 创建交换机,广播(主题)交换机
if (type_fanout.equals(busExchange.getExchangeType())) {
amqpAdmin.declareExchange(
new FanoutExchange(busExchange.getExchangeName(), busExchange.getDurability(), busExchange.getAutoDelete())
);
log.info("创建了交换机:{};类型:{};", busExchange.getExchangeType(), type_fanout);
}
// 创建交换机,首部交换机
if (type_headers.equals(busExchange.getExchangeType())) {
amqpAdmin.declareExchange(
new HeadersExchange(busExchange.getExchangeName(), busExchange.getDurability(), busExchange.getAutoDelete())
);
log.info("创建了交换机:{};类型:{};", busExchange.getExchangeType(), type_headers);
}
}
/**
* 创建交换机,通过 exchangeName 创建
*/
public void createExchange(String exchangeName) {
amqpAdmin.declareExchange(
new DirectExchange(exchangeName)
);
log.info("创建了交换机:{};类型:{};", exchangeName, "DirectExchange");
}
/**
* 根据交换机名称,删除虚拟机
*
* @param exchangeName 交换机名称
*/
public void deleteExchange(String exchangeName) {
boolean flag = amqpAdmin.deleteExchange(exchangeName);
}
/**
* 创建队列
*
* @param busQueue {@link BusQueue}
*/
public void createQueue(BusQueue busQueue) {
amqpAdmin.declareQueue(
new Queue(busQueue.getQueueName(), busQueue.getDurability(), false, busQueue.getAutoDelete())
);
log.info("创建了队列,队列名称->{}", busQueue.getQueueName());
}
/**
* 创建队列
*
* @param queueName 队列名称
*/
public void createQueue(String queueName) {
amqpAdmin.declareQueue(
new Queue(queueName)
);
log.info("创建了队列,队列名称->{}", queueName);
}
/**
* 删除队列,根据队列名称
*
* @param queueName 队列名称
*/
public void deleteQueue(String queueName) {
boolean flag = amqpAdmin.deleteQueue(queueName);
}
/**
* 创建绑定关系
*
* @param userMessageBinding {@link UserMessageBinding}
*/
public void createBing(UserMessageBinding userMessageBinding) {
amqpAdmin.declareBinding(
new Binding(userMessageBinding.getQueueName(), Binding.DestinationType.QUEUE, userMessageBinding.getExchangeName(), userMessageBinding.getRoutingKeyName(), null)
);
log.info("创建了绑定关系,交换机->{};队列->{};路由键->{}", userMessageBinding.getQueueName(), userMessageBinding.getExchangeName(), userMessageBinding.getRoutingKeyName());
}
/**
* 创建绑定关系
*
* @param exchangeName 交换机名称
* @param queueName 队列名称
* @param routingKeyName 路由键名称
*/
public void createBing(String exchangeName, String queueName, String routingKeyName) {
amqpAdmin.declareBinding(
new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKeyName, null)
);
log.info("创建了绑定关系,交换机->{};队列->{};路由键->{}", exchangeName, queueName, routingKeyName);
}
}
... ...
package com.sunyo.wlpt.message.bus.service.rabbit.utils;
import com.rabbitmq.http.client.Client;
import com.rabbitmq.http.client.domain.ExchangeInfo;
import com.rabbitmq.http.client.domain.ExchangeMessageStats;
import com.rabbitmq.http.client.domain.QueueInfo;
import com.rabbitmq.http.client.domain.UserPermissions;
import com.sunyo.wlpt.message.bus.service.domain.BusServer;
import com.sunyo.wlpt.message.bus.service.domain.UserInfo;
import com.sunyo.wlpt.message.bus.service.domain.VirtualHost;
import com.sunyo.wlpt.message.bus.service.domain.view.TempExchangeInfo;
import com.sunyo.wlpt.message.bus.service.domain.view.ViewExchangeInfo;
import com.sunyo.wlpt.message.bus.service.domain.view.ViewQueueInfo;
import com.sunyo.wlpt.message.bus.service.utils.AESUtils;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.SystemMetaObject;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
/**
* @author 子诚
* Description:com.rabbitmq.http.client的封装类
* 时间:2020/8/13 17:57
*/
public class ClientUtils {
public static Client connectClient(BusServer busServer) throws IOException, URISyntaxException
{
// 服务器的IP地址
String host = busServer.getServerIp();
// 该服务器超级用户的用户名称
String superUsername = busServer.getSuperUsername();
// 该服务器超级用户的用户密码
String superPassword = AESUtils.decrypt(busServer.getSuperPassword());
// 服务器的客户端端口号
String clientPort = busServer.getClientPort().toString();
// 访问客户端的url
String url = "http://" + host + ":" + clientPort + "/api";
Client client = new Client(url, superUsername, superPassword);
return client;
}
/**
* 创建MQ用户
*
* @param userInfo 用户信息
* @param busServer {@link BusServer}
* @param password 新增用户的密码
*/
public static void addRabbitUser(UserInfo userInfo, BusServer busServer, String password) throws IOException, URISyntaxException
{
// 新增用户的用户名称
String username = userInfo.getUsername();
// 与客户端建立连接
Client client = connectClient(busServer);
ArrayList<String> list = new ArrayList<>();
// 创建用户,权限为none
client.createUser(username, password.toCharArray(), list);
}
/**
* 用户与虚拟主机建立联系
*
* @param userInfo 用户信息
* @param busServer 服务器信息
* @throws IOException
* @throws URISyntaxException
*/
public static void userRelation(UserInfo userInfo, BusServer busServer) throws IOException, URISyntaxException
{
String username = userInfo.getUsername();
String virtualHostName = userInfo.getVirtualHostName();
Client client = connectClient(busServer);
UserPermissions p = new UserPermissions();
p.setConfigure(username + ".*");
p.setRead(username + ".*");
p.setWrite(username + ".*");
client.updatePermissions(virtualHostName, username, p);
// TopicPermissions topicPermissions = new TopicPermissions();
// topicPermissions.setVhost(virtualHostName);
// topicPermissions.setExchange("");
// topicPermissions.setRead(".*");
// topicPermissions.setWrite(".*");
// client.updateTopicPermissions(virtualHostName, username, topicPermissions);
}
/**
* 删除用户
*/
public static void deleteMQUser(UserInfo userInfo, BusServer busServer) throws IOException, URISyntaxException
{
String username = userInfo.getUsername();
Client client = connectClient(busServer);
client.deleteUser(username);
}
/**
* 删除用户
*/
public static void deleteMQUser(String username, BusServer busServer) throws IOException, URISyntaxException
{
Client client = connectClient(busServer);
client.deleteUser(username);
}
/**
* 修改MQ的用户密码
*/
public static void updatePassword(BusServer busServer, String username, String password) throws IOException, URISyntaxException
{
char[] newPassword = password.toCharArray();
Client client = connectClient(busServer);
ArrayList<String> tags = new ArrayList<>();
client.updateUser(username, newPassword, tags);
}
/**
* 创建虚拟主机
*/
public static void createVirtualHost(BusServer busServer, String vHost) throws IOException, URISyntaxException
{
Client client = connectClient(busServer);
client.createVhost(vHost);
}
/**
* 创建虚拟主机
*/
public static void createVirtualHost(BusServer busServer, VirtualHost vHost) throws IOException, URISyntaxException
{
Client client = connectClient(busServer);
client.createVhost(vHost.getVirtualHostName(), vHost.getDescription());
}
/**
* 删除虚拟主机
*/
public static void deleteVirtualHost(BusServer busServer, String vHost) throws IOException, URISyntaxException
{
Client client = connectClient(busServer);
client.deleteVhost(vHost);
}
/**
* 清楚用户与虚拟主机之间的关系
*/
public static void clearPermissions(BusServer busServer, String vHost, String username) throws IOException, URISyntaxException
{
Client client = connectClient(busServer);
client.clearPermissions(vHost, username);
client.clearTopicPermissions(vHost, username);
}
/**
* 获取MQ界面的队列信息,重载
*
* @param busServer MQ服务器
* @return
* @throws IOException
* @throws URISyntaxException
*/
public static List<ViewQueueInfo> getViewQueues(BusServer busServer) throws IOException, URISyntaxException
{
Client client = connectClient(busServer);
List<QueueInfo> queues = client.getQueues();
return reformatQueueInfo(busServer.getServerName(), queues);
}
/**
* 队列信息,重新格式化
*
* @param serverName MQ服务器名称
* @param queues 队列信息 {@link QueueInfo}
* @return
*/
public static List<ViewQueueInfo> reformatQueueInfo(String serverName, List<QueueInfo> queues)
{
List<ViewQueueInfo> list = new ArrayList<>();
// 将获取到的队列信息,拼接一个属性,服务器名称
for (QueueInfo queueInfo : queues) {
ViewQueueInfo viewQueueInfo = ViewQueueInfo.builder().queueInfo(queueInfo).serverName(serverName).build();
list.add(viewQueueInfo);
}
return list;
}
/**
* 获取MQ界面的队列信息,重载
*
* @param busServer MQ服务器
* @param vHostName 虚拟主机名称
* @return
* @throws IOException
* @throws URISyntaxException
*/
public static List<ViewQueueInfo> getViewQueues(BusServer busServer, String vHostName) throws IOException, URISyntaxException
{
Client client = connectClient(busServer);
List<QueueInfo> queues = client.getQueues(vHostName);
return reformatQueueInfo(busServer.getServerName(), queues);
}
/**
* 获取MQ界面的队列信息,重载
*
* @param busServer MQ服务器信息
* @param vHostName 虚拟主机
* @param queueName 队列名称
* @return
* @throws IOException
* @throws URISyntaxException
*/
public ViewQueueInfo getViewQueues(BusServer busServer, String vHostName, String queueName) throws IOException, URISyntaxException
{
Client client = connectClient(busServer);
QueueInfo queue = client.getQueue(vHostName, queueName);
ViewQueueInfo viewQueueInfo = ViewQueueInfo.builder().serverName(busServer.getServerName()).queueInfo(queue).build();
return viewQueueInfo;
}
/**
* 获取MQ界面的交换机信息,重载
*
* @param busServer MQ服务器信息
* @return
* @throws IOException
* @throws URISyntaxException
*/
public static List<ViewExchangeInfo> getViewExchanges(BusServer busServer) throws IOException, URISyntaxException
{
Client client = connectClient(busServer);
List<ExchangeInfo> exchanges = client.getExchanges();
List<ViewExchangeInfo> viewExchangeInfos = reformatExchangeInfo(busServer.getServerName(), exchanges);
return viewExchangeInfos;
}
public static List<ViewExchangeInfo> reformatExchangeInfo(String serverName, List<ExchangeInfo> exchanges)
{
List<ViewExchangeInfo> list = new ArrayList<>();
// 将获取到的队列信息,拼接一个属性,服务器名称
for (ExchangeInfo exchangeInfo : exchanges) {
TempExchangeInfo tempExchangeInfo = exchangeInfoToTemp(exchangeInfo);
ViewExchangeInfo viewExchangeInfo = ViewExchangeInfo.builder().serverName(serverName).tempExchangeInfo(tempExchangeInfo).build();
list.add(viewExchangeInfo);
}
return list;
}
/**
* mybatis封装的反射(根据属性名和对象,获取属性的值)
*
* @param fieldName 属性名
* @param object 对象
* @return
*/
private static Object getFieldValueByFieldName(String fieldName, Object object)
{
MetaObject metaObject = SystemMetaObject.forObject(object);
Object value = metaObject.getValue(fieldName);
return value;
}
/**
* 将ExchangeInfo转换成TempExchangeInfo
*
* @param exchangeInfo ExchangeInfo
* @return
*/
private static TempExchangeInfo exchangeInfoToTemp(ExchangeInfo exchangeInfo)
{
ExchangeMessageStats messageStats = (ExchangeMessageStats) getFieldValueByFieldName("messageStats", exchangeInfo);
TempExchangeInfo temp = new TempExchangeInfo();
temp.setVhost(exchangeInfo.getVhost());
temp.setName(exchangeInfo.getName());
temp.setType(exchangeInfo.getType());
temp.setDurable(exchangeInfo.isDurable());
temp.setAutoDelete(exchangeInfo.isAutoDelete());
temp.setInternal(exchangeInfo.isInternal());
temp.setArguments(exchangeInfo.getArguments());
temp.setMessageStats(messageStats);
return temp;
}
/**
* 获取MQ界面的交换机信息,重载
*
* @param busServer MQ服务器信息
* @param vHostName 虚拟主机
* @return
* @throws IOException
* @throws URISyntaxException
*/
public static List<ViewExchangeInfo> getViewExchanges(BusServer busServer, String vHostName) throws IOException, URISyntaxException
{
Client client = connectClient(busServer);
List<ExchangeInfo> exchanges = client.getExchanges(vHostName);
List<ViewExchangeInfo> viewExchangeInfos = reformatExchangeInfo(busServer.getServerName(), exchanges);
return viewExchangeInfos;
}
/**
* 获取MQ界面的交换机信息,重载
*
* @param busServer MQ服务器信息
* @param vHostName 虚拟主机
* @param exchangeName 交换机名称
* @return
* @throws IOException
* @throws URISyntaxException
*/
public static ViewExchangeInfo getViewExchanges(BusServer busServer, String vHostName, String exchangeName) throws IOException, URISyntaxException
{
Client client = connectClient(busServer);
ExchangeInfo exchangeInfo = client.getExchange(vHostName, exchangeName);
TempExchangeInfo tempExchangeInfo = exchangeInfoToTemp(exchangeInfo);
ViewExchangeInfo viewExchangeInfo = ViewExchangeInfo.builder().serverName(busServer.getServerName()).tempExchangeInfo(tempExchangeInfo).build();
return viewExchangeInfo;
}
}
... ...
package com.sunyo.wlpt.message.bus.service.rabbit.utils;
import com.rabbitmq.client.*;
import com.sunyo.wlpt.message.bus.service.domain.XmlData;
import com.sunyo.wlpt.message.bus.service.exception.CustomExceptionType;
import com.sunyo.wlpt.message.bus.service.response.ResultJson;
import com.sunyo.wlpt.message.bus.service.utils.AESUtils;
import com.sunyo.wlpt.message.bus.service.utils.IdUtils;
import io.netty.util.internal.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* @author 子诚
* Description:
* 时间:2020/7/21 9:32
*/
@Slf4j
@Component
public class DirectUtils {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 链接 RabbitMQ
*
* @param hostIp mq服务器Ip地址
* @param hostPort mq服务器端口号
* @param vHostName VirtualHost名称
* @param userName 登录账号
* @param password 登录密码
* @return 返回链接
* @throws Exception
*/
public static Connection getConnection(String hostIp, int hostPort, String vHostName, String userName, String password) throws IOException, TimeoutException
{
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost(hostIp);
//端口
factory.setPort(hostPort);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost(vHostName);
factory.setUsername(userName);
factory.setPassword(AESUtils.decrypt(password));
// 通过工程获取连接
return factory.newConnection();
}
/**
* 关闭通道和关闭连接的工具方法
*
* @param channel 通道
* @param conn 连接
*/
public static void closeConnectionAndChanel(Channel channel, Connection conn)
{
try {
if (channel != null) {
channel.close();
}
if (conn != null) {
conn.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
public ResultJson sendMessage(XmlData xmlData) throws IOException, TimeoutException
{
/**
* 可以在这里根据类型的不同,进行不同的消息发送
*/
return directProducer(xmlData);
}
/**
* 发送消息,使用中
*
* @param xmlData {@link XmlData}
* @return
* @throws Exception
*/
public ResultJson directProducer(XmlData xmlData) throws IOException, TimeoutException
{
// 1、创建Connection
Connection connection = getConnection(xmlData.getServerIp(), xmlData.getServerPort(),
xmlData.getVirtualHostName(), xmlData.getSuperUsername(), xmlData.getSuperPassword());
// 2、 通过Connection创建一个新的Channel
Channel channel = connection.createChannel();
// 3、开启消息的确认机制(confirm:保证消息能够发送到 exchange)
channel.confirmSelect();
// 4、避免消息被重复消费
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
// 指定消息是否需要持久化,2:持久化;1.非持久化
.deliveryMode(2)
// 设置全局唯一消息机制id(雪花id)
.messageId(IdUtils.generateId())
.build();
// 5、开启 return 机制(保证消息,从 Exchange 分发到 Queue )
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException
{
// 当消息没有从 Exchange 分发到 Queue 时,才会执行
log.error(new String(body, "UTF8") + "->没有从 Exchange 分发到Queue中");
}
});
// 6、发送消息,并指定 mandatory 参数为true
channel.basicPublish(xmlData.getExchangeName(), xmlData.getRoutingKeyName(), true, properties,
xmlData.getSendContent().getBytes());
log.info("消息生产者,目标交换机:{};路由键:{};发送信息:{}", xmlData.getExchangeName(), xmlData.getRoutingKeyName(),
xmlData.getSendContent().getBytes());
// 7、添加一个异步 confirm 确认监听,用于发送消息到Broker端之后,回送消息的监听
channel.addConfirmListener(new ConfirmListener() {
// 发送成功
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException
{
log.info("消息发送成功,标识:{};是否是批量:{}", deliveryTag, multiple);
}
// 发送失败
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException
{
log.error("消息发送失败,标识:{};是否是批量:{}", deliveryTag, multiple);
}
});
// finally,关闭连接
closeConnectionAndChanel(channel, connection);
return ResultJson.success(CustomExceptionType.MESSAGE_SUCCESS);
}
/**
* byte字节数组 转 String
*
* @param byteArray 字节数组
* @return
*/
public static String byteArrayToStr(byte[] byteArray)
{
if (byteArray == null) {
return null;
}
String str = new String(byteArray);
return str;
}
/**
* DirectExchange的 消息消费者(推模式)
*
* @throws IOException IO异常
* @throws TimeoutException 超时异常
*/
public List<String> directConsumerByPush(XmlData xmlData) throws IOException, TimeoutException
{
List<String> list = new ArrayList<>();
// 1、创建ConnectionFactory
Connection connection = getConnection(xmlData.getServerIp(), xmlData.getServerPort(),
xmlData.getVirtualHostName(), xmlData.getSuperUsername(), xmlData.getSuperPassword());
// 2、 通过Connection创建一个新的Channel
Channel channel = connection.createChannel();
// 一次只接受一条未确认的消息
channel.basicQos(1);
// 4、开启监听Queue
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
{
try {
// 0、获取出全局唯一的 信息业务id(messageId)
String messageId = properties.getMessageId();
// 必须保证 messageId 不为空,避免空指针异常
if (redisTemplate.opsForValue().setIfAbsent(messageId, "0", 10, TimeUnit.MINUTES)) {
// 消费成功,将redis中的 messageId 对应的value修改为 1
redisTemplate.opsForValue().set(messageId, "1", 1, TimeUnit.MINUTES);
// 手动ack
channel.basicAck(envelope.getDeliveryTag(), false);
log.info("接收到消息:" + new String(body, "UTF-8"));
String message = new String(body, "UTF-8");
list.add(message);
} else {
// 获取redis中的value,如果是1,就手动ack。如果是0,就什么也不做(是0代表着,正在被消费中)
if ("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))) {
// 手动ack
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
} catch (Exception e) {
// 手动ack
channel.basicAck(envelope.getDeliveryTag(), false);
log.error("接收消息发送错误:" + e.getMessage());
}
}
};
// 消费消息
channel.basicConsume(xmlData.getQueueName(), false, consumer);
return list;
}
/**
* DirectExchange的 消息消费者(拉模式)
*
* @throws IOException IO异常
* @throws TimeoutException 超时异常
*/
public ResultJson directConsumerByPull(XmlData xmlData) throws IOException, TimeoutException
{
List<String> list = new ArrayList<>();
Connection connection = getConnection(xmlData.getServerIp(), xmlData.getServerPort(), xmlData.getVirtualHostName(), xmlData.getSuperUsername(), xmlData.getSuperPassword());
Channel channel = connection.createChannel();
channel.basicQos(1);
GetResponse response = channel.basicGet(xmlData.getQueueName(), false);
if (response != null && response.getMessageCount() >= 0) {
String messageId = response.getProps().getMessageId();
if (redisTemplate.opsForValue().setIfAbsent(messageId, "0", 1, TimeUnit.MINUTES)) {
redisTemplate.opsForValue().set(messageId, "1", 1, TimeUnit.MINUTES);
String data = byteArrayToStr(response.getBody());
list.add(data);
channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
} else {
if ("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))) {
channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
}
}
}
String message = list.get(0);
return StringUtil.isNullOrEmpty(message)
? ResultJson.error(CustomExceptionType.RECEIVE_SERVER_EXCEPTION)
: ResultJson.success(CustomExceptionType.RECEIVE_SUCCESS, message);
}
/**
* DirectExchange的 消息消费者(拉模式、批量拉取)
*/
public ResultJson directConsumerByPullMore(XmlData xmlData, Integer count) throws IOException, TimeoutException
{
List<String> list = new ArrayList<>();
String serverIp = xmlData.getServerIp();
Integer serverPort = xmlData.getServerPort();
String virtualHostName = xmlData.getVirtualHostName();
String superUsername = xmlData.getSuperUsername();
String superPassword = xmlData.getSuperPassword();
Connection connection = getConnection(serverIp, serverPort, virtualHostName, superUsername, superPassword);
Channel channel = connection.createChannel();
channel.basicQos(1);
while (list.size() < count) {
GetResponse response = channel.basicGet(xmlData.getQueueName(), false);
if (response != null) {
if (response.getMessageCount() >= 0) {
String messageId = response.getProps().getMessageId();
if (redisTemplate.opsForValue().setIfAbsent(messageId, "0", 1, TimeUnit.MINUTES)) {
redisTemplate.opsForValue().set(messageId, "1", 1, TimeUnit.MINUTES);
String data = byteArrayToStr(response.getBody());
list.add(data);
channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
} else {
if ("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))) {
channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
}
}
}
} else {
break;
}
}
int size = list.size();
if (0 < size && size < count) {
return new ResultJson<>("20200", "接收消息,成功!但是,该队列内只有" + size + "条消息", list);
}
return size > 0
? ResultJson.success(CustomExceptionType.RECEIVE_SUCCESS, list)
: ResultJson.error(CustomExceptionType.RECEIVE_SERVER_EXCEPTION);
}
}
... ...