RabbitMQ.java
3.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package com.tianbo.analysis.model;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.tianbo.analysis.handle.RabbitGetMessage;
import com.tianbo.util.RabitMq.ConnectionUtil;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
@Slf4j
public class RabbitMQ {
public static String status = "not runing";
private String queueName;
private String exchangeName;
private String routingKey;
private String mqIp;
private int mqPort;
private String mqVhost;
private String mqUsername;
private String mqPassword;
public String getQueueName() {
return queueName;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
public String getExchangeName() {
return exchangeName;
}
public void setExchangeName(String exchangeName) {
this.exchangeName = exchangeName;
}
public String getRoutingKey() {
return routingKey;
}
public void setRoutingKey(String routingKey) {
this.routingKey = routingKey;
}
public String getMqIp() {
return mqIp;
}
public void setMqIp(String mqIp) {
this.mqIp = mqIp;
}
public int getMqPort() {
return mqPort;
}
public void setMqPort(int mqPort) {
this.mqPort = mqPort;
}
public String getMqVhost() {
return mqVhost;
}
public void setMqVhost(String mqVhost) {
this.mqVhost = mqVhost;
}
public String getMqUsername() {
return mqUsername;
}
public void setMqUsername(String mqUsername) {
this.mqUsername = mqUsername;
}
public String getMqPassword() {
return mqPassword;
}
public void setMqPassword(String mqPassword) {
this.mqPassword = mqPassword;
}
public RabbitMQ(String mqIp, int mqPort, String mqVhost, String mqUsername, String mqPassword, String queueName) {
this.mqIp = mqIp;
this.mqPort = mqPort;
this.mqVhost = mqVhost;
this.mqUsername = mqUsername;
this.mqPassword = mqPassword;
this.queueName = queueName;
}
public void getResponseFromMq() {
try {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection(
mqIp,
mqPort,
mqVhost,
mqUsername,
mqPassword);
// 从连接中创建通道
Channel channel = connection.createChannel();
// 声明(创建)队列
// channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true);
// channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName,exchangeName,routingKey);
channel.basicQos(15);
channel.basicConsume(queueName, false, new RabbitGetMessage(channel));
status = "runing";
}catch (IOException e){
status = "not runing";
e.printStackTrace();
log.error("MQ服务器连接失败,连接不上服务器:{},异常退出",e.toString());
}catch (TimeoutException e){
status = "not runing";
e.printStackTrace();
log.error("MQ服务器连接失败,连接超时:{},异常退出",e.toString());
}catch (Exception e){
e.printStackTrace();
status = "not runing";
}
}
}