作者 王勇

添加个人rabbitmq,尝试封装

@@ -16,11 +16,15 @@ spring: @@ -16,11 +16,15 @@ spring:
16 16
17 # rabbitmq配置 17 # rabbitmq配置
18 rabbitmq: 18 rabbitmq:
19 - host: 192.168.1.63 19 + host: 192.168.37.137
20 port: 5672 20 port: 5672
21 - username: mrz  
22 - password: vmvnv1v2  
23 - 21 + username: rabbit
  22 + password: 123456
  23 + virtual-host: V_zicheng
  24 +# host: 192.168.1.63
  25 +# port: 5672
  26 +# username: mrz
  27 +# password: vmvnv1v2
24 # 多环境配置 28 # 多环境配置
25 profiles: 29 profiles:
26 active: dev 30 active: dev
  1 +package com.sunyo.wlpt.message.bus.service.controller;
  2 +
  3 +import org.springframework.web.bind.annotation.CrossOrigin;
  4 +import org.springframework.web.bind.annotation.RequestMapping;
  5 +import org.springframework.web.bind.annotation.RestController;
  6 +
  7 +/**
  8 + * @author 子诚
  9 + * Description:
  10 + * 时间:2020/7/16 14:46
  11 + */
  12 +@CrossOrigin
  13 +@RequestMapping("bus/rabbit")
  14 +@RestController
  15 +public class RabbitController {
  16 +
  17 +
  18 +}
  1 +package com.sunyo.wlpt.message.bus.service.rabbit;
  2 +
  3 +import org.springframework.amqp.core.Binding;
  4 +import org.springframework.amqp.core.BindingBuilder;
  5 +import org.springframework.amqp.core.DirectExchange;
  6 +import org.springframework.amqp.core.Queue;
  7 +import org.springframework.context.annotation.Bean;
  8 +import org.springframework.context.annotation.Configuration;
  9 +
  10 +/**
  11 + * @author 子诚
  12 + * Description:交换机类型是direct(直连)的rabbit的配置文件
  13 + * 时间:2020/7/16 16:20
  14 + */
  15 +@Configuration
  16 +public class DirectExchangeRabbitConfig {
  17 + // 定义直连交换机
  18 + public static final String DIRECT_EXCHANGE_NAME = "E_direct";
  19 +
  20 + private static final String queue4BindingKey1 = "big";
  21 + private static final String queue4BindingKey2 = "small";
  22 + private static final String queue5BindingKey = "cat";
  23 +
  24 + // 声明直连交换机
  25 + @Bean
  26 + public DirectExchange directExchange() {
  27 + return new DirectExchange(DIRECT_EXCHANGE_NAME);
  28 + }
  29 +
  30 + // 声明消息队列
  31 + @Bean
  32 + public Queue messageQueue4() {
  33 + return new Queue("queue4");
  34 +
  35 + }
  36 +
  37 + @Bean
  38 + public Queue messageQueue5() {
  39 + return new Queue("queue5");
  40 + }
  41 +
  42 + // 向直连交换机上绑定队列
  43 + @Bean
  44 + Binding bindingQueue4Exchange1(Queue messageQueue4, DirectExchange directExchange) {
  45 + return BindingBuilder.bind( messageQueue4 )
  46 + .to( directExchange )
  47 + .with( queue4BindingKey1 );
  48 + }
  49 +
  50 + @Bean
  51 + Binding bindingQueue4Exchange2(Queue messageQueue4, DirectExchange directExchange) {
  52 + return BindingBuilder.bind( messageQueue4 )
  53 + .to( directExchange )
  54 + .with( queue4BindingKey2 );
  55 + }
  56 +
  57 + @Bean
  58 + Binding bindingQueue5Exchange(Queue messageQueue5, DirectExchange directExchange) {
  59 + return BindingBuilder.bind( messageQueue5 )
  60 + .to( directExchange )
  61 + .with( queue5BindingKey );
  62 + }
  63 +}
  1 +package com.sunyo.wlpt.message.bus.service.rabbit;
  2 +
  3 +import com.sunyo.wlpt.message.bus.service.domain.BusExchange;
  4 +import com.sunyo.wlpt.message.bus.service.domain.BusQueue;
  5 +import com.sunyo.wlpt.message.bus.service.domain.UserMessageBinding;
  6 +import lombok.extern.slf4j.Slf4j;
  7 +import org.springframework.amqp.core.*;
  8 +import org.springframework.amqp.rabbit.core.RabbitTemplate;
  9 +import org.springframework.beans.factory.annotation.Value;
  10 +import org.springframework.cloud.context.config.annotation.RefreshScope;
  11 +import org.springframework.stereotype.Component;
  12 +
  13 +import javax.annotation.Resource;
  14 +
  15 +/**
  16 + * @author 子诚
  17 + * Description:
  18 + * 时间:2020/7/16 16:32
  19 + */
  20 +@Slf4j
  21 +@RefreshScope
  22 +@Component
  23 +public class DirectRabbitUtils {
  24 + @Resource
  25 + private AmqpAdmin amqpAdmin;
  26 +
  27 + @Resource
  28 + private RabbitTemplate rabbitTemplate;
  29 +
  30 + @Value("${spring.rabbitmq.virtual-host}")
  31 + private String v_host;
  32 +
  33 + /**
  34 + * 创建交换机(交换机名称,是否持久化,是否删除)
  35 + *
  36 + * @param busExchange {@link BusExchange}
  37 + */
  38 + public void createExchange(BusExchange busExchange) {
  39 +
  40 + // 类型-直连路由
  41 + String type_direct = "direct";
  42 + // 类型-动态路由
  43 + String type_topic = "topic";
  44 + // 类型-广播
  45 + String type_fanout = "fanout";
  46 + // 类型-头部
  47 + String type_headers = "headers";
  48 +
  49 + // 创建交换机,直连接类型
  50 + if (type_direct.equals(busExchange.getExchangeType())) {
  51 + amqpAdmin.declareExchange(
  52 + new DirectExchange(busExchange.getExchangeName(), busExchange.getDurability(), busExchange.getAutoDelete())
  53 + );
  54 + log.info("创建了交换机:{};类型:{};", busExchange.getExchangeType(), type_direct);
  55 + }
  56 + // 创建交换机,扇形交换机
  57 + if (type_topic.equals(busExchange.getExchangeType())) {
  58 + amqpAdmin.declareExchange(
  59 + new TopicExchange(busExchange.getExchangeName(), busExchange.getDurability(), busExchange.getAutoDelete())
  60 + );
  61 + log.info("创建了交换机:{};类型:{};", busExchange.getExchangeType(), type_topic);
  62 + }
  63 + // 创建交换机,广播(主题)交换机
  64 + if (type_fanout.equals(busExchange.getExchangeType())) {
  65 + amqpAdmin.declareExchange(
  66 + new FanoutExchange(busExchange.getExchangeName(), busExchange.getDurability(), busExchange.getAutoDelete())
  67 + );
  68 + log.info("创建了交换机:{};类型:{};", busExchange.getExchangeType(), type_fanout);
  69 + }
  70 + // 创建交换机,首部交换机
  71 + if (type_headers.equals(busExchange.getExchangeType())) {
  72 + amqpAdmin.declareExchange(
  73 + new HeadersExchange(busExchange.getExchangeName(), busExchange.getDurability(), busExchange.getAutoDelete())
  74 + );
  75 + log.info("创建了交换机:{};类型:{};", busExchange.getExchangeType(), type_headers);
  76 + }
  77 + }
  78 +
  79 + /**
  80 + * 根据交换机名称,删除虚拟机
  81 + *
  82 + * @param exchangeName 交换机名称
  83 + */
  84 + public void deleteExchange(String exchangeName) {
  85 + boolean flag = amqpAdmin.deleteExchange(exchangeName);
  86 + }
  87 +
  88 + /**
  89 + * 创建队列
  90 + *
  91 + * @param busQueue {@link BusQueue}
  92 + */
  93 + public void createQueue(BusQueue busQueue) {
  94 + amqpAdmin.declareQueue(
  95 + new Queue(busQueue.getQueueName(), busQueue.getDurability(), false, busQueue.getAutoDelete())
  96 + );
  97 + }
  98 +
  99 + /**
  100 + * 删除队列,根据队列名称
  101 + *
  102 + * @param queueName 队列名称
  103 + */
  104 + public void deleteQueue(String queueName) {
  105 + boolean flag = amqpAdmin.deleteQueue(queueName);
  106 + }
  107 +
  108 + public void createBing(UserMessageBinding userMessageBinding) {
  109 + amqpAdmin.declareBinding(
  110 + new Binding(userMessageBinding.getQueueName(), Binding.DestinationType.QUEUE, userMessageBinding.getExchangeName(), userMessageBinding.getRoutingKeyName(), null)
  111 + );
  112 + }
  113 +}
1 -package com.sunyo.wlpt.message.bus.service.rabbit; 1 +package com.sunyo.wlpt.message.bus.service.utils;
2 2
3 import com.rabbitmq.client.Channel; 3 import com.rabbitmq.client.Channel;
4 import com.rabbitmq.client.Connection; 4 import com.rabbitmq.client.Connection;