rabbitmq docker,10-RabbitMQ-整合SpringBoot

 2023-10-25 阅读 27 评论 0

摘要:? RabbitMQ整個SpringBoot SpringBoot因其配置簡單、快速開發,已經成為熱門的開發之一 rabbitmq docker、? 消息中間件的工作過程可以用生產者消費者模型來表示.即,生產者不斷的向消息隊列發送信息 而消費者從消息隊列中消費信息.具體過程如下: rabbitmq部署。 ? 從上圖

?

RabbitMQ整個SpringBoot

SpringBoot因其配置簡單、快速開發,已經成為熱門的開發之一

rabbitmq docker、?

消息中間件的工作過程可以用生產者消費者模型來表示.即,生產者不斷的向消息隊列發送信息

而消費者從消息隊列中消費信息.具體過程如下:

rabbitmq部署。

?

從上圖可看出,對于消息隊列來說,生產者,消息隊列,消費者是最重要的三個概念

生產者發消息到消息隊列中去,消費者監聽指定的消息隊列,并且當消息隊列收到消息之后,

接收消息隊列傳來的消息,并且給予相應的處理.消息隊列常用于分布式系統之間互相信息的傳遞.

?

?

?

使用SpringBoot進行整合RabbitMQ

1.pom文件的引入

這是操作RabbitMQ的starter必須要進行引入的

     <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

?

2.配置文件進行基礎的配置

spring.rabbitmq.virtual-host=/user
spring.rabbitmq.port=5672
spring.rabbitmq.password=user
spring.rabbitmq.username=user
spring.rabbitmq.host=192.168.43.157

?

?

RabbitMQ的模式

1、direct模式

配置Queue(消息隊列).那注意由于采用的是Direct模式,需要在配置Queue的時候,指定一個鍵

使其和交換機綁定.

DirectQueue.java
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DirectQueue {//若隊列不存在則進行創建隊列
   //返回的是隊列名字@Bean
public Queue queue(){return new Queue("direct_queue");} }

?

消息生產者

Sender.java 

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class Sender {@Autowiredprivate AmqpTemplate amqpTemplate;public void send(){String msg = "direct_queue";User user = new User();user.setName("MrChegns");user.setAge(12);amqpTemplate.convertAndSend("direct_queue",user);}}

?

此時發送的消息是一個User類型的對象

對于發送對象需要實現序列化接口

User.java 
package com.cr.rabbitmqs.direct;
import java.io.Serializable;
public class User implements Serializable {private String name;private int age;public String getName() {return name;}public void setName(String name) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}public User(String name, int age) {this.name = name;this.age = age;}public User() {}@Overridepublic String toString() {return "User{" +"name='" + name + '\'' +", age=" + age +'}';}
}

?

消費者

Receive.java 

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class Receive {//對隊列進行監聽
   //同時可以監聽多個隊列 @RabbitListener(queues
= "direct_queue")public void listen(User msg){System.out.println(msg);} }

?

測試:

 @Autowiredprivate  Sender sender;@Testpublic void test1(){sender.send();}

?

得到的結果i:

?

?

?

?2、topic模式

?首先我們看發送端,我們需要配置隊列Queue,再配置交換機(Exchange)

再把隊列按照相應的規則綁定到交換機上

Topic.java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class Topic {//創建隊列@Bean(name = "message")public Queue Aqueue(){return  new Queue("message.topic");}@Bean(name = "message1")public Queue BQueue(){return  new Queue("message.topics");}//交換機//若不存在則進行創建交換機
    @Beanpublic TopicExchange exchange(){return new TopicExchange("topic_exchange");}//交換機和隊列進行綁定@BeanBinding bindingExchangeTopic(@Qualifier("message")Queue message,TopicExchange exchange){return BindingBuilder.bind(message).to(exchange).with("message.topic");}@BeanBinding bindingExchangeTopics(@Qualifier("message1")Queue message,TopicExchange exchange){return BindingBuilder.bind(message).to(exchange).with("message.#");}
}

?

?消費者

Receive1.java 
import com.cr.rabbitmqs.direct.User;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class Receive1 {
@RabbitListener(queues = "message.topic")public void tes(User user){System.out.println( "user1111:" + user);}
}

?

Receive2.java 
import com.cr.rabbitmqs.direct.User;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class Receive2 {  @RabbitListener(queues = "message.topics")public void tes(User user){System.out.println("user222:" + user);}
}

?

消息生產者:

TopicSend.java 
import com.cr.rabbitmqs.direct.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class TopicSend {@Autowiredprivate AmqpTemplate amqpTemplate;//發送消息public void send(){User user = new User("name",12);amqpTemplate.convertSendAndReceive("topic_exchange","message.dev",user);}//發送消息public void send1(){
   User user = new User("name",12);
   amqpTemplate.convertSendAndReceive("topic_exchange","message.topic",user ); 
} 
}

?

在開發中這種模式的使用還是相對比較多的,此時測試的是兩種方法

一個方法所有的隊列都可以進行獲取

一個方法只有一個隊列可以獲取到消息

?

?測試:

    @Autowiredprivate TopicSend topicSend;@Testpublic  void ttt(){topicSend.send();}

?

?測試:

    @Autowiredprivate TopicSend topicSend;@Testpublic  void ttt(){topicSend.send1();}

?

后臺查看交換機和隊列的綁定關系以機相關的路由鍵

?

?

?3、fanout

?那前面已經介紹過了,Fanout Exchange形式又叫廣播形式,因此我們發送到路由器的消息會使

得綁定到該路由器的每一個Queue接收到消息,這個時候就算指定了Key,或者規則(即上文中

convertAndSend方法的參數2),也會被忽略!那么直接上代碼,發送端配置如下:

Fanout.java

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class Fanout {//隊列//如果隊列不存在會自動創建隊列@Beanpublic Queue queueA(){return new Queue("queueA");}
@Beanpublic Queue queueB(){return new Queue("queueB");}@Beanpublic Queue queueC(){return new Queue("queueC");}//交換機//如果交換機不存在會自動創建隊列
    @Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("fanoutExchange");}//將交換機和隊列進行綁定
    @BeanBinding bindingExchangequeueA(Queue queueA,FanoutExchange fanoutExchange){return BindingBuilder.bind(queueA).to(fanoutExchange);}@BeanBinding bindingExchangequeueB(Queue queueB,FanoutExchange fanoutExchange){return BindingBuilder.bind(queueB).to(fanoutExchange);}@BeanBinding bindingExchangequeueC(Queue queueC,FanoutExchange fanoutExchange){return BindingBuilder.bind(queueC).to(fanoutExchange);}
}

?

?消費者:

FanoutReceive.java

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
//監聽器
@RabbitListener(queues = "queueA")
public class FanoutReceive {//監聽的方法@RabbitHandlerpublic void listen(String  msg){System.out.println("queueA" + msg);}}

?

?

FanoutSender.java?

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class FanoutSender {    @Autowiredprivate AmqpTemplate amqpTemplate;//發送消息public void send(){String  msg = "test fanout....";//發送消息:參數依次是  交換機名字--路由鍵(此時設置路由鍵沒有作用)--消息amqpTemplate.convertAndSend("fanoutExchange","",msg);}
}

?

測試:

@RunWith(SpringRunner.class)
@SpringBootTest
public class BpptandrabbitmqApplicationTests {//測試fanout
    @Autowiredprivate FanoutSender fanoutSender;@Testpublic void fanout() {fanoutSender.send();}}

?

?此時3個隊列都能接收到消息

?

交換機、隊列以及路由鍵

?

?

轉載于:https://www.cnblogs.com/Mrchengs/p/10539003.html

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://hbdhgg.com/4/164079.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 匯編語言學習筆記 Inc. 保留所有权利。

底部版权信息