作者 王勇

监控队列与监控交换机,基本完善

... ... @@ -132,6 +132,12 @@
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.73</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
... ...
package com.sunyo.wlpt.message.bus.service.controller.view;
import com.sunyo.wlpt.message.bus.service.response.ResultJson;
import com.sunyo.wlpt.message.bus.service.service.view.ExchangeFactory;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.io.IOException;
import java.net.URISyntaxException;
/**
* @author 子诚
* Description:
* 时间:2020/8/27 15:23
*/
@CrossOrigin
@RequestMapping("bus/view/exchange")
@RestController
public class ViewExchangeController {
@Resource
private ExchangeFactory exchangeFactory;
@GetMapping("/list")
public ResultJson getViewExchangeList(
@RequestParam(value = "serverName", required = false) String serverName,
@RequestParam(value = "virtualHostName", required = false) String virtualHostName,
@RequestParam(value = "pageNum", defaultValue = "1") Integer pageNum,
@RequestParam(value = "pageSize", defaultValue = "10") Integer pageSize
)
{
try {
return exchangeFactory.getViewExchangeList(serverName, virtualHostName, pageNum, pageSize);
} catch (IOException | URISyntaxException e) {
return new ResultJson<>("500", "服务器异常,查询交换机监控,失败!请联系管理员");
}
}
}
... ...
package com.sunyo.wlpt.message.bus.service.domain.view;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.rabbitmq.http.client.domain.ExchangeInfo;
import com.rabbitmq.http.client.domain.ExchangeMessageStats;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author 子诚
* Description:
* 时间:2020/8/31 14:38
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class TempExchangeInfo extends ExchangeInfo {
@JsonProperty("message_stats")
private ExchangeMessageStats messageStats;
}
\ No newline at end of file
... ...
package com.sunyo.wlpt.message.bus.service.domain.view;
import com.rabbitmq.http.client.domain.ExchangeInfo;
import com.rabbitmq.http.client.domain.ExchangeMessageStats;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
... ... @@ -20,5 +20,15 @@ import java.io.Serializable;
public class ViewExchangeInfo implements Serializable {
private static final long serialVersionUID = -3353890396017709770L;
private String serverName;
private ExchangeInfo exchangeInfo;
private TempExchangeInfo tempExchangeInfo;
public long getPublishIn()
{
TempExchangeInfo tempExchangeInfo = getTempExchangeInfo();
ExchangeMessageStats messageStats = tempExchangeInfo.getMessageStats();
if (messageStats == null) {
return 0;
}
return messageStats.getPublishIn();
}
}
... ...
... ... @@ -2,14 +2,18 @@ package com.sunyo.wlpt.message.bus.service.rabbit.utils;
import com.rabbitmq.http.client.Client;
import com.rabbitmq.http.client.domain.ExchangeInfo;
import com.rabbitmq.http.client.domain.ExchangeMessageStats;
import com.rabbitmq.http.client.domain.QueueInfo;
import com.rabbitmq.http.client.domain.UserPermissions;
import com.sunyo.wlpt.message.bus.service.domain.BusServer;
import com.sunyo.wlpt.message.bus.service.domain.UserInfo;
import com.sunyo.wlpt.message.bus.service.domain.VirtualHost;
import com.sunyo.wlpt.message.bus.service.domain.view.TempExchangeInfo;
import com.sunyo.wlpt.message.bus.service.domain.view.ViewExchangeInfo;
import com.sunyo.wlpt.message.bus.service.domain.view.ViewQueueInfo;
import com.sunyo.wlpt.message.bus.service.utils.AESUtils;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.SystemMetaObject;
import java.io.IOException;
import java.net.URISyntaxException;
... ... @@ -242,13 +246,50 @@ public class ClientUtils {
List<ViewExchangeInfo> list = new ArrayList<>();
// 将获取到的队列信息,拼接一个属性,服务器名称
for (ExchangeInfo exchangeInfo : exchanges) {
ViewExchangeInfo viewExchangeInfo = ViewExchangeInfo.builder().serverName(serverName).exchangeInfo(exchangeInfo).build();
TempExchangeInfo tempExchangeInfo = exchangeInfoToTemp(exchangeInfo);
ViewExchangeInfo viewExchangeInfo = ViewExchangeInfo.builder().serverName(serverName).tempExchangeInfo(tempExchangeInfo).build();
list.add(viewExchangeInfo);
}
return list;
}
/**
* mybatis封装的反射(根据属性名和对象,获取属性的值)
*
* @param fieldName 属性名
* @param object 对象
* @return
*/
private static Object getFieldValueByFieldName(String fieldName, Object object)
{
MetaObject metaObject = SystemMetaObject.forObject(object);
Object value = metaObject.getValue(fieldName);
return value;
}
/**
* 将ExchangeInfo转换成TempExchangeInfo
*
* @param exchangeInfo ExchangeInfo
* @return
*/
private static TempExchangeInfo exchangeInfoToTemp(ExchangeInfo exchangeInfo)
{
ExchangeMessageStats messageStats = (ExchangeMessageStats) getFieldValueByFieldName("messageStats", exchangeInfo);
TempExchangeInfo temp = new TempExchangeInfo();
temp.setVhost(exchangeInfo.getVhost());
temp.setName(exchangeInfo.getName());
temp.setType(exchangeInfo.getType());
temp.setDurable(exchangeInfo.isDurable());
temp.setAutoDelete(exchangeInfo.isAutoDelete());
temp.setInternal(exchangeInfo.isInternal());
temp.setArguments(exchangeInfo.getArguments());
temp.setMessageStats(messageStats);
return temp;
}
/**
* 获取MQ界面的交换机信息,重载
*
* @param busServer MQ服务器信息
... ... @@ -279,7 +320,8 @@ public class ClientUtils {
{
Client client = connectClient(busServer);
ExchangeInfo exchangeInfo = client.getExchange(vHostName, exchangeName);
ViewExchangeInfo viewExchangeInfo = ViewExchangeInfo.builder().serverName(busServer.getServerName()).exchangeInfo(exchangeInfo).build();
TempExchangeInfo tempExchangeInfo = exchangeInfoToTemp(exchangeInfo);
ViewExchangeInfo viewExchangeInfo = ViewExchangeInfo.builder().serverName(busServer.getServerName()).tempExchangeInfo(tempExchangeInfo).build();
return viewExchangeInfo;
}
... ...
package com.sunyo.wlpt.message.bus.service.service.view;
import com.sunyo.wlpt.message.bus.service.domain.BusServer;
import com.sunyo.wlpt.message.bus.service.domain.VirtualHost;
import com.sunyo.wlpt.message.bus.service.domain.view.ViewExchangeInfo;
import com.sunyo.wlpt.message.bus.service.mapper.BusServerMapper;
import com.sunyo.wlpt.message.bus.service.mapper.VirtualHostMapper;
import com.sunyo.wlpt.message.bus.service.rabbit.utils.ClientUtils;
import com.sunyo.wlpt.message.bus.service.response.ResultJson;
import io.netty.util.internal.StringUtil;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
/**
* @author 子诚
* Description:
* 时间:2020/8/27 17:08
*/
@Service
public class ExchangeFactory {
@Resource
private BusServerMapper busServerMapper;
@Resource
private VirtualHostMapper virtualHostMapper;
public ResultJson getViewExchangeList(String serverName, String virtualHostName, Integer pageNum, Integer pageSize) throws IOException, URISyntaxException
{
List<ViewExchangeInfo> list = new ArrayList<>();
// 服务器名称、虚拟主机名称,均为空
if (StringUtil.isNullOrEmpty(serverName) && StringUtil.isNullOrEmpty(virtualHostName)) {
List<BusServer> serverList = busServerMapper.getServerList();
for (BusServer busServer : serverList) {
List<ViewExchangeInfo> viewExchanges = ClientUtils.getViewExchanges(busServer);
list.addAll(viewExchanges);
}
}
// 仅,服务器名称不为空
if (!StringUtil.isNullOrEmpty(serverName) && StringUtil.isNullOrEmpty(virtualHostName)) {
BusServer busServer = busServerMapper.selectByServerName(serverName);
if (busServer == null) {
return new ResultJson("400", "该服务器名称不存在,请仔细检查");
}
List<ViewExchangeInfo> viewExchanges = ClientUtils.getViewExchanges(busServer);
list.addAll(viewExchanges);
}
// 仅,虚拟主机名称不为空
if (StringUtil.isNullOrEmpty(serverName) && !StringUtil.isNullOrEmpty(virtualHostName)) {
VirtualHost virtualHost = virtualHostMapper.selectByVirtualHostName(virtualHostName);
if (virtualHost == null) {
return new ResultJson("400", "该虚拟主机名称不存在,请仔细检查");
}
BusServer busServer = busServerMapper.selectByPrimaryKey(virtualHost.getServerId());
List<ViewExchangeInfo> viewExchanges = ClientUtils.getViewExchanges(busServer, virtualHostName);
list.addAll(viewExchanges);
}
// 服务器名称、虚拟主机名称,均不为空
if (!StringUtil.isNullOrEmpty(serverName) && !StringUtil.isNullOrEmpty(virtualHostName)) {
BusServer busServer = busServerMapper.selectByServerName(serverName);
if (busServer == null) {
return new ResultJson("400", "该服务器名称不存在,请仔细检查");
}
VirtualHost virtualHost = virtualHostMapper.selectByVirtualHostName(virtualHostName);
if (virtualHost == null) {
return new ResultJson("400", "该虚拟主机名称不存在,请仔细检查");
}
if (!virtualHost.getServerId().equals(busServer.getId())) {
return new ResultJson("400", "该虚拟主机不属于该服务器,请仔细检查");
}
List<ViewExchangeInfo> viewExchanges = ClientUtils.getViewExchanges(busServer, virtualHostName);
list.addAll(viewExchanges);
}
Integer total = list.size();
List<ViewExchangeInfo> resultList = subAndSortList(pageNum, pageSize, list);
return resultList.size() > 0
? new ResultJson<>("200", "查询交换机监控,成功!", resultList, total)
: new ResultJson<>("500", "查询交换机监控,失败!");
}
/**
* 实现排序与分页效果
*
* @param pageNum 当前的页数
* @param pageSize 每页的大小
* @param list List<ViewExchangeInfo>
* @return
*/
private List<ViewExchangeInfo> subAndSortList(Integer pageNum, Integer pageSize, List<ViewExchangeInfo> list)
{
Integer start = (pageNum - 1) * pageSize;
Integer end = start + pageSize;
Integer total = list.size();
if (start > total) {
start = 0;
end = total;
}
if (end > total) {
end = total;
}
List<ViewExchangeInfo> sortedList =
list.stream().sorted(Comparator.comparing(ViewExchangeInfo::getPublishIn).reversed())
.collect(Collectors.toList());
List<ViewExchangeInfo> pageList = sortedList.subList(start, end);
return pageList;
}
}
... ...
... ... @@ -92,7 +92,7 @@ public class ViewQueueFactory {
}
/**
* 实现分页与排序效果
* 实现排序与分页效果
*
* @param pageNum 开始页
* @param pageSize 每页大小
... ... @@ -111,11 +111,13 @@ public class ViewQueueFactory {
if (end > total) {
end = total;
}
List<ViewQueueInfo> pageList = list.subList(start, end);
// 对积压数,进行降序排序
List<ViewQueueInfo> resultList =
pageList.stream().sorted(Comparator.comparing(ViewQueueInfo::getMessageReady).reversed())
List<ViewQueueInfo> sortedList =
list.stream().sorted(Comparator.comparing(ViewQueueInfo::getMessageReady).reversed())
.collect(Collectors.toList());
// 分页
List<ViewQueueInfo> resultList = sortedList.subList(start, end);
return resultList;
}
}
... ...