作者 王勇

增加ElasticSearch操作messagenote

正在显示 32 个修改的文件 包含 696 行增加72 行删除
... ... @@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.1.RELEASE</version>
<version>2.2.5.RELEASE</version>
<relativePath/>
</parent>
<groupId>com.sunyo.wlpt.message.bus.service</groupId>
... ... @@ -19,7 +19,8 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-cloud.version>Hoxton.RELEASE</spring-cloud.version>
<!-- <elasticsearch.version>6.8.0</elasticsearch.version>-->
<!-- springboot 2.2.1默认的es版本是6.8.4,下面的es的版本要和ES的版本一致 -->
<elasticsearch.version>7.4.0</elasticsearch.version>
</properties>
<dependencies>
... ... @@ -46,7 +47,6 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<!-- SpringBoot end -->
<!-- SpringCloud start -->
<dependency>
... ... @@ -168,12 +168,12 @@
<dependencyManagement>
<dependencies>
<!-- <dependency>-->
<!-- <groupId>org.springframework.boot</groupId>-->
<!-- <artifactId>spring-boot-dependencies</artifactId>-->
<!-- <version>${spring-boot.version}</version>-->
<!-- <type>pom</type>-->
<!-- <scope>import</scope>-->
<!-- </dependency>-->
<!-- <groupId>org.springframework.boot</groupId>-->
<!-- <artifactId>spring-boot-dependencies</artifactId>-->
<!-- <version>${spring-boot.version}</version>-->
<!-- <type>pom</type>-->
<!-- <scope>import</scope>-->
<!-- </dependency>-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
... ...
package com.sunyo.wlpt.message.bus.service.config;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
/**
* @author 子诚
* Description:ES的配置文件
* 时间:2020/8/5 10:23
*/
@Configuration
public class ElasticSearchConfig extends AbstractElasticsearchConfiguration {
@Override
@Bean
public RestHighLevelClient elasticsearchClient()
{
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
// 天生契合集群,有几个es环境,就 new HttpHost 几个,用,相隔
new HttpHost("192.168.37.139", 9200, "http")
)
);
return client;
}
@Bean
public ElasticsearchRestTemplate elasticsearchRestTemplate() {
return new ElasticsearchRestTemplate(elasticsearchClient());
}
}
... ...
package com.sunyo.wlpt.message.bus.service.controller;
import com.github.pagehelper.PageInfo;
import com.sunyo.wlpt.message.bus.service.domain.*;
import com.sunyo.wlpt.message.bus.service.response.ResultJson;
import com.sunyo.wlpt.message.bus.service.service.*;
import com.sunyo.wlpt.message.bus.service.service.impl.ElasticsearchService;
import org.springframework.format.annotation.DateTimeFormat;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import javax.validation.constraints.NotNull;
import java.io.IOException;
import java.util.Date;
/**
... ... @@ -43,6 +44,9 @@ public class MessageNoteController {
@Resource
private SchedulingDeleteService schedulingDeleteService;
@Resource
private ElasticsearchService elasticsearchService;
/**
* 分页查询,消息收发记录
*
... ... @@ -50,28 +54,26 @@ public class MessageNoteController {
* @param serverName MQ服务器名称
* @param virtualHostName 虚拟主机名称
* @param exchangeName 交换机名称
* @param queueName 队列名称
* @param routingKeyName 路由键名称
* @param sendTime 发送消息时间
* @param receiveTime 接收消息时间
* @param sendTimeBegin 查询时间段,开始
* @param sendTimeEnd 查询时间段,结束
* @param pageNum 当前页数,默认 1
* @param pageSize 每页条数,默认 10
* @return 消息收发记录-列表
*/
@GetMapping("/list")
public ResultJson selectMessageNoteList(
public ResultJson selectMessageNoteListFromElasticSearch(
@RequestParam(value = "username", required = false) String username,
@RequestParam(value = "serverName", required = false) String serverName,
@RequestParam(value = "virtualHostName", required = false) String virtualHostName,
@RequestParam(value = "exchangeName", required = false) String exchangeName,
@RequestParam(value = "queueName", required = false) String queueName,
@RequestParam(value = "routingKeyName", required = false) String routingKeyName,
@DateTimeFormat(pattern = "yyyy-MM-dd")
@RequestParam(value = "sendTime", required = false) Date sendTime,
@RequestParam(value = "sendTimeBegin", required = false) Date sendTimeBegin,
@DateTimeFormat(pattern = "yyyy-MM-dd")
@RequestParam(value = "receiveTime", required = false) Date receiveTime,
@RequestParam(value = "sendTimeEnd", required = false) Date sendTimeEnd,
@RequestParam(value = "pageNum", defaultValue = "1") Integer pageNum,
@RequestParam(value = "pageSize", defaultValue = "10") Integer pageSize)
@RequestParam(value = "pageSize", defaultValue = "10") Integer pageSize) throws IOException
{
// 获取查询参数
MessageNote messageNote = MessageNote.builder()
... ... @@ -79,16 +81,11 @@ public class MessageNoteController {
.serverName(serverName)
.virtualHostName(virtualHostName)
.exchangeName(exchangeName)
.queueName(queueName)
.routingKeyName(routingKeyName)
.sendTime(sendTime)
.receiveTime(receiveTime)
.sendTimeBegin(sendTimeBegin)
.sendTimeEnd(sendTimeEnd)
.build();
// 分页查询
PageInfo pageInfo = messageNoteService.selectMessageNoteList(messageNote, pageNum, pageSize);
return pageInfo.getTotal() > 0
? new ResultJson<>("200", "查询MQ服务器列表,成功!", pageInfo)
: new ResultJson<>("500", "查询MQ服务器列表,失败!");
return elasticsearchService.selectMessageNoteList(messageNote, pageNum, pageSize);
}
/**
... ... @@ -100,9 +97,15 @@ public class MessageNoteController {
@DeleteMapping("/delete")
public ResultJson deleteMessageNote(@RequestBody MessageNote messageNote)
{
return messageNoteService.deleteByPrimaryKey(messageNote.getId()) > 0
? new ResultJson<>("200", "删除-消息收发记录,成功")
: new ResultJson<>("500", "删除-消息收发记录,失败");
int num = messageNoteService.deleteByPrimaryKey(messageNote.getId());
if (num > 0) {
if (num == 2) {
return new ResultJson<>("200", "删除ES-消息收发记录,成功");
}
return new ResultJson<>("200", "删除-消息收发记录,成功");
} else {
return new ResultJson<>("500", "删除-消息收发记录,失败");
}
}
/**
... ... @@ -114,9 +117,15 @@ public class MessageNoteController {
@GetMapping("/batchRemove")
public ResultJson batchRemoveMessageNote(String ids)
{
return messageNoteService.deleteByPrimaryKey(ids) > 0
? new ResultJson<>("200", "删除-消息收发记录,成功")
: new ResultJson<>("500", "删除-消息收发记录,失败");
int num = messageNoteService.deleteByPrimaryKey(ids);
if (num > 0) {
if (num == 2) {
return new ResultJson<>("200", "批量删除ES-消息收发记录,成功");
}
return new ResultJson<>("200", "批量删除-消息收发记录,成功");
} else {
return new ResultJson<>("500", "批量删除-消息收发记录,失败");
}
}
/**
... ...
... ... @@ -104,12 +104,7 @@ public class RabbitController {
if (!binding) {
return ResultJson.error(CustomExceptionType.BINDING_ERROR);
}
// 4、mq发送消息,数据库中保存消息
// ResultJson result = directUtils.sendMessage(sentData);
// if (CustomExceptionType.MESSAGE_SUCCESS.getCode().equals(result.getCode())) {
// // mq发送消息成功之后,将消息存储于数据库
// messageNoteService.insertMessageSelective(sentData);
// }
// 4、mq发送消息,数据库中保存消息并保存至ES
return sendAndSave(sentData);
}
... ...
package com.sunyo.wlpt.message.bus.service.domain;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import java.io.Serializable;
import java.util.Date;
... ... @@ -18,6 +23,7 @@ import java.util.Date;
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Document(indexName = "message_note")
public class MessageNote implements Serializable {
private static final long serialVersionUID = -2119333801860569470L;
... ... @@ -25,73 +31,104 @@ public class MessageNote implements Serializable {
/**
* 消息收发记录表的ID
*/
@Id
private String id;
/**
* 用户的ID
*/
@Field(type = FieldType.Text)
private String userId;
/**
* 所属用户登陆名称
*/
@Field(type = FieldType.Text, analyzer = "ik_max_word")
private String username;
/**
* 所属服务器的ID
*/
@Field(type = FieldType.Text)
private String serverId;
/**
* 所属服务器名称
*/
@Field(type = FieldType.Text, analyzer = "ik_max_word")
private String serverName;
/**
* 所属虚拟主机的ID
*/
@Field(type = FieldType.Text)
private String virtualHostId;
/**
* 所属虚拟主机名称
*/
@Field(type = FieldType.Text, analyzer = "ik_max_word")
private String virtualHostName;
/**
* 所属交换机的ID
*/
@Field(type = FieldType.Text)
private String exchangeId;
/**
* 所属交换机名称
*/
@Field(type = FieldType.Text, analyzer = "ik_max_word")
private String exchangeName;
/**
* 所属队列的ID
*/
@Field(type = FieldType.Text)
private String queueId;
/**
* 所属队列名称
*/
@Field(type = FieldType.Text, analyzer = "ik_max_word")
private String queueName;
/**
* 所属路由键的ID
*/
@Field(type = FieldType.Text)
private String routingKeyId;
/**
* 所属路由键的名称
*/
@Field(type = FieldType.Text, analyzer = "ik_max_word")
private String routingKeyName;
/**
* 消息发送时间
*/
@Field(type = FieldType.Date)
@JsonFormat(timezone = "GMT+8")
private Date sendTime;
@Field(type = FieldType.Text)
private String alias_sendTime;
/**
* 查询时间段,开始时间
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date sendTimeBegin;
/**
* 查询时间段,结束时间
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date sendTimeEnd;
/**
* 消息获取时间
*/
... ... @@ -105,16 +142,20 @@ public class MessageNote implements Serializable {
/**
* 发送消息内容,别名
*/
@Field(type = FieldType.Text, analyzer = "ik_max_word")
private String alias_sendContent;
/**
* 相关描述
*/
@Field(type = FieldType.Text, analyzer = "ik_max_word")
private String description;
/**
* 创建时间
*/
@Field(type = FieldType.Date)
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date gmtCreate;
/**
... ... @@ -134,7 +175,8 @@ public class MessageNote implements Serializable {
* @param sendTime 发送消息时间
* @param receiveTime 接收消息时间
*/
public MessageNote(String username, String serverName, String virtualHostName, String exchangeName, String queueName, String routingKeyName, Date sendTime, Date receiveTime) {
public MessageNote(String username, String serverName, String virtualHostName, String exchangeName, String queueName, String routingKeyName, Date sendTime, Date receiveTime)
{
this.username = username;
this.serverName = serverName;
this.virtualHostName = virtualHostName;
... ...
package com.sunyo.wlpt.message.bus.service.elasticsearch.dao;
import com.sunyo.wlpt.message.bus.service.domain.MessageNote;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
/**
* @author 子诚
* Description:
* 时间:2020/8/5 14:43
*/
public interface MessageNoteRepository extends ElasticsearchRepository<MessageNote, String> {
}
... ...
... ... @@ -53,6 +53,14 @@ public interface BusExchangeMapper {
BusExchange selectByPrimaryKey(String id);
/**
* 查询,根据交换机名称
*
* @param exchangeName 交换机名称
* @return
*/
BusExchange selectByExchangeName(String exchangeName);
/**
* 更新,选择性,根据主键
*
* @param record the updated record
... ...
... ... @@ -47,6 +47,14 @@ public interface BusServerMapper {
BusServer selectByPrimaryKey(String id);
/**
* 查询,根据服务器名称
*
* @param serverName 服务器名称
* @return
*/
BusServer selectByServerName(String serverName);
/**
* 查询服务器列表,选择性
*
* @param busServer 服务器以及参数
... ...
... ... @@ -47,6 +47,14 @@ public interface RoutingKeyMapper {
RoutingKey selectByPrimaryKey(String id);
/**
* 查询,根据路由键名称
*
* @param routingKeyName 路由键名称
* @return
*/
RoutingKey selectByRoutingKeyName(String routingKeyName);
/**
* 更新,选择性,根据主键
*
* @param record the updated record
... ...
... ... @@ -46,6 +46,14 @@ public interface UserInfoMapper {
UserInfo selectByPrimaryKey(String id);
/**
* 查询,根据用户名称
*
* @param username 用户登录名称
* @return {@link UserInfo}
*/
UserInfo selectByUsername(String username);
/**
* update record selective
*
* @param record the updated record
... ...
... ... @@ -54,6 +54,14 @@ public interface VirtualHostMapper {
VirtualHost selectByPrimaryKey(String id);
/**
* 查询,根据虚拟主机名称
*
* @param virtualHostName 虚拟主机名称
* @return
*/
VirtualHost selectByVirtualHostName(String virtualHostName);
/**
* 根据服务器id,查询虚拟主机列表
*
* @param serverId 服务器id
... ...
... ... @@ -41,6 +41,8 @@ public class ResultJson<T> implements Serializable {
*/
private String jwtToken;
private Integer total;
/**
* 无参,构造方法
*/
... ... @@ -74,6 +76,14 @@ public class ResultJson<T> implements Serializable {
this.data = data;
}
public ResultJson(String code, String msg, T data, Integer total)
{
this.code = code;
this.msg = msg;
this.data = data;
this.total = total;
}
/**
* 定义静态、成功方法(重载)
*
... ... @@ -103,10 +113,12 @@ public class ResultJson<T> implements Serializable {
{
return new ResultJson<>("200", message, data);
}
public static ResultJson success(CustomExceptionType customExceptionType)
{
return new ResultJson<>(customExceptionType.getCode(), customExceptionType.getMsg());
}
/**
* 请求出现异常时的响应数据封装
*
... ...
... ... @@ -48,6 +48,14 @@ public interface BusExchangeService {
BusExchange selectByPrimaryKey(String id);
/**
* 查询,根据交换机名称
*
* @param exchangeName 交换机名称
* @return
*/
BusExchange selectByExchangeName(String exchangeName);
/**
* 更新,选择性,根据主键
*
* @param record the updated record
... ... @@ -112,4 +120,5 @@ public interface BusExchangeService {
* @return
*/
List<BusExchange> selectByVirtualHostId(String virtualHostId);
}
... ...
... ... @@ -47,6 +47,14 @@ public interface BusServerService {
BusServer selectByPrimaryKey(String id);
/**
* 查询,根据服务器名称
*
* @param serverName 服务器名称
* @return
*/
BusServer selectByServerName(String serverName);
/**
* 更新,选择性,根据主键
*
* @param record the updated record
... ... @@ -113,8 +121,9 @@ public interface BusServerService {
/**
* 查询服务器名称是否存在
* @param serverName 服务器名称
*
* @param serverName 服务器名称
* @return true or false
*/
List<BusServer> selectServerExist(String serverName);
List<BusServer> selectServerExist(String serverName);
}
... ...
... ... @@ -56,6 +56,14 @@ public interface RoutingKeyService {
RoutingKey selectByPrimaryKey(String id);
/**
* 查询,根据路由键名称
*
* @param routingKeyName 路由键名称
* @return
*/
RoutingKey selectByRoutingKeyName(String routingKeyName);
/**
* 根据exchangeID查询路由键
*
* @param exchangeId 交换机id
... ... @@ -112,4 +120,5 @@ public interface RoutingKeyService {
* @return List<RoutingKey>
*/
List<RoutingKey> selectRoutingKeyExist(RoutingKey routingKey);
}
... ...
... ... @@ -44,6 +44,14 @@ public interface UserInfoService {
UserInfo selectByPrimaryKey(String id);
/**
* 查询,根据用户名称
*
* @param username 用户登录名称
* @return {@link UserInfo}
*/
UserInfo selectByUsername(String username);
/**
* 更新,选择性,根据主键
*
* @param record the updated record
... ... @@ -73,6 +81,8 @@ public interface UserInfoService {
* @return List<UserInfo>
*/
List<UserInfo> selectUserExist(String username);
}
... ...
... ... @@ -47,6 +47,14 @@ public interface VirtualHostService {
VirtualHost selectByPrimaryKey(String id);
/**
* 查询,根据虚拟主机名称
*
* @param virtualHostName 虚拟主机名称
* @return
*/
VirtualHost selectByVirtualHostName(String virtualHostName);
/**
* 更新,选择性,根据主键
*
* @param record the updated record
... ... @@ -111,4 +119,6 @@ public interface VirtualHostService {
* @return
*/
int deleteByServerId(String serverId);
}
... ...
... ... @@ -101,6 +101,12 @@ public class BusExchangeServiceImpl implements BusExchangeService {
}
@Override
public BusExchange selectByExchangeName(String exchangeName)
{
return busExchangeMapper.selectByExchangeName(exchangeName);
}
@Override
public int updateByPrimaryKeySelective(BusExchange record)
{
return busExchangeMapper.updateByPrimaryKeySelective(record);
... ...
... ... @@ -112,6 +112,12 @@ public class BusServerServiceImpl implements BusServerService {
}
@Override
public BusServer selectByServerName(String serverName)
{
return busServerMapper.selectByServerName(serverName);
}
@Override
public int updateByPrimaryKeySelective(BusServer record)
{
return busServerMapper.updateByPrimaryKeySelective(record);
... ...
package com.sunyo.wlpt.message.bus.service.service.impl;
import com.sunyo.wlpt.message.bus.service.domain.MessageNote;
import com.sunyo.wlpt.message.bus.service.elasticsearch.dao.MessageNoteRepository;
import com.sunyo.wlpt.message.bus.service.response.ResultJson;
import com.sunyo.wlpt.message.bus.service.utils.DateUtils;
import io.netty.util.internal.StringUtil;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.*;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* @author 子诚
* Description:
* 时间:2020/8/5 14:08
*/
@Component
public class ElasticsearchService {
/**
* 这里不能使用@Resource,必须使用@Autowired
* 原因:I don`t know
*/
@Resource
private ElasticsearchRestTemplate elasticsearchRestTemplate;
@Resource
private RestHighLevelClient restHighLevelClient;
@Resource
private MessageNoteRepository messageNoteRepository;
public ResultJson selectMessageNoteList(MessageNote messageNote, Integer pageNum, Integer pageSize) throws IOException
{
// 条件搜索
SearchRequest searchRequest = new SearchRequest("message_note");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
// 高亮设置
HighlightBuilder highlightBuilder = new HighlightBuilder();
// 高亮字段,不设置
highlightBuilder.field("*").requireFieldMatch(false);
/*高亮前缀标签
.preTags("<span style='color=green'>")
// 高亮后缀标签
.postTags("</span>");
*/
// 分页
sourceBuilder
.from((pageNum - 1) * pageSize)
.size(pageSize)
// 检索所有字段
.postFilter(QueryBuilders.matchAllQuery())
.sort("sendTime", SortOrder.DESC)
.highlighter(new HighlightBuilder().
field("*").
requireFieldMatch(false));
if (StringUtil.isNullOrEmpty(messageNote.getUsername())
&& StringUtil.isNullOrEmpty(messageNote.getServerName())
&& StringUtil.isNullOrEmpty(messageNote.getVirtualHostName())
&& StringUtil.isNullOrEmpty(messageNote.getExchangeName())
&& StringUtil.isNullOrEmpty(messageNote.getRoutingKeyName())
&& messageNote.getSendTimeBegin() == null
&& messageNote.getSendTimeEnd() == null) {
sourceBuilder.query(QueryBuilders.matchAllQuery());
} else {
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
if (!StringUtil.isNullOrEmpty(messageNote.getUsername())) {
WildcardQueryBuilder usernameQuery = QueryBuilders.wildcardQuery("username", messageNote.getUsername());
boolQueryBuilder.must(usernameQuery);
}
if (!StringUtil.isNullOrEmpty(messageNote.getServerName())) {
WildcardQueryBuilder serverNameQuery = QueryBuilders.wildcardQuery("serverName", messageNote.getServerName());
boolQueryBuilder.must(serverNameQuery);
}
if (!StringUtil.isNullOrEmpty(messageNote.getVirtualHostName())) {
WildcardQueryBuilder virtualHostNameQuery = QueryBuilders.wildcardQuery("virtualHostName", messageNote.getVirtualHostName());
boolQueryBuilder.must(virtualHostNameQuery);
}
if (!StringUtil.isNullOrEmpty(messageNote.getExchangeName())) {
WildcardQueryBuilder exchangeNameQuery = QueryBuilders.wildcardQuery("exchangeName", messageNote.getExchangeName());
boolQueryBuilder.must(exchangeNameQuery);
}
if (!StringUtil.isNullOrEmpty(messageNote.getRoutingKeyName())) {
WildcardQueryBuilder routingKeyNameQuery = QueryBuilders.wildcardQuery("routingKeyName", messageNote.getRoutingKeyName());
boolQueryBuilder.must(routingKeyNameQuery);
}
if (messageNote.getSendTimeBegin() != null && messageNote.getSendTimeEnd() != null) {
RangeQueryBuilder sendTimeQuery = QueryBuilders.rangeQuery("sendTime")
.from(messageNote.getSendTimeBegin(), true)
.to(DateUtils.addDays(messageNote.getSendTimeEnd(), 1), true);
boolQueryBuilder.must(sendTimeQuery);
}
if (messageNote.getSendTimeBegin() != null && messageNote.getSendTimeEnd() == null) {
RangeQueryBuilder sendTimeQuery = QueryBuilders.rangeQuery("sendTime")
.gte(messageNote.getSendTimeBegin());
boolQueryBuilder.must(sendTimeQuery);
}
if (messageNote.getSendTimeBegin() == null && messageNote.getSendTimeEnd() != null) {
RangeQueryBuilder sendTimeQuery = QueryBuilders.rangeQuery("sendTime")
.lte(DateUtils.addDays(messageNote.getSendTimeEnd(), 1));
boolQueryBuilder.must(sendTimeQuery);
}
sourceBuilder.query(boolQueryBuilder);
}
sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
// 执行搜索
searchRequest.source(sourceBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
// 解析结果
List<Map<String, Object>> list = new ArrayList<>();
for (SearchHit hit : searchResponse.getHits().getHits()) {
// 原来的结果
Map<String, Object> sourceAsMap = hit.getSourceAsMap();
sourceAsMap.get("sendTime");
System.out.println(sourceAsMap.get("sendTime"));
/* 解析高亮的字段
Map<String, HighlightField> highlightFields = hit.getHighlightFields();
HighlightField username = highlightFields.get("username");
if (username != null) {
// 取出该字段
Text[] fragments = username.fragments();
// 将高亮字段替换原来没有高亮的字段
String user_name = "";
for (Text text : fragments) {
user_name += text; }
// 将高亮字段替换原来的内容
sourceAsMap.put("username", user_name);
}
*/
list.add(sourceAsMap);
}
Integer total = Math.toIntExact(searchResponse.getHits().getTotalHits().value);
return total > 0
? new ResultJson<>("200", "ES分页查询成功", list, total)
: new ResultJson<>("500", "ES分页查询失败");
}
/**
* 保存或者更新一条文档,于ES服务器中
*
* @param messageNote {@link MessageNote}
*/
public void saveOrUpdateMessageNote(MessageNote messageNote)
{
// id,存在即是更新;id不存在即是保存
messageNoteRepository.save(messageNote);
}
/**
* 根据Id,删除ES中的一条文档
*
* @param id 消息记录id
*/
public void deleteMessageNoteById(String id)
{
messageNoteRepository.deleteById(id);
}
/**
* 清空所有文档
*/
public void deleteAllMessageNote()
{
messageNoteRepository.deleteAll();
}
/**
* 根据id查询
*/
public void selectById(String id)
{
Optional<MessageNote> optionalMessageNote = messageNoteRepository.findById(id);
MessageNote messageNote = optionalMessageNote.get();
}
/**
* 查询所有
*/
public List<MessageNote> selectAll()
{
Iterable<MessageNote> gets = messageNoteRepository.findAll(Sort.by(Sort.Order.desc("gmtCreate")));
List<MessageNote> list = new ArrayList<>();
for (MessageNote get : gets) {
list.add(get);
}
return list;
}
/**
* 分页查询
*
* @param pageNum 当前页-1
* @param pageSize 每页数量
* @return
*/
public List<MessageNote> selectByPage(Integer pageNum, Integer pageSize)
{
List<MessageNote> list = new ArrayList<>();
Page<MessageNote> pages = messageNoteRepository.search(QueryBuilders.matchAllQuery(), PageRequest.of(pageNum - 1,
pageSize));
for (MessageNote note : pages) {
list.add(note);
}
return list;
}
}
... ...
... ... @@ -7,6 +7,7 @@ import com.sunyo.wlpt.message.bus.service.mapper.MessageNoteMapper;
import com.sunyo.wlpt.message.bus.service.mapper.UserMessageBindingMapper;
import com.sunyo.wlpt.message.bus.service.rabbit.utils.DirectUtils;
import com.sunyo.wlpt.message.bus.service.service.*;
import com.sunyo.wlpt.message.bus.service.utils.DateUtils;
import com.sunyo.wlpt.message.bus.service.utils.IdUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
... ... @@ -49,6 +50,9 @@ public class MessageNoteServiceImpl implements MessageNoteService {
private MessageNoteMapper messageNoteMapper;
@Resource
private ElasticsearchService elasticsearchService;
@Resource
private DirectUtils directUtils;
@Override
... ... @@ -56,24 +60,25 @@ public class MessageNoteServiceImpl implements MessageNoteService {
public int deleteByPrimaryKey(String id)
{
// 判断删除的个数,需被删除的个数是否一致
int index = 0;
String splitItem = ",";
//如果id,传过来多个,以','分割,即批量删除
if (id.contains(splitItem)) {
String[] split = id.split(splitItem);
for (int i = 0; i < split.length; i++) {
int num = messageNoteMapper.deleteByPrimaryKey(split[i]);
if (num > 0) {
index = index + num;
MessageNote messageNote = messageNoteMapper.selectByPrimaryKey(split[i]);
if (messageNote != null) {
messageNoteMapper.deleteByPrimaryKey(split[i]);
}
elasticsearchService.deleteMessageNoteById(split[i]);
}
if (index == split.length) {
return 1;
} else {
return 0;
}
return 2;
} else {
return messageNoteMapper.deleteByPrimaryKey(id);
MessageNote messageNote = messageNoteMapper.selectByPrimaryKey(id);
if (messageNote != null) {
messageNoteMapper.deleteByPrimaryKey(id);
}
elasticsearchService.deleteMessageNoteById(id);
return 2;
}
}
... ... @@ -116,6 +121,7 @@ public class MessageNoteServiceImpl implements MessageNoteService {
messageNoteList.parallelStream().forEach(item -> {
String content = new String(item.getSendContent());
item.setAlias_sendContent(content + "");
item.setAlias_sendTime(DateUtils.formatLong(item.getSendTime()));
});
PageInfo<MessageNote> pageInfo = new PageInfo<>(messageNoteList);
return pageInfo;
... ... @@ -146,15 +152,46 @@ public class MessageNoteServiceImpl implements MessageNoteService {
.routingKeyName(xmlData.getRoutingKeyName())
// 发送时间
.sendTime(xmlData.getSendDateTime())
.alias_sendTime(DateUtils.formatLong(xmlData.getSendDateTime()))
// 消息内容,别名
.alias_sendContent(xmlData.getSendContent())
// 消息内容
.sendContent(xmlData.getSendContent().getBytes())
// 描述:序列+token,or 自我描述
.description(description)
.build();
int num = messageNoteMapper.insertSelective(messageNote);
MessageNote note = note_fillId(messageNote);
int num = messageNoteMapper.insertSelective(note);
// ES没有事务,故先执行SQL
insertMessageToES(note);
return num;
}
/**
* 客户端发来的消息,填充id
*
* @param messageNote {@link MessageNote}
* @return
*/
public MessageNote note_fillId(MessageNote messageNote)
{
UserInfo userInfo = userInfoService.selectByUsername(messageNote.getUsername());
messageNote.setUserId(userInfo.getId());
BusServer busServer = busServerService.selectByServerName(messageNote.getServerName());
messageNote.setServerId(busServer.getId());
VirtualHost virtualHost = virtualHostService.selectByVirtualHostName(messageNote.getVirtualHostName());
messageNote.setVirtualHostId(virtualHost.getId());
BusExchange busExchange = busExchangeService.selectByExchangeName(messageNote.getExchangeName());
messageNote.setExchangeId(busExchange.getId());
RoutingKey routingKey = routingKeyService.selectByRoutingKeyName(messageNote.getRoutingKeyName());
messageNote.setRoutingKeyId(routingKey.getId());
return messageNote;
}
/**
* 填充名称(使用get方法,如果不存在就会报空指针异常)
... ... @@ -167,16 +204,10 @@ public class MessageNoteServiceImpl implements MessageNoteService {
// 设置id
messageNote.setId(IdUtils.generateId());
// 填充,用户名称
// UserInfo userInfo = userInfoService.selectByPrimaryKey(messageNote.getUserId());
// messageNote.setUsername(userInfo.getUsername());
// 填充,发送内容(编辑 or 新增)
messageNote.setSendContent(messageNote.getAlias_sendContent().getBytes());
// 填充,服务器名称
// BusServer busServer = busServerService.selectByPrimaryKey(messageNote.getServerId());
// messageNote.setServerName(busServer.getServerName());
messageNote.setAlias_sendTime(DateUtils.formatLong(messageNote.getSendTime()));
// 填充,虚拟主机名称
VirtualHost virtualHost = virtualHostService.selectByPrimaryKey(messageNote.getVirtualHostId());
... ... @@ -234,11 +265,25 @@ public class MessageNoteServiceImpl implements MessageNoteService {
.serverPort(busServer.getServerPort())
.build();
directUtils.sendMessage(xmlData);
return messageNoteMapper.insertSelective(note);
int num = messageNoteMapper.insertSelective(note);
// ES没有事务,故先执行SQL
insertMessageToES(note);
return num;
} else {
return 0;
}
}
/**
* 保存消息至ES服务器
*
* @param note 消息
*/
public void insertMessageToES(MessageNote note)
{
elasticsearchService.saveOrUpdateMessageNote(note);
}
}
... ...
... ... @@ -88,6 +88,12 @@ public class RoutingKeyServiceImpl implements RoutingKeyService {
}
@Override
public RoutingKey selectByRoutingKeyName(String routingKeyName)
{
return routingKeyMapper.selectByRoutingKeyName(routingKeyName);
}
@Override
public List<RoutingKey> selectByExchangeId(String exchangeId)
{
return routingKeyMapper.selectByExchangeId(exchangeId);
... ...
... ... @@ -44,6 +44,12 @@ public class UserInfoServiceImpl implements UserInfoService {
}
@Override
public UserInfo selectByUsername(String username)
{
return userInfoMapper.selectByUsername(username);
}
@Override
public int updateByPrimaryKeySelective(UserInfo record)
{
return userInfoMapper.updateByPrimaryKeySelective(record);
... ...
... ... @@ -96,6 +96,12 @@ public class VirtualHostServiceImpl implements VirtualHostService {
}
@Override
public VirtualHost selectByVirtualHostName(String virtualHostName)
{
return virtualHostMapper.selectByVirtualHostName(virtualHostName);
}
@Override
public int updateByPrimaryKeySelective(VirtualHost record)
{
return virtualHostMapper.updateByPrimaryKeySelective(record);
... ...
package com.sunyo.wlpt.message.bus.service.utils;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
/**
* @author 子诚
* Description:时间字符串
* 时间:2020/8/6 18:57
*/
public class DateUtils {
/**
* 时间戳转换成字符串
*/
public static String getDateToLong(long time)
{
Date d = new Date(time);
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return sf.format(d);
}
/**
* 时间戳转换成字符串
*/
public static String getDateToShort(long time)
{
Date d = new Date(time);
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");
return sf.format(d);
}
/**
* 判断传递来的时间,是否为空,为null,则返回"",不为空,则规范格式 "yyyy-MM-dd"
*
* @param time Date类型时间
* @return
*/
public static String isNullShort(Date time)
{
if (null == time) {
return "";
} else {
return getDateToShort(time.getTime());
}
}
/**
* 判断传递来的时间,是否为空,为null,则返回"",不为空,则规范格式 "yyyy-MM-dd HH:mm:ss"
*
* @param time Date类型时间
* @return
*/
public static String isNullLong(Date time)
{
if (null == time) {
return "";
} else {
return getDateToLong(time.getTime());
}
}
/**
* 将 Date类型转成 yyyy-MM-dd HH:mm:ss
*
* @param date 时间
* @return 字符串
*/
public static String formatLong(Date date)
{
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return sf.format(date);
}
/**
* 增加天数
*
* @param date 时间
* @param days 增加的天数
* @return 增加天数之后的时间
*/
public static Date addDays(Date date, int days)
{
Calendar calendar = Calendar.getInstance();
calendar.setTime(date);
calendar.add(Calendar.DATE, days);
return calendar.getTime();
}
}
... ...
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.sunyo.wlpt.message.bus.service.mapper.BusExchangeMapper">
<!-- 使用Redis做mybatis的二级缓存 -->
<cache type="com.sunyo.wlpt.message.bus.service.cache.RedisCache"/>
<resultMap id="BaseResultMap" type="com.sunyo.wlpt.message.bus.service.domain.BusExchange">
<!--@mbg.generated-->
<!--@Table bus_exchange-->
... ... @@ -43,6 +42,14 @@
where id = #{id,jdbcType=VARCHAR}
</select>
<select id="selectByExchangeName" parameterType="java.lang.String" resultMap="BaseResultMap">
<!--@mbg.generated-->
select
<include refid="Base_Column_List"/>
from bus_exchange
where exchange_name = #{exchangeName,jdbcType=VARCHAR}
</select>
<!-- 获取交换机列表,根据虚拟主机id -->
<select id="selectByVirtualHostId" parameterType="java.lang.String" resultMap="BaseResultMap">
<!--@mbg.generated-->
... ...
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.sunyo.wlpt.message.bus.service.mapper.BusQueueMapper">
<!-- 使用Redis做mybatis的二级缓存 -->
<cache type="com.sunyo.wlpt.message.bus.service.cache.RedisCache"/>
<resultMap id="BaseResultMap" type="com.sunyo.wlpt.message.bus.service.domain.BusQueue">
<!--@mbg.generated-->
<!--@Table bus_queue-->
... ...
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.sunyo.wlpt.message.bus.service.mapper.BusServerMapper">
<!-- 使用Redis做mybatis的二级缓存 -->
<cache type="com.sunyo.wlpt.message.bus.service.cache.RedisCache"/>
<resultMap id="BaseResultMap" type="com.sunyo.wlpt.message.bus.service.domain.BusServer">
<!--@mbg.generated-->
<!--@Table bus_server-->
... ... @@ -53,6 +51,14 @@
where id = #{id,jdbcType=VARCHAR}
</select>
<select id="selectByServerName" parameterType="java.lang.String" resultMap="BaseResultMap">
<!--@mbg.generated-->
select
<include refid="Base_Column_List"/>
from bus_server
where server_name = #{serverName,jdbcType=VARCHAR}
</select>
<!-- 获取服务器列表,可能要级联或者懒加载 -->
<select id="getServerList" resultMap="BaseResultMap">
select
... ...
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.sunyo.wlpt.message.bus.service.mapper.RoutingKeyMapper">
<!-- 使用Redis做mybatis的二级缓存 -->
<cache type="com.sunyo.wlpt.message.bus.service.cache.RedisCache"/>
<resultMap id="BaseResultMap" type="com.sunyo.wlpt.message.bus.service.domain.RoutingKey">
<!--@mbg.generated-->
<!--@Table routing_key-->
... ... @@ -36,6 +34,15 @@
where id = #{id,jdbcType=VARCHAR}
</select>
<select id="selectByRoutingKeyName" parameterType="java.lang.String" resultMap="BaseResultMap">
<!--@mbg.generated-->
select
<include refid="Base_Column_List"/>
from routing_key
where routing_key_name = #{routingKeyName,jdbcType=VARCHAR}
</select>
<!-- 查询路由键列表 -->
<select id="selectRoutingKeyList" parameterType="com.sunyo.wlpt.message.bus.service.domain.RoutingKey"
resultMap="RoutingKeyAndExchangeMap">
... ...
... ... @@ -23,6 +23,14 @@
where id = #{id,jdbcType=VARCHAR}
</select>
<select id="selectByUsername" parameterType="java.lang.String" resultMap="BaseResultMap">
<!--@mbg.generated-->
select
<include refid="Base_Column_List"/>
from user_info
where username = #{username,jdbcType=VARCHAR}
</select>
<select id="getUserInfoList" resultMap="BaseResultMap">
<!--@mbg.generated-->
select id,
... ...
<?xml version="1.0" encoding="UTF-8"?><!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.sunyo.wlpt.message.bus.service.mapper.UserMessageBindingMapper">
<!-- 使用Redis做mybatis的二级缓存 -->
<cache type="com.sunyo.wlpt.message.bus.service.cache.RedisCache"/>
<resultMap id="BaseResultMap" type="com.sunyo.wlpt.message.bus.service.domain.UserMessageBinding">
<!--@mbg.generated--><!--@Table user_message_binding-->
<id column="id" jdbcType="VARCHAR" property="id"/>
... ...
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.sunyo.wlpt.message.bus.service.mapper.VirtualHostMapper">
<!-- 使用Redis做mybatis的二级缓存 -->
<cache type="com.sunyo.wlpt.message.bus.service.cache.RedisCache"/>
<resultMap id="BaseResultMap" type="com.sunyo.wlpt.message.bus.service.domain.VirtualHost">
<!--@mbg.generated-->
<!--@Table virtual_host-->
... ... @@ -35,6 +33,14 @@
where id = #{id,jdbcType=VARCHAR}
</select>
<select id="selectByVirtualHostName" parameterType="java.lang.String" resultMap="BaseResultMap">
<!--@mbg.generated-->
select
<include refid="Base_Column_List"/>
from virtual_host
where virtual_host_name = #{virtualHostName,jdbcType=VARCHAR}
</select>
<!-- 根据服务器id,查询虚拟主机列表 -->
<select id="selectByServerId" parameterType="java.lang.String" resultMap="BaseResultMap">
<!--@mbg.generated-->
... ...