作者 王勇

级联删除,删除路由键时,级联删除数据库中绑定关系,MQ中的绑定关系

... ... @@ -8,7 +8,9 @@ import com.sunyo.wlpt.message.bus.service.utils.IdUtils;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeoutException;
import static com.sunyo.wlpt.message.bus.service.common.Constant.EXIST_ROUTINGKEY_INFO;
... ... @@ -39,7 +41,8 @@ public class RoutingKeyController {
@RequestParam(value = "routingKeyName", required = false) String routingKeyName,
@RequestParam(value = "exchangeId", required = false) String exchangeId,
@RequestParam(value = "pageNum", defaultValue = "1") Integer pageNum,
@RequestParam(value = "pageSize", defaultValue = "10") Integer pageSize) {
@RequestParam(value = "pageSize", defaultValue = "10") Integer pageSize)
{
// 获取查询参数
RoutingKey routingKey = RoutingKey.builder().routingKeyName(routingKeyName).exchangeId(exchangeId).build();
// 分页查询
... ... @@ -56,8 +59,8 @@ public class RoutingKeyController {
* @return {@link ResultJson}
*/
@DeleteMapping("/delete")
public ResultJson deleteRoutingKey(@RequestBody RoutingKey routingKey) {
public ResultJson deleteRoutingKey(@RequestBody RoutingKey routingKey) throws IOException, TimeoutException
{
return routingKeyService.deleteByPrimaryKey(routingKey.getId()) > 0
? new ResultJson<>("200", "删除路由键,成功")
: new ResultJson<>("500", "删除路由键,失败");
... ... @@ -70,8 +73,8 @@ public class RoutingKeyController {
* @return {@link ResultJson}
*/
@GetMapping("/batchRemove")
public ResultJson batchRemoveRoutingKey(String ids) {
public ResultJson batchRemoveRoutingKey(String ids) throws IOException, TimeoutException
{
return routingKeyService.deleteByPrimaryKey(ids) > 0
? new ResultJson<>("200", "删除路由键,成功")
: new ResultJson<>("500", "删除路由键,失败");
... ... @@ -84,7 +87,8 @@ public class RoutingKeyController {
* @return {@link ResultJson}
*/
@PutMapping("/update")
public ResultJson updateRoutingKey(@RequestBody RoutingKey routingKey) {
public ResultJson updateRoutingKey(@RequestBody RoutingKey routingKey)
{
//先验证,修改好的核心信息(路由键名称)是否已存在
String message = validateRoutingKey(routingKey);
... ... @@ -102,7 +106,8 @@ public class RoutingKeyController {
* @return {@link ResultJson}
*/
@PostMapping("/insert")
public ResultJson insertRoutingKey(@RequestBody RoutingKey routingKey) {
public ResultJson insertRoutingKey(@RequestBody RoutingKey routingKey)
{
//先验证,增加的服务器的核心信息(ip和port,同时存在)是否已存在
String message = validateRoutingKey(routingKey);
... ... @@ -122,7 +127,8 @@ public class RoutingKeyController {
* @param routingKey {@link RoutingKey}
* @return 通过,无返回消息
*/
private String validateRoutingKey(RoutingKey routingKey) {
private String validateRoutingKey(RoutingKey routingKey)
{
if ("".equals(routingKey.getRoutingKeyName()) || routingKey.getRoutingKeyName() == null) {
return "该路由键信息中,没有路由键名称";
}
... ...
... ... @@ -2,6 +2,7 @@ package com.sunyo.wlpt.message.bus.service.mapper;
import com.sunyo.wlpt.message.bus.service.domain.RoutingKey;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
... ... @@ -86,6 +87,14 @@ public interface RoutingKeyMapper {
List<RoutingKey> getRoutingKeyList(RoutingKey routingKey);
/**
* 根据exchangeID查询路由键
*
* @param exchangeId 交换机id
* @return
*/
List<RoutingKey> selectByExchangeId(@Param("exchangeId") String exchangeId);
/**
* 检验路由键是否存在,根据交换机id和路由键名称
*
* @param routingKey 路由键
... ...
... ... @@ -39,6 +39,14 @@ public interface UserMessageBindingMapper {
int deleteByQueueId(String queueId);
/**
* 根据路由键id删除配置关系
*
* @param routingKeyId 路由键id
* @return 删除成功
*/
int deleteByRoutingKeyId(String routingKeyId);
/**
* insert record to table
*
* @param record the record
... ... @@ -63,6 +71,14 @@ public interface UserMessageBindingMapper {
UserMessageBinding selectByPrimaryKey(String id);
/**
* 根据路由键id
*
* @param routingKeyId 路由键id
* @return
*/
List<UserMessageBinding> selectByRoutingKeyId(String routingKeyId);
/**
* update record selective
*
* @param record the updated record
... ...
... ... @@ -2,8 +2,11 @@ package com.sunyo.wlpt.message.bus.service.service;
import com.github.pagehelper.PageInfo;
import com.sunyo.wlpt.message.bus.service.domain.RoutingKey;
import org.apache.ibatis.annotations.Param;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeoutException;
/**
* @author 子诚
... ... @@ -18,7 +21,7 @@ public interface RoutingKeyService {
* @param id primaryKey
* @return deleteCount
*/
int deleteByPrimaryKey(String id);
int deleteByPrimaryKey(String id) throws IOException, TimeoutException;
/**
* 根据交换机id,删除路由键信息
... ... @@ -53,6 +56,14 @@ public interface RoutingKeyService {
RoutingKey selectByPrimaryKey(String id);
/**
* 根据exchangeID查询路由键
*
* @param exchangeId 交换机id
* @return
*/
List<RoutingKey> selectByExchangeId(@Param("exchangeId") String exchangeId);
/**
* 更新,选择性,根据主键
*
* @param record the updated record
... ...
... ... @@ -5,6 +5,7 @@ import com.sunyo.wlpt.message.bus.service.domain.UserMessageBinding;
import com.sunyo.wlpt.message.bus.service.domain.XmlData;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeoutException;
/**
... ... @@ -30,6 +31,14 @@ public interface UserMessageBindingService {
int deleteByExchangeId(String exchangeId);
/**
* 根据路由键id删除配置关系
*
* @param routingKeyId 路由键id
* @return 删除成功
*/
int deleteByRoutingKeyId(String routingKeyId);
/**
* 根据队列Id,删除绑定关系
*
* @param queueId 队列id
... ... @@ -94,6 +103,14 @@ public interface UserMessageBindingService {
* @return true or false
*/
Boolean validateXmlBinding(XmlData xmlData);
/**
* 根据路由键id
*
* @param routingKeyId 路由键id
* @return
*/
List<UserMessageBinding> selectByRoutingKeyId(String routingKeyId);
}
... ...
package com.sunyo.wlpt.message.bus.service.service.impl;
import com.sunyo.wlpt.message.bus.service.domain.BusExchange;
import com.sunyo.wlpt.message.bus.service.domain.BusQueue;
import com.sunyo.wlpt.message.bus.service.domain.XmlData;
import com.sunyo.wlpt.message.bus.service.domain.*;
import com.sunyo.wlpt.message.bus.service.rabbit.utils.RabbitUtils;
import com.sunyo.wlpt.message.bus.service.service.MessageNoteService;
import com.sunyo.wlpt.message.bus.service.service.RoutingKeyService;
import com.sunyo.wlpt.message.bus.service.service.UserMessageBindingService;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeoutException;
/**
* @author 子诚
... ... @@ -17,10 +21,13 @@ import javax.annotation.Resource;
* 注意点:异步任务需要单独放在一个类中
* 时间:2020/7/30 15:59
*/
@Component
@Service
public class AsyncTaskService {
@Resource
private RabbitUtils rabbitUtils;
@Resource
private MessageNoteService messageNoteService;
@Resource
... ... @@ -53,15 +60,42 @@ public class AsyncTaskService {
/**
* 当删除交换机的时候,
* <p>
* 同时,删除对应的路由键,删除包含交换机的绑定关系
* <p>
* 同时,删除包含该路由键的绑定关系
*/
@Async
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
void exchangeCascadeDelete(BusExchange busExchange)
{
String exchangeId = busExchange.getId();
List<RoutingKey> routingKeyList = routingKeyService.selectByExchangeId(exchangeId);
// 删除相关路由键
routingKeyService.deleteByExchangeId(busExchange.getId());
// 删除相关配置关系
userMessageBindingService.deleteByExchangeId(busExchange.getId());
routingKeyService.deleteByExchangeId(exchangeId);
// 删除交换机相关配置关系
userMessageBindingService.deleteByExchangeId(exchangeId);
// 删除路由键相关配置关系
for (int i = 0; i < routingKeyList.size(); i++) {
userMessageBindingService.deleteByRoutingKeyId(routingKeyList.get(0).getId());
}
}
/**
* 当删除路由键的时候,删除包含该路由键的配置关系
*/
@Async
void routingKeyCascadeDelete(RoutingKey routingKey) throws IOException, TimeoutException
{
String routingKeyId = routingKey.getId();
// 根据路由键id查询出所有的包含该路由键的配置关系
List<UserMessageBinding> bindings = userMessageBindingService.selectByRoutingKeyId(routingKeyId);
for (UserMessageBinding userMessageBinding : bindings) {
// 解除MQ服务器上的配置关系
rabbitUtils.toRemoveBinding(userMessageBinding);
}
// 删除数据库中的包含该路由键相关配置关系
userMessageBindingService.deleteByRoutingKeyId(routingKey.getId());
}
/**
... ...
... ... @@ -7,6 +7,7 @@ import com.sunyo.wlpt.message.bus.service.mapper.BusQueueMapper;
import com.sunyo.wlpt.message.bus.service.rabbit.utils.RabbitUtils;
import com.sunyo.wlpt.message.bus.service.service.BusQueueService;
import io.netty.util.internal.StringUtil;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
... ... @@ -31,6 +32,7 @@ public class BusQueueServiceImpl implements BusQueueService {
@Resource
private RabbitUtils rabbitUtils;
@Lazy
@Resource
private AsyncTaskService asyncTaskService;
... ...
... ... @@ -5,13 +5,16 @@ import com.github.pagehelper.PageInfo;
import com.sunyo.wlpt.message.bus.service.domain.RoutingKey;
import com.sunyo.wlpt.message.bus.service.mapper.RoutingKeyMapper;
import com.sunyo.wlpt.message.bus.service.service.RoutingKeyService;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;
/**
* @author 子诚
... ... @@ -21,12 +24,16 @@ import java.util.List;
@Service
public class RoutingKeyServiceImpl implements RoutingKeyService {
@Lazy
@Resource
private AsyncTaskService asyncTaskService;
@Resource
private RoutingKeyMapper routingKeyMapper;
@Override
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
public int deleteByPrimaryKey(String id)
public int deleteByPrimaryKey(String id) throws IOException, TimeoutException
{
// 判断删除的个数,需被删除的个数是否一致
int index = 0;
... ... @@ -35,6 +42,9 @@ public class RoutingKeyServiceImpl implements RoutingKeyService {
if (id.contains(splitItem)) {
String[] split = id.split(splitItem);
for (int i = 0; i < split.length; i++) {
RoutingKey routingKey = routingKeyMapper.selectByPrimaryKey(split[i]);
// 异步,级联删除包含该路由键的配置关系
asyncTaskService.routingKeyCascadeDelete(routingKey);
int num = routingKeyMapper.deleteByPrimaryKey(split[i]);
if (num > 0) {
index = index + num;
... ... @@ -46,6 +56,9 @@ public class RoutingKeyServiceImpl implements RoutingKeyService {
return 0;
}
} else {
RoutingKey routingKey = routingKeyMapper.selectByPrimaryKey(id);
// 异步,级联删除包含该路由键的配置关系
asyncTaskService.routingKeyCascadeDelete(routingKey);
return routingKeyMapper.deleteByPrimaryKey(id);
}
}
... ... @@ -75,6 +88,12 @@ public class RoutingKeyServiceImpl implements RoutingKeyService {
}
@Override
public List<RoutingKey> selectByExchangeId(String exchangeId)
{
return routingKeyMapper.selectByExchangeId(exchangeId);
}
@Override
public int updateByPrimaryKeySelective(RoutingKey record)
{
return routingKeyMapper.updateByPrimaryKeySelective(record);
... ...
... ... @@ -435,6 +435,18 @@ public class UserMessageBindingServiceImpl implements UserMessageBindingService
{
return userMessageBindingMapper.deleteByQueueId(queueId);
}
@Override
public int deleteByRoutingKeyId(String routingKeyId)
{
return userMessageBindingMapper.deleteByRoutingKeyId(routingKeyId);
}
@Override
public List<UserMessageBinding> selectByRoutingKeyId(String routingKeyId)
{
return userMessageBindingMapper.selectByRoutingKeyId(routingKeyId);
}
}
... ...
... ... @@ -85,6 +85,18 @@
</where>
</select>
<select id="selectByExchangeId" parameterType="java.lang.String" resultMap="BaseResultMap">
select
<include refid="Base_Column_List"/>
from routing_key
<where>
<!-- 所属交换机ID -->
<if test="exchangeId != null and exchangeId != ''">
exchange_id = #{exchangeId,jdbcType=VARCHAR}
</if>
</where>
</select>
<select id="selectRoutingKeyExist" parameterType="com.sunyo.wlpt.message.bus.service.domain.RoutingKey"
resultMap="BaseResultMap">
select
... ... @@ -97,7 +109,7 @@
</if>
<!-- 路由键名称 -->
<if test="routingKeyName != null and routingKeyName != ''">
and routing_key_name = #{routingKeyName,jdbcType=VARCHAR}
and routing_key_name = #{routingKeyName,jdbcType=VARCHAR}
</if>
</where>
</select>
... ...
... ... @@ -34,6 +34,14 @@
where id = #{id,jdbcType=VARCHAR}
</select>
<select id="selectByRoutingKeyId" parameterType="java.lang.String" resultMap="BaseResultMap">
<!--@mbg.generated-->
select
<include refid="Base_Column_List"/>
from user_message_binding
where routing_key_id = #{routingKeyId,jdbcType=VARCHAR}
</select>
<delete id="deleteByPrimaryKey" parameterType="java.lang.String">
<!--@mbg.generated-->
delete
... ... @@ -56,6 +64,15 @@
from user_message_binding
where queue_id = #{queueId,jdbcType=VARCHAR}
</delete>
<!-- 根据路由键id,删除配置关系-->
<delete id="deleteByRoutingKeyId" parameterType="java.lang.String">
<!--@mbg.generated-->
delete
from user_message_binding
where routing_key_id = #{routingKeyId,jdbcType=VARCHAR}
</delete>
<insert id="insert" parameterType="com.sunyo.wlpt.message.bus.service.domain.UserMessageBinding">
<!--@mbg.generated-->
insert into user_message_binding (id, user_id, username,
... ...