作者 王勇

修改RabbitUtils中方法的返回值

... ... @@ -62,9 +62,7 @@ public class BusExchangeController {
public ResultJson deleteBusExchange(@RequestBody BusExchange busExchange)
{
try {
return busExchangeService.deleteByPrimaryKey(busExchange.getId()) > 0
? new ResultJson<>("200", "删除MQ交换机,成功")
: new ResultJson<>("500", "删除MQ交换机,失败");
return busExchangeService.deleteByPrimaryKey(busExchange.getId());
} catch (IOException | TimeoutException e) {
return new ResultJson<>("500", "服务器异常,删除MQ交换机,失败");
}
... ... @@ -80,9 +78,7 @@ public class BusExchangeController {
public ResultJson batchRemoveBusExchange(String ids)
{
try {
return busExchangeService.deleteByPrimaryKey(ids) > 0
? new ResultJson<>("200", "批量删除MQ交换机,成功")
: new ResultJson<>("500", "批量删除MQ交换机,失败");
return busExchangeService.deleteByPrimaryKey(ids);
} catch (IOException | TimeoutException e) {
return new ResultJson<>("500", "服务器异常,批量删除MQ交换机,失败");
}
... ... @@ -122,9 +118,7 @@ public class BusExchangeController {
// 设置id
busExchange.setId(IdUtils.generateId());
return message == null
? busExchangeService.insertSelective(busExchange) > 0
? new ResultJson<>("200", "新增MQ交换机信息,成功")
: new ResultJson<>("500", "新增MQ交换机信息,失败")
? busExchangeService.insertSelective(busExchange)
: new ResultJson<>("400", message);
} catch (IOException | TimeoutException e) {
return new ResultJson<>("500", "新增MQ交换机信息,失败");
... ...
... ... @@ -73,7 +73,7 @@ public class BusQueueController {
return busQueueService.deleteByPrimaryKey(busQueue.getId()) > 0
? new ResultJson<>("200", "删除消息队列,成功")
: new ResultJson<>("500", "删除消息队列,失败");
} catch (IOException | TimeoutException e) {
} catch (IOException | TimeoutException | InterruptedException e) {
return new ResultJson<>("500", "服务器异常,删除消息队列,失败");
}
}
... ... @@ -91,7 +91,7 @@ public class BusQueueController {
return busQueueService.deleteByPrimaryKey(ids) > 0
? new ResultJson<>("200", "批量删除MQ消息队列,成功")
: new ResultJson<>("500", "批量删除MQ消息队列,失败");
} catch (IOException | TimeoutException e) {
} catch (IOException | TimeoutException | InterruptedException e) {
return new ResultJson<>("500", "服务器异常,批量删除MQ消息队列,失败");
}
}
... ...
... ... @@ -4,8 +4,8 @@ import com.sunyo.wlpt.message.bus.service.domain.XmlData;
import com.sunyo.wlpt.message.bus.service.exception.CustomExceptionType;
import com.sunyo.wlpt.message.bus.service.rabbit.utils.DirectUtils;
import com.sunyo.wlpt.message.bus.service.response.ResultJson;
import com.sunyo.wlpt.message.bus.service.service.impl.AsyncTaskService;
import com.sunyo.wlpt.message.bus.service.service.UserMessageBindingService;
import com.sunyo.wlpt.message.bus.service.service.impl.AsyncTaskService;
import com.sunyo.wlpt.message.bus.service.utils.XmlUtils;
import lombok.extern.slf4j.Slf4j;
import org.dom4j.DocumentException;
... ... @@ -15,6 +15,8 @@ import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
/**
... ... @@ -113,17 +115,20 @@ public class RabbitController {
}
// 4、mq发送消息,数据库中保存消息并保存至ES
return sendAndSave(sentData);
} catch (IOException | TimeoutException e) {
} catch (IOException | TimeoutException | InterruptedException e) {
return ResultJson.error(CustomExceptionType.SERVER_EXCEPTION);
}
}
public ResultJson sendAndSave(XmlData sentData) throws IOException, TimeoutException
public ResultJson sendAndSave(XmlData sentData) throws IOException, TimeoutException, InterruptedException
{
// 4、mq发送消息,数据库中保存消息
ResultJson result = directUtils.sendMessage(sentData);
// 异步,保存消息记录
asyncTaskService.saveMessage(sentData);
CountDownLatch latch = new CountDownLatch(1);
Future<String> future = asyncTaskService.saveMessage(sentData, latch);
latch.await();
return result;
}
}
... ...
... ... @@ -80,7 +80,7 @@ public class UserInfoController {
}
/**
* 编辑用户信息
* 修改用户密码
*
* @param userInfo {@link UserInfo}
* @return
... ... @@ -103,13 +103,13 @@ public class UserInfoController {
{
try {
return userInfoService.deleteUserInfo(userInfo);
} catch (IOException | URISyntaxException e) {
} catch (IOException | URISyntaxException | TimeoutException | InterruptedException e) {
return new ResultJson<>("500", "服务器异常,删除用户失败");
}
}
/**
* 删除数据关系(MQ服务器 and 数据库)
* 删除用户关系(MQ服务器 and 数据库)
*/
@DeleteMapping("/deleteRelation")
public ResultJson deleteUserRelation(@RequestBody UserInfo userInfo)
... ...
... ... @@ -2,6 +2,7 @@ package com.sunyo.wlpt.message.bus.service.mapper;
import com.sunyo.wlpt.message.bus.service.domain.BusQueue;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
... ... @@ -107,4 +108,13 @@ public interface BusQueueMapper {
* @return
*/
List<BusQueue> selectByUsername(String username);
/**
* 根据用户名称和虚拟主机Id查询队列信息
*
* @param username 用户名称
* @param virtualHostId 虚拟主机Id
* @return
*/
List<BusQueue> selectByUsernameAndHostId(@Param("username") String username, @Param("virtualHostId") String virtualHostId);
}
\ No newline at end of file
... ...
... ... @@ -119,6 +119,15 @@ public interface UserInfoMapper {
int deleteByUsername(String username);
/**
* 根据用户名称和服务器名称删除用户信息
*
* @param username 用户名称
* @param serverName 服务器名称
* @return
*/
int deleteByUsernameAndServerName(@Param("username") String username, @Param("serverName") String serverName);
/**
* 根据服务器名称,删除用户关系
*
* @param serverName 服务器名称
... ... @@ -134,4 +143,12 @@ public interface UserInfoMapper {
*/
int deleteByHostName(String hostName);
/**
* 根据用户名称和服务器名称,查询用户信息
*
* @param username 用户名称
* @param serverName 服务器名称
* @return
*/
List<UserInfo> selectByUsernameAndServerName(@Param("username") String username, @Param("serverName") String serverName);
}
\ No newline at end of file
... ...
package com.sunyo.wlpt.message.bus.service.rabbit.utils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.sunyo.wlpt.message.bus.service.domain.*;
import com.sunyo.wlpt.message.bus.service.response.ResultJson;
import com.sunyo.wlpt.message.bus.service.service.BusServerService;
import com.sunyo.wlpt.message.bus.service.service.VirtualHostService;
import com.sunyo.wlpt.message.bus.service.utils.EncryptionUtils;
... ... @@ -106,46 +108,67 @@ public class RabbitUtils {
/**
* 添加交换机
*/
public void createExchange(BusServer server, String virtualHostName, BusExchange busExchange)
public ResultJson createExchange(BusServer server, String virtualHostName, BusExchange busExchange)
throws IOException, TimeoutException
{
String exchangeName = busExchange.getExchangeName();
Connection connection = getConnection(server.getServerIp(), server.getServerPort(), virtualHostName, server.getSuperUsername(), server.getSuperPassword());
Channel channel = connection.createChannel();
channel.exchangeDeclare(busExchange.getExchangeName(),
// 添加交换机成功,declareOk.toString是 #method<exchange.declare-ok>()
AMQP.Exchange.DeclareOk declareOk = channel.exchangeDeclare(exchangeName,
busExchange.getExchangeType(),
busExchange.getDurability(),
busExchange.getAutoDelete(),
busExchange.getInternal(),
null);
closeConnectionAndChanel(channel, connection);
return declareOk.toString().contains("declare-ok")
? ResultJson.success("添加MQ交换机,成功")
: new ResultJson<>("500", "MQ服务异常,添加MQ交换机:" + exchangeName + ",失败");
}
/**
* 删除交换机 channel.exchangeDelete(exchangeName);
*/
public void removeExchange(BusServer server, String virtualHostName, BusExchange busExchange)
public ResultJson removeExchange(BusServer server, String virtualHostName, BusExchange busExchange)
throws IOException, TimeoutException
{
String exchangeName = busExchange.getExchangeName();
Connection connection = getConnection(server.getServerIp(), server.getServerPort(), virtualHostName, server.getSuperUsername(), server.getSuperPassword());
Channel channel = connection.createChannel();
channel.exchangeDelete(busExchange.getExchangeName());
// 删除成功的时候,deleteOk.toString是"#method<exchange.delete-ok>()"
AMQP.Exchange.DeleteOk deleteOk = channel.exchangeDelete(exchangeName);
closeConnectionAndChanel(channel, connection);
return deleteOk.toString().contains("delete-ok")
? ResultJson.success("删除MQ交换机,成功")
: new ResultJson<>("500", "MQ服务异常,删除MQ交换机:" + exchangeName + ",失败");
}
/**
* 添加队列(默认设置参数为 null)
*/
public void createQueue(BusServer server, String virtualHostName, BusQueue busQueue)
public ResultJson createQueue(BusServer server, String virtualHostName, BusQueue busQueue)
throws IOException, TimeoutException
{
String queueName = busQueue.getQueueName();
Connection connection = getConnection(server.getServerIp(), server.getServerPort(), virtualHostName, server.getSuperUsername(), server.getSuperPassword());
Channel channel = connection.createChannel();
channel.queueDeclare(busQueue.getQueueName(),
// 成功时,deleteOk.toString是"#method<exchange.declare-ok>()"
AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(queueName,
busQueue.getDurability(),
false,
busQueue.getAutoDelete(),
null);
closeConnectionAndChanel(channel, connection);
return declareOk.toString().contains("declare-ok")
? ResultJson.success("添加MQ队列,成功")
: new ResultJson<>("500", "MQ服务异常,添加MQ队列:" + queueName + ",失败");
}
/**
... ... @@ -155,107 +178,113 @@ public class RabbitUtils {
/**
* 删除队列 channel.queueDelete(queueName);
*/
public void removeQueue(BusServer server, String virtualHostName, BusQueue busQueue)
public ResultJson removeQueue(BusServer server, String virtualHostName, BusQueue busQueue)
throws IOException, TimeoutException
{
String queueName = busQueue.getQueueName();
Connection connection = getConnection(server.getServerIp(), server.getServerPort(), virtualHostName, server.getSuperUsername(), server.getSuperPassword());
Channel channel = connection.createChannel();
channel.queueDelete(busQueue.getQueueName());
AMQP.Queue.DeleteOk deleteOk = channel.queueDelete(queueName);
closeConnectionAndChanel(channel, connection);
return deleteOk.toString().contains("delete-ok")
? ResultJson.success("删除MQ队列,成功")
: new ResultJson<>("500", "MQ服务异常,删除MQ队列:" + queueName + ",失败");
}
/**
* 创建绑定
*/
public void createBinding(BusServer server, String virtualHostName, UserMessageBinding userMessageBinding)
public ResultJson createBinding(BusServer server, String virtualHostName, UserMessageBinding userMessageBinding)
throws IOException, TimeoutException
{
Connection connection = getConnection(server.getServerIp(), server.getServerPort(), virtualHostName, server.getSuperUsername(), server.getSuperPassword());
Channel channel = connection.createChannel();
channel.queueBind(userMessageBinding.getQueueName(),
userMessageBinding.getExchangeName(),
userMessageBinding.getRoutingKeyName());
// #method<queue.bind-ok>()
AMQP.Queue.BindOk bindOk = channel.queueBind(userMessageBinding.getQueueName(), userMessageBinding.getExchangeName(), userMessageBinding.getRoutingKeyName());
closeConnectionAndChanel(channel, connection);
return bindOk.toString().contains("bind-ok")
? ResultJson.success("创建MQ绑定关系,成功")
: new ResultJson<>("500", "MQ服务异常,创建MQ绑定关系" + ",失败");
}
/**
* 解除绑定 channel.queueUnbind("queueName", "exchangeName","routingKey");
*/
public void removeBinding(BusServer server, String virtualHostName, UserMessageBinding userMessageBinding)
throws IOException, TimeoutException
public ResultJson removeBinding(BusServer server, String virtualHostName, UserMessageBinding userMessageBinding) throws IOException, TimeoutException
{
Connection connection = getConnection(server.getServerIp(), server.getServerPort(), virtualHostName, server.getSuperUsername(), server.getSuperPassword());
Channel channel = connection.createChannel();
channel.queueUnbind(userMessageBinding.getQueueName(),
userMessageBinding.getExchangeName(),
userMessageBinding.getRoutingKeyName());
// #method<queue.unbind-ok>()
AMQP.Queue.UnbindOk unbindOk = channel.queueUnbind(userMessageBinding.getQueueName(), userMessageBinding.getExchangeName(), userMessageBinding.getRoutingKeyName());
closeConnectionAndChanel(channel, connection);
return unbindOk.toString().contains("unbind-ok")
? ResultJson.success("解除MQ绑定关系,成功")
: new ResultJson<>("500", "MQ服务异常,解除MQ绑定关系" + ",失败");
}
/**
* 前往创建交换机的路上
*/
public void toCreateExchange(BusExchange busExchange)
throws IOException, TimeoutException
public ResultJson toCreateExchange(BusExchange busExchange) throws IOException, TimeoutException
{
VirtualHost virtualHost = getVirtualHost(busExchange.getVirtualHostId());
BusServer busServer = getBusServer(virtualHost.getServerId());
createExchange(busServer, virtualHost.getVirtualHostName(), busExchange);
return createExchange(busServer, virtualHost.getVirtualHostName(), busExchange);
}
/**
* 前往删除交换机的路上
*/
public void toRemoveExchange(BusExchange busExchange)
throws IOException, TimeoutException
public ResultJson toRemoveExchange(BusExchange busExchange) throws IOException, TimeoutException
{
VirtualHost virtualHost = getVirtualHost(busExchange.getVirtualHostId());
BusServer busServer = getBusServer(virtualHost.getServerId());
removeExchange(busServer, virtualHost.getVirtualHostName(), busExchange);
return removeExchange(busServer, virtualHost.getVirtualHostName(), busExchange);
}
/**
* 前往创建队列的路上
*/
public void toCreateQueue(BusQueue BusQueue)
throws IOException, TimeoutException
public ResultJson toCreateQueue(BusQueue BusQueue) throws IOException, TimeoutException
{
VirtualHost virtualHost = getVirtualHost(BusQueue.getVirtualHostId());
BusServer busServer = getBusServer(virtualHost.getServerId());
createQueue(busServer, virtualHost.getVirtualHostName(), BusQueue);
return createQueue(busServer, virtualHost.getVirtualHostName(), BusQueue);
}
/**
* 前往删除队列的路上
*/
public void toRemoveQueue(BusQueue BusQueue)
throws IOException, TimeoutException
public ResultJson toRemoveQueue(BusQueue BusQueue) throws IOException, TimeoutException
{
VirtualHost virtualHost = getVirtualHost(BusQueue.getVirtualHostId());
BusServer busServer = getBusServer(virtualHost.getServerId());
removeQueue(busServer, virtualHost.getVirtualHostName(), BusQueue);
return removeQueue(busServer, virtualHost.getVirtualHostName(), BusQueue);
}
/**
* 前往创建绑定的路上
*/
public void toCreateBinding(UserMessageBinding userMessageBinding)
public ResultJson toCreateBinding(UserMessageBinding userMessageBinding)
throws IOException, TimeoutException
{
VirtualHost virtualHost = getVirtualHost(userMessageBinding.getVirtualHostId());
BusServer busServer = getBusServer(virtualHost.getServerId());
createBinding(busServer, virtualHost.getVirtualHostName(), userMessageBinding);
return createBinding(busServer, virtualHost.getVirtualHostName(), userMessageBinding);
}
/**
* 前往解除绑定的路上
*/
public void toRemoveBinding(UserMessageBinding userMessageBinding)
throws IOException, TimeoutException
public ResultJson toRemoveBinding(UserMessageBinding userMessageBinding) throws IOException, TimeoutException
{
VirtualHost virtualHost = getVirtualHost(userMessageBinding.getVirtualHostId());
BusServer busServer = getBusServer(virtualHost.getServerId());
removeBinding(busServer, virtualHost.getVirtualHostName(), userMessageBinding);
return removeBinding(busServer, virtualHost.getVirtualHostName(), userMessageBinding);
}
/**
... ...
... ... @@ -2,6 +2,7 @@ package com.sunyo.wlpt.message.bus.service.service;
import com.github.pagehelper.PageInfo;
import com.sunyo.wlpt.message.bus.service.domain.BusExchange;
import com.sunyo.wlpt.message.bus.service.response.ResultJson;
import java.io.IOException;
import java.util.List;
... ... @@ -21,7 +22,7 @@ public interface BusExchangeService {
* @param id primaryKey
* @return deleteCount
*/
int deleteByPrimaryKey(String id) throws IOException, TimeoutException;
ResultJson deleteByPrimaryKey(String id) throws IOException, TimeoutException;
/**
* 新增
... ... @@ -37,7 +38,7 @@ public interface BusExchangeService {
* @param record the record
* @return insert count
*/
int insertSelective(BusExchange record) throws IOException, TimeoutException;
ResultJson insertSelective(BusExchange record) throws IOException, TimeoutException;
/**
* 查询,根据主键
... ...
... ... @@ -21,7 +21,7 @@ public interface BusQueueService {
* @param id primaryKey
* @return deleteCount
*/
int deleteByPrimaryKey(String id) throws IOException, TimeoutException;
int deleteByPrimaryKey(String id) throws IOException, TimeoutException, InterruptedException;
/**
* 删除队列,根据虚拟主机id
... ... @@ -112,5 +112,6 @@ public interface BusQueueService {
* @return
*/
List<BusQueue> selectByUsername(String username);
}
... ...
... ... @@ -82,7 +82,9 @@ public interface UserInfoService {
* 修改密码
*
* @param userInfo {@link UserInfo}
* @return
* @return {@link ResultJson}
* @throws IOException
* @throws URISyntaxException
*/
ResultJson updatePassword(UserInfo userInfo) throws IOException, URISyntaxException;
... ... @@ -102,7 +104,7 @@ public interface UserInfoService {
* @param userInfo {@link UserInfo}
* @return 返回结果
*/
ResultJson deleteUserInfo(UserInfo userInfo) throws IOException, URISyntaxException;
ResultJson deleteUserInfo(UserInfo userInfo) throws IOException, URISyntaxException, TimeoutException, InterruptedException;
/**
* 删除用户关系
... ...
package com.sunyo.wlpt.message.bus.service.service.impl;
import com.sunyo.wlpt.message.bus.service.domain.*;
import com.sunyo.wlpt.message.bus.service.rabbit.utils.RabbitUtils;
import com.sunyo.wlpt.message.bus.service.service.*;
import org.springframework.context.annotation.Lazy;
import com.sunyo.wlpt.message.bus.service.domain.XmlData;
import com.sunyo.wlpt.message.bus.service.service.MessageNoteService;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
/**
* @author 子诚
... ... @@ -24,167 +20,24 @@ import java.util.concurrent.TimeoutException;
public class AsyncTaskService {
@Resource
private RabbitUtils rabbitUtils;
@Resource
private MessageNoteService messageNoteService;
@Lazy
@Resource
private RoutingKeyService routingKeyService;
@Resource
private UserMessageBindingService userMessageBindingService;
@Lazy
@Resource
private BusQueueService busQueueService;
@Lazy
@Resource
private BusExchangeService busExchangeService;
@Lazy
@Resource
private VirtualHostService virtualHostService;
/**
* 异步,无论消息是否发送成功,将消息存储于数据库
*
* @param sentData {@link XmlData}
*/
@Async("taskExecutor")
public void saveMessage(XmlData sentData)
{
// 无论消息是否发送成功,将消息存储于数据库
messageNoteService.insertMessageSelective(sentData);
}
/**
* 当删除服务器的时候,级联删除与服务器有关的
* 虚拟主机,交换机,路由键,队列,绑定关系
*/
@Async("taskExecutor")
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
public void serverCascadeDelete(BusServer busServer) throws IOException, TimeoutException
{
String serverId = busServer.getId();
List<UserMessageBinding> bindings = userMessageBindingService.selectByServerId(serverId);
// 1.1 删除数据库中的绑定关系
userMessageBindingService.deleteByServerId(serverId);
for (UserMessageBinding userMessageBinding : bindings) {
// 1.2、解除MQ服务器上的绑定关系
rabbitUtils.toRemoveBinding(userMessageBinding);
}
List<VirtualHost> virtualHostList = virtualHostService.selectByServerId(serverId);
// 2.1、删除数据库中的虚拟主机列表
virtualHostService.deleteByServerId(serverId);
for (VirtualHost virtualHost : virtualHostList) {
String virtualHostId = virtualHost.getId();
List<BusQueue> queues = busQueueService.selectByVirtualHostId(virtualHostId);
// 3.1、删除数据库中对应的队列
busQueueService.deleteByVirtualHostId(virtualHostId);
for (BusQueue queue : queues) {
// 3.2、删除MQ服务器上的队列
rabbitUtils.toRemoveQueue(queue);
}
List<BusExchange> exchanges = busExchangeService.selectByVirtualHostId(virtualHostId);
// 4.1 根据虚拟主机id,删除交换机
busExchangeService.deleteByVirtualHostId(virtualHostId);
for (BusExchange busExchange : exchanges) {
String exchangeId = busExchange.getId();
// 5.1、删除数据库中对应的路由键
routingKeyService.deleteByExchangeId(exchangeId);
// 4.2 在MQ服务器上删除该交换机
rabbitUtils.toRemoveExchange(busExchange);
}
}
}
/**
* 当删除虚拟主机的时候,级联删除与虚拟主机有关的
* 交换机,路由键,队列,绑定关系
*/
@Async("taskExecutor")
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
public void virtualHostCascadeDelete(VirtualHost virtualHost) throws IOException, TimeoutException
public Future<String> saveMessage(XmlData sentData, CountDownLatch latch)
{
String virtualHostId = virtualHost.getId();
List<UserMessageBinding> bindings = userMessageBindingService.selectByVirtualHostId(virtualHostId);
// 1.1、删除数据库中的绑定关系
userMessageBindingService.deleteByVirtualHostId(virtualHostId);
for (UserMessageBinding userMessageBinding : bindings) {
// 1.2、解除MQ服务器上的绑定关系
rabbitUtils.toRemoveBinding(userMessageBinding);
}
List<BusQueue> queues = busQueueService.selectByVirtualHostId(virtualHostId);
// 2.1、删除数据库中对应的队列
busQueueService.deleteByVirtualHostId(virtualHostId);
for (BusQueue queue : queues) {
// 2.2、删除MQ服务器上的队列
rabbitUtils.toRemoveQueue(queue);
}
List<BusExchange> exchanges = busExchangeService.selectByVirtualHostId(virtualHostId);
// 3.1 根据虚拟主机id,删除交换机
busExchangeService.deleteByVirtualHostId(virtualHostId);
for (BusExchange busExchange : exchanges) {
String exchangeId = busExchange.getId();
// 4.1、删除数据库中对应的路由键
routingKeyService.deleteByExchangeId(exchangeId);
// 3.2 在MQ服务器上删除该交换机
rabbitUtils.toRemoveExchange(busExchange);
try {
// 无论消息是否发送成功,将消息存储于数据库
messageNoteService.insertMessageSelective(sentData);
return new AsyncResult<>("无论消息是否发送成功,将消息存储于数据库与ES中");
} catch (Exception e) {
return new AsyncResult<>("保存消息于数据库与ES中,失败");
} finally {
latch.countDown();
}
}
/**
* 当删除交换机的时候,
* <p>
* 同时,删除对应的路由键,删除包含交换机的绑定关系
* <p>
* 同时,删除包含该路由键的绑定关系
*/
@Async("taskExecutor")
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
public void exchangeCascadeDelete(BusExchange busExchange)
{
String exchangeId = busExchange.getId();
List<RoutingKey> routingKeyList = routingKeyService.selectByExchangeId(exchangeId);
// 删除相关路由键
routingKeyService.deleteByExchangeId(exchangeId);
// 删除交换机相关绑定关系
userMessageBindingService.deleteByExchangeId(exchangeId);
// 删除路由键相关绑定关系
for (int i = 0; i < routingKeyList.size(); i++) {
userMessageBindingService.deleteByRoutingKeyId(routingKeyList.get(0).getId());
}
}
/**
* 当删除路由键的时候,删除包含该路由键的绑定关系
*/
@Async("taskExecutor")
public void routingKeyCascadeDelete(RoutingKey routingKey) throws IOException, TimeoutException
{
String routingKeyId = routingKey.getId();
// 根据路由键id查询出所有的包含该路由键的绑定关系
List<UserMessageBinding> bindings = userMessageBindingService.selectByRoutingKeyId(routingKeyId);
for (UserMessageBinding userMessageBinding : bindings) {
// 解除MQ服务器上的绑定关系
rabbitUtils.toRemoveBinding(userMessageBinding);
}
// 删除数据库中的包含该路由键相关绑定关系
userMessageBindingService.deleteByRoutingKeyId(routingKey.getId());
}
/**
* 当删除队列的时候,删除包含队列的绑定关系
*/
@Async("taskExecutor")
public void queueCascadeDelete(BusQueue busQueue)
{
// 删除相关绑定关系
userMessageBindingService.deleteByQueueId(busQueue.getId());
}
}
... ...
... ... @@ -3,9 +3,13 @@ package com.sunyo.wlpt.message.bus.service.service.impl;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.sunyo.wlpt.message.bus.service.domain.BusExchange;
import com.sunyo.wlpt.message.bus.service.domain.RoutingKey;
import com.sunyo.wlpt.message.bus.service.mapper.BusExchangeMapper;
import com.sunyo.wlpt.message.bus.service.rabbit.utils.RabbitUtils;
import com.sunyo.wlpt.message.bus.service.response.ResultJson;
import com.sunyo.wlpt.message.bus.service.service.BusExchangeService;
import com.sunyo.wlpt.message.bus.service.service.RoutingKeyService;
import com.sunyo.wlpt.message.bus.service.service.UserMessageBindingService;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
... ... @@ -16,6 +20,8 @@ import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeoutException;
import static com.sunyo.wlpt.message.bus.service.common.Constant.RESULT_SUCCESS;
/**
* @author 子诚
* Description:
... ... @@ -32,53 +38,79 @@ public class BusExchangeServiceImpl implements BusExchangeService {
private AsyncTaskService asyncTaskService;
@Resource
private RoutingKeyService routingKeyService;
@Resource
private UserMessageBindingService userMessageBindingService;
@Resource
RabbitUtils rabbitUtils;
@Override
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
public int deleteByPrimaryKey(String id) throws IOException, TimeoutException
public ResultJson deleteByPrimaryKey(String id) throws IOException, TimeoutException
{
// 判断删除的个数,需被删除的个数是否一致
int index = 0;
String splitItem = ",";
//如果id,传过来多个,以','分割,即批量删除
if (id.contains(splitItem)) {
String[] split = id.split(splitItem);
for (int i = 0; i < split.length; i++) {
BusExchange busExchange = selectByPrimaryKey(split[i]);
// 删除数据库的交换机记录
int num = busExchangeMapper.deleteByPrimaryKey(split[i]);
// 删除MQ服务器上的交换机
deleteExchange(busExchange);
// 级联删除与交换机相关的路由键,配置关系
asyncTaskService.exchangeCascadeDelete(busExchange);
ResultJson resultJson = deleteExchange(busExchange);
if (!RESULT_SUCCESS.equals(resultJson.getCode())) {
return resultJson;
}
exchangeCascadeDelete(busExchange);
if (num > 0) {
index = index + num;
}
}
if (index == split.length) {
return 1;
} else {
return 0;
}
return index == split.length
? new ResultJson<>("200", "删除MQ交换机,成功")
: new ResultJson<>("500", "删除MQ交换机,失败");
} else {
BusExchange busExchange = selectByPrimaryKey(id);
// 删除数据库的交换机记录
int num = busExchangeMapper.deleteByPrimaryKey(id);
// 删除MQ服务器上的交换机
deleteExchange(busExchange);
// 级联删除与交换机相关的路由键,配置关系
asyncTaskService.exchangeCascadeDelete(busExchange);
return num;
ResultJson resultJson = deleteExchange(busExchange);
if (!RESULT_SUCCESS.equals(resultJson.getCode())) {
return resultJson;
}
exchangeCascadeDelete(busExchange);
return num > 0
? new ResultJson<>("200", "删除MQ交换机,成功")
: new ResultJson<>("500", "删除MQ交换机,失败");
}
}
/**
* 删除MQ服务器上的交换机
*/
public void deleteExchange(BusExchange busExchange) throws IOException, TimeoutException
public ResultJson deleteExchange(BusExchange busExchange) throws IOException, TimeoutException
{
rabbitUtils.toRemoveExchange(busExchange);
return rabbitUtils.toRemoveExchange(busExchange);
}
/**
* 删除交换机时的级联删除
* <p>
* 数据库:删除该交换机的绑定关系
* <p>
* 数据库:删除该交换机下的路由键,以及该路由键的绑定关系
*
* @param busExchange {@link BusExchange}
*/
public void exchangeCascadeDelete(BusExchange busExchange)
{
String exchangeId = busExchange.getId();
List<RoutingKey> routingKeyList = routingKeyService.selectByExchangeId(exchangeId);
routingKeyService.deleteByExchangeId(exchangeId);
userMessageBindingService.deleteByExchangeId(exchangeId);
for (RoutingKey routingKey : routingKeyList) {
userMessageBindingService.deleteByRoutingKeyId(routingKey.getId());
}
}
@Override
... ... @@ -88,10 +120,15 @@ public class BusExchangeServiceImpl implements BusExchangeService {
}
@Override
public int insertSelective(BusExchange record) throws IOException, TimeoutException
public ResultJson insertSelective(BusExchange record) throws IOException, TimeoutException
{
rabbitUtils.toCreateExchange(record);
return busExchangeMapper.insertSelective(record);
ResultJson resultJson = rabbitUtils.toCreateExchange(record);
if (!RESULT_SUCCESS.equals(resultJson.getCode())) {
return resultJson;
}
return busExchangeMapper.insertSelective(record) > 0
? new ResultJson<>("200", "新增MQ交换机信息,成功")
: new ResultJson<>("500", "新增MQ交换机信息,失败");
}
@Override
... ...
... ... @@ -5,12 +5,12 @@ import com.github.pagehelper.PageInfo;
import com.sunyo.wlpt.message.bus.service.domain.BusQueue;
import com.sunyo.wlpt.message.bus.service.mapper.BusQueueMapper;
import com.sunyo.wlpt.message.bus.service.mapper.UserInfoMapper;
import com.sunyo.wlpt.message.bus.service.mapper.UserMessageBindingMapper;
import com.sunyo.wlpt.message.bus.service.rabbit.utils.RabbitUtils;
import com.sunyo.wlpt.message.bus.service.response.ResultJson;
import com.sunyo.wlpt.message.bus.service.service.BusQueueService;
import com.sunyo.wlpt.message.bus.service.utils.IdUtils;
import io.netty.util.internal.StringUtil;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
... ... @@ -38,13 +38,14 @@ public class BusQueueServiceImpl implements BusQueueService {
@Resource
private UserInfoMapper userInfoMapper;
@Lazy
@Resource
private AsyncTaskService asyncTaskService;
private UserMessageBindingMapper userMessageBindingMapper;
@Override
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
public int deleteByPrimaryKey(String id) throws IOException, TimeoutException
public int deleteByPrimaryKey(String id) throws IOException, TimeoutException, InterruptedException
{
// 判断删除的个数,需被删除的个数是否一致
int index = 0;
... ... @@ -59,7 +60,7 @@ public class BusQueueServiceImpl implements BusQueueService {
// 删除MQ服务器上的队列
deleteQueue(busQueue);
// 级联删除数据库中与队列有关的配置关系
asyncTaskService.queueCascadeDelete(busQueue);
queueCascadeDelete(busQueue);
if (num > 0) {
index = index + num;
}
... ... @@ -76,7 +77,7 @@ public class BusQueueServiceImpl implements BusQueueService {
// 删除MQ服务器上的队列
deleteQueue(busQueue);
// 级联删除数据库中与队列有关的配置关系
asyncTaskService.queueCascadeDelete(busQueue);
queueCascadeDelete(busQueue);
return num;
}
}
... ... @@ -89,6 +90,16 @@ public class BusQueueServiceImpl implements BusQueueService {
rabbitUtils.toRemoveQueue(busQueue);
}
/**
* 删除队列时的级联删除
*
* @param busQueue {@link BusQueue}
*/
public void queueCascadeDelete(BusQueue busQueue)
{
int num = userMessageBindingMapper.deleteByQueueId(busQueue.getId());
}
@Override
public int deleteByVirtualHostId(String virtualHostId)
{
... ...
... ... @@ -3,9 +3,11 @@ package com.sunyo.wlpt.message.bus.service.service.impl;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.sunyo.wlpt.message.bus.service.domain.RoutingKey;
import com.sunyo.wlpt.message.bus.service.domain.UserMessageBinding;
import com.sunyo.wlpt.message.bus.service.mapper.RoutingKeyMapper;
import com.sunyo.wlpt.message.bus.service.mapper.UserMessageBindingMapper;
import com.sunyo.wlpt.message.bus.service.rabbit.utils.RabbitUtils;
import com.sunyo.wlpt.message.bus.service.service.RoutingKeyService;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
... ... @@ -24,9 +26,11 @@ import java.util.concurrent.TimeoutException;
@Service
public class RoutingKeyServiceImpl implements RoutingKeyService {
@Lazy
@Resource
private AsyncTaskService asyncTaskService;
private RabbitUtils rabbitUtils;
@Resource
private UserMessageBindingMapper userMessageBindingMapper;
@Resource
private RoutingKeyMapper routingKeyMapper;
... ... @@ -43,8 +47,8 @@ public class RoutingKeyServiceImpl implements RoutingKeyService {
String[] split = id.split(splitItem);
for (int i = 0; i < split.length; i++) {
RoutingKey routingKey = routingKeyMapper.selectByPrimaryKey(split[i]);
// 异步,级联删除包含该路由键的配置关系
asyncTaskService.routingKeyCascadeDelete(routingKey);
// 级联删除包含该路由键的配置关系
routingKeyCascadeDelete(routingKey);
int num = routingKeyMapper.deleteByPrimaryKey(split[i]);
if (num > 0) {
index = index + num;
... ... @@ -57,12 +61,33 @@ public class RoutingKeyServiceImpl implements RoutingKeyService {
}
} else {
RoutingKey routingKey = routingKeyMapper.selectByPrimaryKey(id);
// 异步,级联删除包含该路由键的配置关系
asyncTaskService.routingKeyCascadeDelete(routingKey);
// 级联删除包含该路由键的配置关系
routingKeyCascadeDelete(routingKey);
return routingKeyMapper.deleteByPrimaryKey(id);
}
}
/**
* 删除路由键时的级联删除
*
* @param routingKey {@link RoutingKey}
* @throws IOException
* @throws TimeoutException
*/
public void routingKeyCascadeDelete(RoutingKey routingKey) throws IOException, TimeoutException
{
String routingKeyId = routingKey.getId();
// 根据路由键id查询出所有的包含该路由键的绑定关系
List<UserMessageBinding> bindings = userMessageBindingMapper.selectByRoutingKeyId(routingKeyId);
for (UserMessageBinding userMessageBinding : bindings) {
// 解除MQ服务器上的绑定关系
rabbitUtils.toRemoveBinding(userMessageBinding);
}
// 删除数据库中的包含该路由键相关绑定关系
userMessageBindingMapper.deleteByRoutingKeyId(routingKey.getId());
}
@Override
public int deleteByExchangeId(String exchangeId)
{
... ...
... ... @@ -6,6 +6,7 @@ import com.sunyo.wlpt.message.bus.service.domain.BusQueue;
import com.sunyo.wlpt.message.bus.service.domain.BusServer;
import com.sunyo.wlpt.message.bus.service.domain.UserInfo;
import com.sunyo.wlpt.message.bus.service.domain.VirtualHost;
import com.sunyo.wlpt.message.bus.service.mapper.BusQueueMapper;
import com.sunyo.wlpt.message.bus.service.mapper.UserInfoMapper;
import com.sunyo.wlpt.message.bus.service.rabbit.utils.ClientUtils;
import com.sunyo.wlpt.message.bus.service.rabbit.utils.RabbitUtils;
... ... @@ -42,6 +43,9 @@ public class UserInfoServiceImpl implements UserInfoService {
private UserInfoMapper userInfoMapper;
@Resource
private BusQueueMapper busQueueMapper;
@Resource
private RabbitProperties rabbitProperties;
@Resource
... ... @@ -253,15 +257,20 @@ public class UserInfoServiceImpl implements UserInfoService {
@Override
public ResultJson updatePassword(UserInfo userInfo) throws IOException, URISyntaxException
{
if (StringUtil.isNullOrEmpty(userInfo.getUsername()) || StringUtil.isNullOrEmpty(userInfo.getPassword())) {
String username = userInfo.getUsername();
String newPassword = userInfo.getPassword();
if (StringUtil.isNullOrEmpty(username) || StringUtil.isNullOrEmpty(newPassword)) {
return new ResultJson<>("400", "用户名和密码,不能为空");
}
if (userInfoMapper.selectUserExist(userInfo.getUsername()).size() == 0) {
if (userInfoMapper.selectUserExist(username).size() == 0) {
return new ResultJson<>("400", "该用户不存在");
}
BusServer busServer = busServerService.selectByPrimaryKey(userInfo.getServerId());
String newPassword = userInfo.getPassword();
ClientUtils.updatePassword(busServer, userInfo.getUsername(), newPassword);
List<UserInfo> userInfoList = userInfoMapper.selectUserInfoList(userInfo);
for (UserInfo user : userInfoList) {
BusServer busServer = busServerService.selectByPrimaryKey(user.getServerId());
ClientUtils.updatePassword(busServer, userInfo.getUsername(), newPassword);
}
return new ResultJson<>("200", "修改密码成功");
}
... ... @@ -277,34 +286,51 @@ public class UserInfoServiceImpl implements UserInfoService {
}
/**
* 删除根据用户名删除该用户的所有关系
* 删除根据用户名删除该用户的所有关系(只删除当前MQ服务器的该用户)
* <p>
* QM服务器上删除该用户(会自动删除该用户关系)
* <p>
* MQ服务器(以及数据库)删除该用户对应队列(日后需要时,打开注释,即可)
* MQ服务器(以及数据库)删除该用户对应队列
*/
@Override
public ResultJson deleteUserInfo(UserInfo userInfo) throws IOException, URISyntaxException
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
public ResultJson deleteUserInfo(UserInfo userInfo) throws IOException, URISyntaxException, TimeoutException, InterruptedException
{
String username = userInfo.getUsername();
List<UserInfo> userInfoList = userInfoMapper.selectByUsername(userInfo.getUsername());
int num = userInfoMapper.deleteByUsername(username);
String serverName = userInfo.getServerName();
String virtualHostId = userInfo.getVirtualHostId();
String splitItem = ",";
List<UserInfo> userInfoList = userInfoMapper.selectByUsernameAndServerName(username, serverName);
int num = userInfoMapper.deleteByUsernameAndServerName(username, serverName);
for (UserInfo item : userInfoList) {
BusServer busServer = busServerService.selectByPrimaryKey(item.getServerId());
ClientUtils.deleteMQUser(username, busServer);
}
// List<BusQueue> queueList = busQueueService.selectByUsername(username);
// for (BusQueue queue : queueList) {
// busQueueService.deleteByPrimaryKey(queue.getId());
// }
/**
* TODO: 删除该用户对应的队列
*/
if (virtualHostId.contains(splitItem)) {
String[] splitHostId = virtualHostId.split(splitItem);
for (int i = 0; i < splitHostId.length; i++) {
List<BusQueue> queueList = busQueueMapper.selectByUsernameAndHostId(username, splitHostId[i]);
for (BusQueue queue : queueList) {
busQueueService.deleteByPrimaryKey(queue.getId());
}
}
} else {
List<BusQueue> queueList = busQueueMapper.selectByUsernameAndHostId(username, virtualHostId);
for (BusQueue queue : queueList) {
busQueueService.deleteByPrimaryKey(queue.getId());
}
}
return num > 0
? new ResultJson<>("200", "删除用户信息,成功!")
: new ResultJson<>("500", "删除用户信息,失败!");
}
/**
* 删除用户关系(数据库 and MQ服务器)
*
... ... @@ -314,6 +340,7 @@ public class UserInfoServiceImpl implements UserInfoService {
* @throws URISyntaxException URI语法异常
*/
@Override
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
public ResultJson deleteUserRelation(UserInfo userInfo) throws IOException, URISyntaxException
{
String virtualHostName = userInfo.getVirtualHostName();
... ... @@ -326,8 +353,6 @@ public class UserInfoServiceImpl implements UserInfoService {
}
}
int num = userInfoMapper.deleteByPrimaryKey(userInfo.getId());
BusServer busServer = busServerService.selectByPrimaryKey(userInfo.getServerId());
ClientUtils.clearPermissions(busServer, userInfo.getVirtualHostName(), userInfo.getUsername());
... ...
... ... @@ -270,4 +270,14 @@
from bus_queue
where username = #{username,jdbcType=VARCHAR}
</select>
<select id="selectByUsernameAndHostId" parameterType="java.lang.String" resultMap="BaseResultMap">
select
<include refid="Base_Column_List"/>
from bus_queue
<where>
username = #{username,jdbcType=VARCHAR}
and virtual_host_id = #{virtualHostId,jdbcType=VARCHAR}
</where>
</select>
</mapper>
\ No newline at end of file
... ...
... ... @@ -42,6 +42,15 @@
where username = #{username,jdbcType=VARCHAR}
</delete>
<delete id="deleteByUsernameAndServerName" parameterType="java.lang.String">
delete
from user_info
<where>
username = #{username,jdbcType=VARCHAR}
and server_name= #{serverName,jdbcType=VARCHAR}
</where>
</delete>
<delete id="deleteByServerName" parameterType="java.lang.String">
delete
from user_info
... ... @@ -276,4 +285,14 @@
</if>
</where>
</select>
<select id="selectByUsernameAndServerName" parameterType="java.lang.String" resultMap="BaseResultMap">
select
<include refid="Base_Column_List"/>
from user_info
<where>
username = #{username,jdbcType=VARCHAR}
and server_name = #{serverName,jdbcType=VARCHAR}
</where>
</select>
</mapper>
\ No newline at end of file
... ...