作者 王勇

初步完成发送消息,并异步保存至数据库

... ... @@ -5,6 +5,7 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.transaction.annotation.EnableTransactionManagement;
... ... @@ -17,6 +18,7 @@ import org.springframework.transaction.annotation.EnableTransactionManagement;
@EnableEurekaClient
@EnableTransactionManagement
@EnableScheduling
@EnableAsync
public class MessageBusServiceApplication {
public static void main(String[] args) {
... ...
... ... @@ -5,8 +5,11 @@ import com.sunyo.wlpt.message.bus.service.exception.CustomExceptionType;
import com.sunyo.wlpt.message.bus.service.rabbit.utils.DirectUtils;
import com.sunyo.wlpt.message.bus.service.rabbit.utils.RabbitUtils;
import com.sunyo.wlpt.message.bus.service.response.ResultJson;
import com.sunyo.wlpt.message.bus.service.service.AsyncTaskService;
import com.sunyo.wlpt.message.bus.service.service.MessageNoteService;
import com.sunyo.wlpt.message.bus.service.service.UserMessageBindingService;
import com.sunyo.wlpt.message.bus.service.utils.XmlUtils;
import lombok.extern.slf4j.Slf4j;
import org.dom4j.DocumentException;
import org.springframework.format.annotation.DateTimeFormat;
import org.springframework.web.bind.annotation.*;
... ... @@ -21,6 +24,7 @@ import java.util.concurrent.TimeoutException;
* Description:
* 时间:2020/7/16 14:46
*/
@Slf4j
@CrossOrigin
@RequestMapping("bus/rabbit")
@RestController
... ... @@ -32,11 +36,17 @@ public class RabbitController {
private XmlUtils xmlUtils;
@Resource
private MessageNoteService messageNoteService;
@Resource
private RabbitUtils rabbitUtils;
@Resource
private DirectUtils directUtils;
@Resource
private AsyncTaskService asyncTaskService;
@GetMapping("/test/consumer")
public void consumer() throws IOException, TimeoutException
{
... ... @@ -103,8 +113,20 @@ public class RabbitController {
return ResultJson.error(CustomExceptionType.BINDING_ERROR);
}
// 4、mq发送消息,数据库中保存消息
ResultJson result = directUtils.sendMessage(sentData);
// ResultJson result = directUtils.sendMessage(sentData);
// if (CustomExceptionType.MESSAGE_SUCCESS.getCode().equals(result.getCode())) {
// // mq发送消息成功之后,将消息存储于数据库
// messageNoteService.insertMessageSelective(sentData);
// }
return sendAndSave(sentData);
}
public ResultJson sendAndSave(XmlData sentData) throws Exception
{
// 4、mq发送消息,数据库中保存消息
ResultJson result = directUtils.sendMessage(sentData);
// 异步,保存消息记录
asyncTaskService.saveMessage(sentData);
return result;
}
}
... ...
package com.sunyo.wlpt.message.bus.service.mapper;
import com.sunyo.wlpt.message.bus.service.domain.MessageNote;
import org.apache.ibatis.annotations.Mapper;import org.apache.ibatis.annotations.Param;import java.util.List;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* @author 子诚
... ... @@ -72,4 +75,5 @@ public interface MessageNoteMapper {
* @param deleteTime 删除的时间
*/
void autoDelete(@Param("deleteTime") Integer deleteTime);
}
\ No newline at end of file
... ...
... ... @@ -234,9 +234,7 @@ public class DirectUtils {
public ResultJson directProducer(XmlData xmlData) throws Exception
{
// 1、创建ConnectionFactory
// (String hostIp, int hostPort, String vHostName, String userName, String password) throws Exception
//
// 1、创建Connection
Connection connection = getConnection(xmlData.getServerIp(), xmlData.getServerPort(),
xmlData.getVirtualHostName(), xmlData.getSender(), xmlData.getPassword());
// 2、 通过Connection创建一个新的Channel
... ... @@ -245,8 +243,8 @@ public class DirectUtils {
channel.confirmSelect();
// 4、避免消息被重复消费
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
// 指定消息是否需要持久化,1:需要持久化;2:不需要持久化
.deliveryMode(1)
// 指定消息是否需要持久化,2:持久化;1.非持久化
.deliveryMode(2)
// 设置全局唯一消息机制id(雪花id)
.messageId(IdUtils.generateId())
.build();
... ... @@ -263,6 +261,7 @@ public class DirectUtils {
// 6、发送消息,并指定 mandatory 参数为true
channel.basicPublish(xmlData.getExchangeName(), xmlData.getRoutingKeyName(), true, properties,
xmlData.getSendContent().getBytes());
log.info("消息生产者,目标交换机:{};路由键:{};发送信息:{}", xmlData.getExchangeName(), xmlData.getRoutingKeyName(),
xmlData.getSendContent().getBytes());
// 7、添加一个异步 confirm 确认监听,用于发送消息到Broker端之后,回送消息的监听
... ...
... ... @@ -24,7 +24,7 @@ public class ResultJson<T> implements Serializable {
/**
* 响应消息
*/
private String msg = "";
private String msg;
/**
* 错误消息内容
... ... @@ -103,7 +103,10 @@ public class ResultJson<T> implements Serializable {
{
return new ResultJson<>("200", message, data);
}
public static ResultJson success(CustomExceptionType customExceptionType)
{
return new ResultJson<>(customExceptionType.getCode(), customExceptionType.getMsg());
}
/**
* 请求出现异常时的响应数据封装
*
... ...
package com.sunyo.wlpt.message.bus.service.service;
import com.sunyo.wlpt.message.bus.service.domain.XmlData;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @author 子诚
* Description:这个Service,用来保存异步任务;
* 注意点:异步任务需要单独放在一个类中
* 时间:2020/7/30 15:59
*/
@Component
public class AsyncTaskService {
@Resource
private MessageNoteService messageNoteService;
@Async
public void saveMessage(XmlData sentData)
{
// 无论消息是否发送成功,将消息存储于数据库
messageNoteService.insertMessageSelective(sentData);
}
}
... ...
... ... @@ -2,6 +2,7 @@ package com.sunyo.wlpt.message.bus.service.service;
import com.github.pagehelper.PageInfo;
import com.sunyo.wlpt.message.bus.service.domain.MessageNote;
import com.sunyo.wlpt.message.bus.service.domain.XmlData;
/**
* @author 子诚
... ... @@ -74,6 +75,15 @@ public interface MessageNoteService {
* @param deleteTime 删除的时间
*/
void autoDelete(Integer deleteTime);
/**
* 存储MQ发送的消息于数据库
*
* @param xmlData {@link XmlData}
* @return 插入数量
*/
int insertMessageSelective(XmlData xmlData);
}
... ...
... ... @@ -124,6 +124,34 @@ public class MessageNoteServiceImpl implements MessageNoteService {
}
@Override
public int insertMessageSelective(XmlData xmlData)
{
String description = "序列-->" + xmlData.getSequence() + "; token-->" + xmlData.getToken();
MessageNote messageNote = MessageNote.builder()
.id(IdUtils.generateId())
// 发送者
.username(xmlData.getSender())
// 服务器
.serverName(xmlData.getServerName())
// 虚拟主机
.virtualHostName(xmlData.getVirtualHostName())
// 交换机
.exchangeName(xmlData.getExchangeName())
// 路由键
.routingKeyName(xmlData.getRoutingKeyName())
// 发送时间
.sendTime(xmlData.getSendDateTime())
// 消息内容
.sendContent(xmlData.getSendContent().getBytes())
// 描述:序列+token,or 自我描述
.description(description)
.build();
int num = messageNoteMapper.insertSelective(messageNote);
return num;
}
/**
* 填充名称(使用get方法,如果不存在就会报空指针异常)
*
... ...