作者 王勇

监控队列,基本完善

... ... @@ -41,8 +41,7 @@ public class CascadeController {
@GetMapping("/server")
public ResultJson getServerList()
{
List<BusServer> busServers = busServerService.getServerList();
List<BusServer> busServers = busServerService.selectServerList();
return busServers.size() > 0
? new ResultJson<>("200", "查询服务器列表,成功", busServers)
: new ResultJson<>("500", "查询服务器列表,失败");
... ... @@ -127,7 +126,7 @@ public class CascadeController {
@GetMapping("/server_host")
public ResultJson getServerAndHostList(@RequestParam(value = "serverName", required = false) String serverName)
{
BusServer busServer=BusServer.builder().serverName(serverName)
BusServer busServer = BusServer.builder().serverName(serverName)
.build();
List<BusServer> servers = busServerService.getServerAndHostList(busServer);
return servers.size() > 0
... ...
package com.sunyo.wlpt.message.bus.service.controller.view;
import com.sunyo.wlpt.message.bus.service.response.ResultJson;
import com.sunyo.wlpt.message.bus.service.service.view.ViewQueueFactory;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.io.IOException;
import java.net.URISyntaxException;
/**
* @author 子诚
* Description:
* 时间:2020/8/26 11:07
*/
@CrossOrigin
@RequestMapping("bus/view/queue")
@RestController
public class ViewQueueController {
@Resource
private ViewQueueFactory viewQueueFactory;
/**
* @param serverName 服务器名称
* @param virtualHostName 虚拟主机名称
* @param pageNum 当前页数
* @param pageSize 每页数量
* @return
*/
@GetMapping("/list")
public ResultJson getViewQueueList(
@RequestParam(value = "serverName", required = false) String serverName,
@RequestParam(value = "virtualHostName", required = false) String virtualHostName,
@RequestParam(value = "pageNum", defaultValue = "1") Integer pageNum,
@RequestParam(value = "pageSize", defaultValue = "10") Integer pageSize
)
{
try {
return viewQueueFactory.getViewQueueList(serverName, virtualHostName, pageNum, pageSize);
} catch (IOException | URISyntaxException e) {
return new ResultJson<>("500", "服务器异常,查询队列监控,失败!请联系管理员");
}
}
}
... ...
... ... @@ -78,6 +78,13 @@ public interface BusServerMapper {
List<BusServer> selectBusServerList(BusServer busServer);
/**
* 查询服务器列表(不包含密码)
*
* @return 服务器列表
*/
List<BusServer> selectServerList();
/**
* 先校验该服务器名称是否存在
*
* @param busServer {@link BusServer}
... ... @@ -86,6 +93,14 @@ public interface BusServerMapper {
List<BusServer> validateServerName(BusServer busServer);
/**
* 校验该服务器名称是否存在
*
* @param serverName 服务器名称
* @return
*/
List<BusServer> validateByServerName(@Param("serverName") String serverName);
/**
* 校验该服务器信息是否存在
*
* @param busServer {@link BusServer}
... ... @@ -94,7 +109,7 @@ public interface BusServerMapper {
List<BusServer> validateBusServer(BusServer busServer);
/**
* 查询服务器列表
* 查询服务器列表(包含密码)
*
* @return 服务器列表
*/
... ...
package com.sunyo.wlpt.message.bus.service.rabbit.utils;
import com.rabbitmq.http.client.Client;
import com.rabbitmq.http.client.domain.QueueInfo;
import com.rabbitmq.http.client.domain.UserPermissions;
import com.sunyo.wlpt.message.bus.service.domain.BusServer;
import com.sunyo.wlpt.message.bus.service.domain.UserInfo;
... ... @@ -10,6 +11,7 @@ import com.sunyo.wlpt.message.bus.service.utils.AESUtils;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
/**
* @author 子诚
... ... @@ -148,4 +150,52 @@ public class ClientUtils {
client.clearPermissions(vHost, username);
client.clearTopicPermissions(vHost, username);
}
/**
* 获取MQ界面的队列信息,重载
*
* @param busServer
* @return
* @throws IOException
* @throws URISyntaxException
*/
public static List<QueueInfo> getViewQueues(BusServer busServer) throws IOException, URISyntaxException
{
Client client = connectClient(busServer);
List<QueueInfo> queues = client.getQueues();
return queues;
}
/**
* 获取MQ界面的队列信息,重载
*
* @param busServer
* @param vHostName
* @return
* @throws IOException
* @throws URISyntaxException
*/
public static List<QueueInfo> getViewQueues(BusServer busServer, String vHostName) throws IOException, URISyntaxException
{
Client client = connectClient(busServer);
List<QueueInfo> queues = client.getQueues(vHostName);
return queues;
}
/**
* 获取MQ界面的队列信息,重载
*
* @param busServer MQ服务器信息
* @param vHostName 虚拟主机
* @param queueName 队列名称
* @return
* @throws IOException
* @throws URISyntaxException
*/
public QueueInfo getViewQueues(BusServer busServer, String vHostName, String queueName) throws IOException, URISyntaxException
{
Client client = connectClient(busServer);
QueueInfo queue = client.getQueue(vHostName, queueName);
return queue;
}
}
... ...
... ... @@ -32,6 +32,11 @@ public class ResultJson<T> implements Serializable {
private String error;
/**
* 数据总条数
*/
private Integer total;
/**
* 响应数据
*/
private T data;
... ... @@ -41,7 +46,6 @@ public class ResultJson<T> implements Serializable {
*/
private String jwtToken;
private Integer total;
/**
* 无参,构造方法
... ...
... ... @@ -105,6 +105,13 @@ public interface BusServerService {
List<BusServer> getServerList();
/**
* 查询服务器列表(不包含密码)
*
* @return 服务器列表
*/
List<BusServer> selectServerList();
/**
* 查询,服务器(1:n虚拟主机)的基本信息
*
* @return List<BusServer>
... ...
... ... @@ -150,6 +150,12 @@ public class BusServerServiceImpl implements BusServerService {
}
@Override
public List<BusServer> selectServerList()
{
return busServerMapper.selectServerList();
}
@Override
public List<BusServer> getServerAndHostList(BusServer busServer)
{
return busServerMapper.getServerAndHostList(busServer);
... ...
package com.sunyo.wlpt.message.bus.service.service.view;
import com.rabbitmq.http.client.domain.QueueInfo;
import com.sunyo.wlpt.message.bus.service.domain.BusServer;
import com.sunyo.wlpt.message.bus.service.domain.VirtualHost;
import com.sunyo.wlpt.message.bus.service.mapper.BusServerMapper;
import com.sunyo.wlpt.message.bus.service.mapper.VirtualHostMapper;
import com.sunyo.wlpt.message.bus.service.rabbit.utils.ClientUtils;
import com.sunyo.wlpt.message.bus.service.response.ResultJson;
import io.netty.util.internal.StringUtil;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
/**
* @author 子诚
* Description:
* 时间:2020/8/26 15:09
*/
@Service
public class ViewQueueFactory {
@Resource
private BusServerMapper busServerMapper;
@Resource
private VirtualHostMapper virtualHostMapper;
public ResultJson getViewQueueList(String serverName, String virtualHostName, Integer pageNum, Integer pageSize) throws IOException,
URISyntaxException
{
List<QueueInfo> list = new ArrayList<>();
// 服务器名称、虚拟主机名称,均为空
if (StringUtil.isNullOrEmpty(serverName) && StringUtil.isNullOrEmpty(virtualHostName)) {
List<BusServer> serverList = busServerMapper.getServerList();
for (BusServer busServer : serverList) {
List<QueueInfo> queueInfoList = ClientUtils.getViewQueues(busServer);
list.addAll(queueInfoList);
}
}
// 仅,服务器名称不为空
if (!StringUtil.isNullOrEmpty(serverName) && StringUtil.isNullOrEmpty(virtualHostName)) {
BusServer busServer = busServerMapper.selectByServerName(serverName);
if (busServer == null) {
return new ResultJson("400", "该服务器名称不存在,请仔细检查");
}
List<QueueInfo> queueInfoList = ClientUtils.getViewQueues(busServer);
list.addAll(queueInfoList);
}
// 仅,虚拟主机名称不为空
if (StringUtil.isNullOrEmpty(serverName) && !StringUtil.isNullOrEmpty(virtualHostName)) {
VirtualHost virtualHost = virtualHostMapper.selectByVirtualHostName(virtualHostName);
if (virtualHost == null) {
return new ResultJson("400", "该虚拟主机名称不存在,请仔细检查");
}
BusServer busServer = busServerMapper.selectByPrimaryKey(virtualHost.getServerId());
List<QueueInfo> queueInfoList = ClientUtils.getViewQueues(busServer, virtualHostName);
list.addAll(queueInfoList);
}
// 服务器名称、虚拟主机名称,均不为空
if (!StringUtil.isNullOrEmpty(serverName) && !StringUtil.isNullOrEmpty(virtualHostName)) {
BusServer busServer = busServerMapper.selectByServerName(serverName);
if (busServer == null) {
return new ResultJson("400", "该服务器名称不存在,请仔细检查");
}
VirtualHost virtualHost = virtualHostMapper.selectByVirtualHostName(virtualHostName);
if (virtualHost == null) {
return new ResultJson("400", "该虚拟主机名称不存在,请仔细检查");
}
if (!virtualHost.getServerId().equals(busServer.getId())) {
return new ResultJson("400", "该虚拟主机不属于该服务器,请仔细检查");
}
List<QueueInfo> queueInfoList = ClientUtils.getViewQueues(busServer, virtualHostName);
list.addAll(queueInfoList);
}
Integer total = list.size();
// 达到分页与排序效果
List<QueueInfo> resultList = subList(pageNum, pageSize, list);
return resultList.size() > 0
? new ResultJson<>("200", "查询队列监控,成功!", resultList, total)
: new ResultJson<>("500", "查询队列监控,失败!");
}
/**
* 实现分页与排序效果
*
* @param pageNum 开始页
* @param pageSize 每页大小
* @param list 全部的数据
* @return List<QueueInfo>
*/
public List<QueueInfo> subList(Integer pageNum, Integer pageSize, List<QueueInfo> list)
{
Integer total = list.size();
Integer start = (pageNum - 1) * pageSize;
Integer end = start + pageSize;
if (start > total) {
start = 0;
end = total;
}
if (end > total) {
end = total;
}
List<QueueInfo> pageList = list.subList(start, end);
// 对总信息数,进行降序排序
List<QueueInfo> resultList =
pageList.stream().sorted(Comparator.comparing(QueueInfo::getTotalMessages).reversed()).collect(Collectors.toList());
return resultList;
}
}
... ...
... ... @@ -250,6 +250,20 @@
</where>
</select>
<!-- 获取服务器列表 -->
<select id="selectServerList" resultMap="BaseResultMap">
select id,
`server_name`,
server_ip,
server_port,
client_port,
super_username,
description,
gmt_create,
gmt_modified
from bus_server
</select>
<!-- 首先。校验服务器名称,是否存在 -->
<select id="validateServerName" parameterType="com.sunyo.wlpt.message.bus.service.domain.BusServer"
resultMap="BaseResultMap">
... ... @@ -264,6 +278,14 @@
</where>
</select>
<select id="validateByServerName" parameterType="java.lang.String" resultMap="BaseResultMap">
select
<include refid="Base_Column_List"/>
from bus_server
where server_name = #{serverName,jdbcType=VARCHAR}
</select>
<!-- 再校验服务器信息(服务器的ip,服务器的port)是否同时存在 -->
<select id="validateBusServer" parameterType="com.sunyo.wlpt.message.bus.service.domain.BusServer"
resultMap="BaseResultMap">
... ...