作者 王勇

简单封装rabbitMQ工具类,并测试

... ... @@ -33,6 +33,7 @@
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
<!-- SpringBoot end -->
<!-- SpringCloud start -->
... ... @@ -69,6 +70,11 @@
<!-- SpringCloud end -->
<!-- database start -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.1</version>
... ... @@ -128,6 +134,11 @@
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
... ...
package com.sunyo.wlpt.message.bus.service.controller;
import com.sunyo.wlpt.message.bus.service.rabbit.test.TestProduct;
import com.sunyo.wlpt.message.bus.service.rabbit.utils.DirectUtils;
import com.sunyo.wlpt.message.bus.service.rabbit.utils.RabbitUtils;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author 子诚
* Description:
... ... @@ -14,5 +22,27 @@ import org.springframework.web.bind.annotation.RestController;
@RestController
public class RabbitController {
@Resource
private TestProduct testProduct;
@Resource
private RabbitUtils rabbitUtils;
@Resource
private DirectUtils directUtils;
@GetMapping("/test/product")
public void product() throws IOException, TimeoutException
{
rabbitUtils.createExchange("E_zicheng");
rabbitUtils.createQueue("Q_zicheng");
rabbitUtils.createBing("E_zicheng","Q_zicheng","R_zicheng");
directUtils.directProducer("E_zicheng", "R_zicheng", "2020-7-21,进行测试->" + Math.random() * 100);
}
@GetMapping("/test/consumer")
public void consumer() throws IOException, TimeoutException
{
directUtils.directConsumer("Q_zicheng", "E_zicheng", "R_zicheng");
}
}
... ...
package com.sunyo.wlpt.message.bus.service.rabbit;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 子诚
* Description:交换机类型是direct(直连)的rabbit的配置文件
* 时间:2020/7/16 16:20
*/
@Configuration
public class DirectExchangeRabbitConfig {
// 定义直连交换机
public static final String DIRECT_EXCHANGE_NAME = "E_direct";
private static final String queue4BindingKey1 = "big";
private static final String queue4BindingKey2 = "small";
private static final String queue5BindingKey = "cat";
// 声明直连交换机
@Bean
public DirectExchange directExchange() {
return new DirectExchange(DIRECT_EXCHANGE_NAME);
}
// 声明消息队列
@Bean
public Queue messageQueue4() {
return new Queue("queue4");
}
@Bean
public Queue messageQueue5() {
return new Queue("queue5");
}
// 向直连交换机上绑定队列
@Bean
Binding bindingQueue4Exchange1(Queue messageQueue4, DirectExchange directExchange) {
return BindingBuilder.bind( messageQueue4 )
.to( directExchange )
.with( queue4BindingKey1 );
}
@Bean
Binding bindingQueue4Exchange2(Queue messageQueue4, DirectExchange directExchange) {
return BindingBuilder.bind( messageQueue4 )
.to( directExchange )
.with( queue4BindingKey2 );
}
@Bean
Binding bindingQueue5Exchange(Queue messageQueue5, DirectExchange directExchange) {
return BindingBuilder.bind( messageQueue5 )
.to( directExchange )
.with( queue5BindingKey );
}
}
package com.sunyo.wlpt.message.bus.service.rabbit.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import javax.annotation.Resource;
/**
* @author 子诚
* Description:开启消息可靠性(confirm和return机制)
* 时间:2020/7/20 9:30
*/
@Slf4j
//@Component
public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 初始化的方法,设置当前上下文(下面的两个方法),为confirm和return机制的回滚事件
*/
// @PostConstruct
public void initMethod() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
// 根据手动ack来确定
if (ack) {
log.info("消息发送至exchange,成功");
} else {
log.error("消息发送至exchange,失败");
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error("消息从exchange分发到queue,失败");
}
}
... ...
0、首先要有rabbitmq服务器以及对应的虚拟主机
1、要有交换机 exchange
2、要有队列 queue
3、要有队列、交换机、路由键的绑定关系 binding
4、才能将消息发送到 exchange,从exchange分发到queue,从queue处接收到消息,进行消息的消费(具体业务)
从exchange分发到queue,是通过 routingkey
\ No newline at end of file
... ...
package com.sunyo.wlpt.message.bus.service.rabbit.test;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* @author 子诚
* Description:测试-消息-消费者
* 时间:2020/7/20 11:09
*/
@Slf4j
@Component
public class TestConsumer {
@Autowired
private StringRedisTemplate redisTemplate;
// @RabbitListener(queues = "Q_test")
public void getMessage(String msg, Channel channel, Message message) throws IOException {
try {
// 0、获取messageId
String messageId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
// 1、设置key到redis,设置缓存时间为10秒
if (redisTemplate.opsForValue().setIfAbsent(messageId, "0", 10, TimeUnit.SECONDS)) {
// 2、消费消息
log.info("成功消费了消息:" + msg);
// 3、设置key的value为1
redisTemplate.opsForValue().set(messageId, "1", 10, TimeUnit.SECONDS);
// 4、手动ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
} else {
// 获取redis中的value,如果是1,就手动ack。如果是0,就什么也不做(是0代表着,正在被消费中)
if ("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))) {
// 手动ack,第一个参数是所确认消息的标识,第二参数是是否批量确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}
}
/**
* 设置Qos机制
* 第一个参数:单条消息的大小(0表示即无限制)
* 第二个参数:每次处理消息的数量
* 第三个参数:是否为consumer级别(false表示仅当前channel有效)
*/
// channel.basicQos(0, 1, false);
} catch (Exception e) {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
log.error("消息ID:" + message.getMessageProperties().getHeaders().get("spring_returned_message_correlation"));
log.error("接收消息发送错误:" + e.getMessage());
}
}
}
... ...
package com.sunyo.wlpt.message.bus.service.rabbit.test;
import com.sunyo.wlpt.message.bus.service.utils.IdUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
/**
* @author 子诚
* Description:测试-消息-生产者
* 时间:2020/7/20 11:08
*/
@Slf4j
@Component
public class TestProduct {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* @param exchangeName 交换机名称
* @param routingKeyName 路由键名称
* @param msg 发送的消息
*/
public void sentMessage(String exchangeName, String routingKeyName, String msg) throws IOException {
CorrelationData messageId = new CorrelationData(IdUtils.generateId());
rabbitTemplate.convertAndSend(exchangeName, routingKeyName, msg, messageId);
log.info("成功发送消息-> {};到交换机->{};路由键为 ->{};", msg, exchangeName, routingKeyName);
}
}
... ...
package com.sunyo.wlpt.message.bus.service.rabbit.utils;
import com.rabbitmq.client.*;
import com.sunyo.wlpt.message.bus.service.utils.IdUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* @author 子诚
* Description:
* 时间:2020/7/21 9:32
*/
@Slf4j
@Component
public class DirectUtils {
@Autowired
private StringRedisTemplate redisTemplate;
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String vHost;
/**
* @return 链接 rabbitmq
* @throws IOException IO异常
* @throws TimeoutException 超时异常
*/
@PostConstruct
public Connection getConnection() throws IOException, TimeoutException
{
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost(host);
//端口,amqp协议 端口 类似与mysql的3306
factory.setPort(port);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost(vHost);
factory.setUsername(username);
factory.setPassword(password);
// 通过工程获取连接
Connection connection = factory.newConnection();
return connection;
}
/**
* 链接 RabbitMQ
*
* @param hostIp mq服务器Ip地址
* @param hostPort mq服务器端口号
* @param vHostName VirtualHost名称
* @param userName 登录账号
* @param password 登录密码
* @return 返回链接
* @throws Exception
*/
public static Connection getConnection(String hostIp, int hostPort, String vHostName, String userName, String password) throws Exception
{
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost(hostIp);
//端口
factory.setPort(hostPort);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost(vHostName);
factory.setUsername(userName);
factory.setPassword(password);
// 通过工程获取连接
Connection connection = factory.newConnection();
return connection;
}
/**
* 关闭通道和关闭连接的工具方法
*
* @param channel 通道
* @param conn 连接
*/
public static void closeConnectionAndChanel(Channel channel, Connection conn)
{
try {
if (channel != null) {
channel.close();
}
if (conn != null) {
conn.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* DirectExchange的 消息生产者
*
* @param exchangeName 交换机名称
* @param routingKeyName 路由键名称
* @param msg 发送的消息
* @throws IOException
* @throws TimeoutException
*/
public void directProducer(String exchangeName, String routingKeyName, String msg) throws IOException, TimeoutException
{
// 1、创建ConnectionFactory
Connection connection = getConnection();
// 2、 通过Connection创建一个新的Channel
Channel channel = connection.createChannel();
// 3、开启消息的确认机制(confirm:保证消息能够发送到 exchange)
channel.confirmSelect();
// 4、避免消息被重复消费
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
// 指定消息是否需要持久化,1:需要持久化;2:不需要持久化
.deliveryMode(1)
// 设置全局唯一消息机制id(雪花id)
.messageId(IdUtils.generateId())
.build();
// 5、开启 return 机制(保证消息,从 Exchange 分发到 Queue )
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException
{
// 当消息没有从 Exchange 分发到 Queue 时,才会执行
log.error(new String(body, "UTF8") + "->没有从 Exchange 分发到Queue中");
}
});
// 6、发送消息,并指定 mandatory 参数为true
channel.basicPublish(exchangeName, routingKeyName, true, properties, msg.getBytes());
log.info("消息生产者,目标交换机:{};路由键:{};发送信息:{}", exchangeName, routingKeyName, msg);
// 7、添加一个异步 confirm 确认监听,用于发送消息到Broker端之后,回送消息的监听
channel.addConfirmListener(new ConfirmListener() {
// 发送成功
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException
{
log.info("消息发送成功,标识:{};是否是批量:{}", deliveryTag, multiple);
}
// 发送失败
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException
{
log.error("消息发送失败,标识:{};是否是批量:{}", deliveryTag, multiple);
}
});
// finally,关闭连接
closeConnectionAndChanel(channel, connection);
}
/**
* DirectExchange的 消息消费者
*
* @throws IOException IO异常
* @throws TimeoutException 超时异常
*/
public void directConsumer(String queueName, String exchangeName, String routingKeyName) throws IOException, TimeoutException
{
// 1、创建ConnectionFactory
Connection connection = getConnection();
// 2、 通过Connection创建一个新的Channel
Channel channel = connection.createChannel();
// 3、设置绑定关系(队列、交换机名称、路由键名称)
channel.queueBind(queueName, exchangeName, routingKeyName);
// 一次只接受一条未确认的消息
channel.basicQos(1);
// 4、开启监听Queue
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
{
try {
// 0、获取出全局唯一的 信息业务id(messageId)
String messageId = properties.getMessageId();
// 必须保证 messageId 不为空,避免空指针异常
if (redisTemplate.opsForValue().setIfAbsent(messageId, "0", 10, TimeUnit.MINUTES)) {
// 消费成功,将redis中的 messageId 对应的value修改为 1
redisTemplate.opsForValue().set(messageId, "1", 10, TimeUnit.MINUTES);
// 手动ack
channel.basicAck(envelope.getDeliveryTag(), false);
log.info("接收到消息:" + new String(body, "UTF-8"));
} else {
// 获取redis中的value,如果是1,就手动ack。如果是0,就什么也不做(是0代表着,正在被消费中)
if ("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))) {
// 手动ack
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
} catch (Exception e) {
// 手动ack
channel.basicAck(envelope.getDeliveryTag(), false);
log.error("接收消息发送错误:" + e.getMessage());
}
}
};
// 消费消息
channel.basicConsume(queueName, false, consumer);
}
}
... ...
package com.sunyo.wlpt.message.bus.service.rabbit;
package com.sunyo.wlpt.message.bus.service.rabbit.utils;
import com.sunyo.wlpt.message.bus.service.domain.BusExchange;
import com.sunyo.wlpt.message.bus.service.domain.BusQueue;
... ... @@ -6,7 +6,6 @@ import com.sunyo.wlpt.message.bus.service.domain.UserMessageBinding;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Component;
... ... @@ -20,16 +19,13 @@ import javax.annotation.Resource;
@Slf4j
@RefreshScope
@Component
public class DirectRabbitUtils {
public class RabbitUtils {
@Resource
private AmqpAdmin amqpAdmin;
@Resource
private RabbitTemplate rabbitTemplate;
@Value("${spring.rabbitmq.virtual-host}")
private String v_host;
/**
* 创建交换机(交换机名称,是否持久化,是否删除)
*
... ... @@ -77,6 +73,16 @@ public class DirectRabbitUtils {
}
/**
* 创建交换机,通过 exchangeName 创建
*/
public void createExchange(String exchangeName) {
amqpAdmin.declareExchange(
new DirectExchange(exchangeName)
);
log.info("创建了交换机:{};类型:{};", exchangeName, "DirectExchange");
}
/**
* 根据交换机名称,删除虚拟机
*
* @param exchangeName 交换机名称
... ... @@ -94,6 +100,19 @@ public class DirectRabbitUtils {
amqpAdmin.declareQueue(
new Queue(busQueue.getQueueName(), busQueue.getDurability(), false, busQueue.getAutoDelete())
);
log.info("创建了队列,队列名称->{}", busQueue.getQueueName());
}
/**
* 创建队列
*
* @param queueName 队列名称
*/
public void createQueue(String queueName) {
amqpAdmin.declareQueue(
new Queue(queueName)
);
log.info("创建了队列,队列名称->{}", queueName);
}
/**
... ... @@ -105,9 +124,29 @@ public class DirectRabbitUtils {
boolean flag = amqpAdmin.deleteQueue(queueName);
}
/**
* 创建绑定关系
*
* @param userMessageBinding {@link UserMessageBinding}
*/
public void createBing(UserMessageBinding userMessageBinding) {
amqpAdmin.declareBinding(
new Binding(userMessageBinding.getQueueName(), Binding.DestinationType.QUEUE, userMessageBinding.getExchangeName(), userMessageBinding.getRoutingKeyName(), null)
);
log.info("创建了绑定关系,交换机->{};队列->{};路由键->{}", userMessageBinding.getQueueName(), userMessageBinding.getExchangeName(), userMessageBinding.getRoutingKeyName());
}
/**
* 创建绑定关系
*
* @param exchangeName 交换机名称
* @param queueName 队列名称
* @param routingKeyName 路由键名称
*/
public void createBing(String exchangeName, String queueName, String routingKeyName) {
amqpAdmin.declareBinding(
new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKeyName, null)
);
log.info("创建了绑定关系,交换机->{};队列->{};路由键->{}", exchangeName, queueName, routingKeyName);
}
}
... ...
package com.sunyo.wlpt.message.bus.service.utils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author 子诚
* Description:RabbitMQ 的工具类
* 时间:2020/6/30 9:39
*/
public class RabbitUtils {
/**
* 链接 RabbitMQ
*
* @param hostIp mq服务器Ip地址
* @param hostPort mq服务器端口号
* @param vHostName VirtualHost名称
* @param userName 登录账号
* @param password 登录密码
* @return 返回链接
* @throws Exception
*/
public static Connection getConnection(String hostIp, int hostPort, String vHostName, String userName, String password) throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost(hostIp);
//端口
factory.setPort(hostPort);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost(vHostName);
factory.setUsername(userName);
factory.setPassword(password);
// 通过工程获取连接
Connection connection = factory.newConnection();
return connection;
}
/**
* 关闭通道和关闭连接的工具方法
*
* @param channel 通道
* @param conn 连接
*/
public static void closeConnectionAndChanel(Channel channel, Connection conn) {
try {
if (channel != null) {
channel.close();
}
if (conn != null) {
conn.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}