作者 朱兆平

增加rabitMq处理工具类

@@ -35,6 +35,19 @@ @@ -35,6 +35,19 @@
35 <artifactId>annotations</artifactId> 35 <artifactId>annotations</artifactId>
36 <version>RELEASE</version> 36 <version>RELEASE</version>
37 </dependency> 37 </dependency>
  38 +
  39 + <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
  40 + <dependency>
  41 + <groupId>com.rabbitmq</groupId>
  42 + <artifactId>amqp-client</artifactId>
  43 + <version>5.7.0</version>
  44 + </dependency>
  45 + <dependency>
  46 + <groupId>org.projectlombok</groupId>
  47 + <artifactId>lombok</artifactId>
  48 + <version>1.18.10</version>
  49 + <scope>compile</scope>
  50 + </dependency>
38 </dependencies> 51 </dependencies>
39 52
40 <build> 53 <build>
  1 +package com.tianbo.util.RabitMq;
  2 +
  3 +import com.rabbitmq.client.Connection;
  4 +import com.rabbitmq.client.ConnectionFactory;
  5 +
  6 +/**
  7 + * 建立mq链接
  8 + */
  9 +public class ConnectionUtil {
  10 + /**
  11 + * 链接MQ
  12 + * @param hostIp mq服务器Ip地址
  13 + * @param hostPort mq服务器端口号
  14 + * @param vHostName VirtualHost名称
  15 + * @param userName 登录账号
  16 + * @param password 登录密码
  17 + * @return 返回链接
  18 + * @throws Exception
  19 + */
  20 + public static Connection getConnection(String hostIp,int hostPort,String vHostName,String userName,String password) throws Exception {
  21 + //定义连接工厂
  22 + ConnectionFactory factory = new ConnectionFactory();
  23 + //设置服务地址
  24 + factory.setHost(hostIp);
  25 + //端口
  26 + factory.setPort(hostPort);
  27 + //设置账号信息,用户名、密码、vhost
  28 + factory.setVirtualHost(vHostName);
  29 + factory.setUsername(userName);
  30 + factory.setPassword(password);
  31 + // 通过工程获取连接
  32 + Connection connection = factory.newConnection();
  33 + return connection;
  34 + }
  35 +}
  1 +package com.tianbo.util.RabitMq;
  2 +
  3 +import com.rabbitmq.client.AMQP;
  4 +import com.rabbitmq.client.Channel;
  5 +import com.rabbitmq.client.Connection;
  6 +import java.nio.charset.StandardCharsets;
  7 +
  8 +/**
  9 + * 发送 [内容] 到队列
  10 + */
  11 +public class MQSendMsg {
  12 +
  13 + /**
  14 + *
  15 + * @param queueName 队列名称
  16 + * @param msg 发送内容
  17 + * @return
  18 + */
  19 + public static boolean sendMsg(String queueName,String msg,String hostIp,int hostPort,String vHostName,String userName,String password){
  20 + try{
  21 + // 获取到连接以及mq通道
  22 + Connection connection = ConnectionUtil.getConnection(hostIp,hostPort,vHostName,userName,password);
  23 + // 从连接中创建通道
  24 + Channel channel = connection.createChannel();
  25 +
  26 +// String exchangeName = "amq.topic";
  27 +// String routingKey = "consumer.send";
  28 + // 声明(创建)队列
  29 + channel.queueDeclare(queueName, true, false, false, null);
  30 +
  31 + // 消息内容
  32 + channel.basicPublish("", queueName, null, msg.getBytes(StandardCharsets.UTF_8));
  33 + System.out.println("消息发送成功>>>" + msg + "<<<");
  34 + //关闭通道和连接
  35 + channel.close();
  36 + connection.close();
  37 + }catch (Exception e){
  38 + e.printStackTrace();
  39 + return false;
  40 + }
  41 + return true;
  42 + }
  43 +}
  1 +package com.tianbo.util.RabitMq;
  2 +
  3 +import com.rabbitmq.client.*;
  4 +
  5 +import java.io.IOException;
  6 +
  7 +public class MqGetMsg extends DefaultConsumer{
  8 +
  9 + public MqGetMsg(Channel channel) {
  10 + super(channel);
  11 + }
  12 +
  13 + @Override
  14 + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  15 + //TODO someting
  16 + System.err.println("-----------consume message----------");
  17 + System.err.println("consumerTag: " + consumerTag);
  18 + System.err.println("envelope: " + envelope);
  19 + System.err.println("properties: " + properties);
  20 + System.err.println("body: " + new String(body));
  21 +
  22 + }
  23 +}
  1 +package com.tianbo.util.RabitMq;
  2 +
  3 +import com.rabbitmq.client.AMQP;
  4 +import com.rabbitmq.client.Envelope;
  5 +
  6 +
  7 +public class MqResponse {
  8 + private String consumerTag;
  9 + private Envelope envelope;
  10 + private AMQP.BasicProperties properties;
  11 + private String content;
  12 +
  13 + public MqResponse(){
  14 +
  15 + }
  16 + public MqResponse(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, String content) {
  17 + this.consumerTag = consumerTag;
  18 + this.envelope = envelope;
  19 + this.properties = properties;
  20 + this.content = content;
  21 + }
  22 +
  23 + public String getConsumerTag() {
  24 + return consumerTag;
  25 + }
  26 +
  27 + public void setConsumerTag(String consumerTag) {
  28 + this.consumerTag = consumerTag;
  29 + }
  30 +
  31 + public Envelope getEnvelope() {
  32 + return envelope;
  33 + }
  34 +
  35 + public void setEnvelope(Envelope envelope) {
  36 + this.envelope = envelope;
  37 + }
  38 +
  39 + public AMQP.BasicProperties getProperties() {
  40 + return properties;
  41 + }
  42 +
  43 + public void setProperties(AMQP.BasicProperties properties) {
  44 + this.properties = properties;
  45 + }
  46 +
  47 + public String getContent() {
  48 + return content;
  49 + }
  50 +
  51 + public void setContent(String content) {
  52 + this.content = content;
  53 + }
  54 +}