作者 王勇

初步完成发送消息

... ... @@ -76,7 +76,7 @@ public class RabbitController {
@RequestParam(value = "SEQN", required = false) String SEQN,
@RequestParam(value = "VSHT") String VSHT,
@RequestParam(value = "SERV") String SERV,
@RequestParam(value = "content") String content)
@RequestParam(value = "content") String content) throws Exception
{
// 1、获取数据
XmlData xmlData = XmlData.builder()
... ... @@ -103,8 +103,8 @@ public class RabbitController {
return ResultJson.error(CustomExceptionType.BINDING_ERROR);
}
// 4、mq发送消息,数据库中保存消息
ResultJson result = directUtils.sendMessage();
ResultJson result = directUtils.sendMessage(sentData);
return ResultJson.success("发送成功", sentData);
return result;
}
}
... ...
... ... @@ -105,7 +105,7 @@ public class XmlData implements Serializable {
private String password;
private String hostIp;
private String serverIp;
private Integer hostPort;
private Integer serverPort;
}
... ...
... ... @@ -7,8 +7,11 @@ package com.sunyo.wlpt.message.bus.service.exception;
*/
public enum CustomExceptionType {
MESSAGE_SUCCESS("10200", "消息发送成功"),
BINDING_ERROR("10501", "配置信息,未进行绑定!"),
SENDER_ERROR("10401", "报文格式错误,发送者不能为空!"),
CONTENT_ERROR("10402", "报文格式错误,消息内容不能为空!"),
SERVER_ERROR("10403", "报文格式错误,服务器名称不能为空!"),
... ...
package com.sunyo.wlpt.message.bus.service.rabbit.utils;
import com.rabbitmq.client.*;
import com.sunyo.wlpt.message.bus.service.domain.XmlData;
import com.sunyo.wlpt.message.bus.service.exception.CustomExceptionType;
import com.sunyo.wlpt.message.bus.service.response.ResultJson;
import com.sunyo.wlpt.message.bus.service.utils.IdUtils;
import lombok.extern.slf4j.Slf4j;
... ... @@ -222,9 +224,66 @@ public class DirectUtils {
}
public ResultJson sendMessage()
public ResultJson sendMessage(XmlData xmlData) throws Exception
{
return new ResultJson<>();
/**
* 可以在这里根据类型的不同,进行不同的消息发送
*/
return directProducer(xmlData);
}
public ResultJson directProducer(XmlData xmlData) throws Exception
{
// 1、创建ConnectionFactory
// (String hostIp, int hostPort, String vHostName, String userName, String password) throws Exception
//
Connection connection = getConnection(xmlData.getServerIp(), xmlData.getServerPort(),
xmlData.getVirtualHostName(), xmlData.getSender(), xmlData.getPassword());
// 2、 通过Connection创建一个新的Channel
Channel channel = connection.createChannel();
// 3、开启消息的确认机制(confirm:保证消息能够发送到 exchange)
channel.confirmSelect();
// 4、避免消息被重复消费
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
// 指定消息是否需要持久化,1:需要持久化;2:不需要持久化
.deliveryMode(1)
// 设置全局唯一消息机制id(雪花id)
.messageId(IdUtils.generateId())
.build();
// 5、开启 return 机制(保证消息,从 Exchange 分发到 Queue )
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException
{
// 当消息没有从 Exchange 分发到 Queue 时,才会执行
log.error(new String(body, "UTF8") + "->没有从 Exchange 分发到Queue中");
}
});
// 6、发送消息,并指定 mandatory 参数为true
channel.basicPublish(xmlData.getExchangeName(), xmlData.getRoutingKeyName(), true, properties,
xmlData.getSendContent().getBytes());
log.info("消息生产者,目标交换机:{};路由键:{};发送信息:{}", xmlData.getExchangeName(), xmlData.getRoutingKeyName(),
xmlData.getSendContent().getBytes());
// 7、添加一个异步 confirm 确认监听,用于发送消息到Broker端之后,回送消息的监听
channel.addConfirmListener(new ConfirmListener() {
// 发送成功
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException
{
log.info("消息发送成功,标识:{};是否是批量:{}", deliveryTag, multiple);
}
// 发送失败
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException
{
log.error("消息发送失败,标识:{};是否是批量:{}", deliveryTag, multiple);
}
});
// finally,关闭连接
closeConnectionAndChanel(channel, connection);
return ResultJson.success(CustomExceptionType.MESSAGE_SUCCESS);
}
}
... ...
... ... @@ -139,9 +139,15 @@ public class XmlUtils {
// 路由键不存在
return ResultJson.error(CustomExceptionType.ROUTING_KEY_NO_EXIST);
}
// 获取密码
xmlData.setPassword(userList.get(0).getPassword());
ResultJson<XmlData> result = new ResultJson<>("200","通过格式与数据校验",xmlData);
// 获取服务器ip
xmlData.setServerPort(serverList.get(0).getServerPort());
// 获取服务器port
xmlData.setServerIp(serverList.get(0).getServerIp());
ResultJson<XmlData> result = new ResultJson<>("200", "通过格式与数据校验", xmlData);
return result;
}
}
... ...