RabbitMQ Communication Models: The Publish-Subscribe Model

Hello everyone, I am Zhibeijun.

Today we continue learning RabbitMQ and look at the publish-subscribe model, one of RabbitMQ's five communication models. More RabbitMQ tutorials will follow in this series. If this one helps you, remember to follow~

Publish-Subscribe Model

In the previous article, we briefly introduced RabbitMQ's work queue model. In this article, we will learn about the publish-subscribe model.

Publish/Subscribe model: Simply put, the producer sends each message to an exchange, and the exchange delivers a copy to every queue bound to it. Multiple consumers, each reading from its own queue, therefore receive the same messages.

The publish-subscribe model is suitable for asynchronous communication between modules.
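
To make the idea of "one message, many copies" concrete, here is a minimal in-memory sketch. It is only a toy model and does not use the RabbitMQ client: the class `FanoutSketch` and its methods `bind`, `publish`, and `messagesIn` are hypothetical names invented for this illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of a fanout exchange; NOT the RabbitMQ client API.
public class FanoutSketch {
    // queue name -> messages waiting in that queue
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    // Bind a queue to the exchange (the queue is created if it is new).
    public void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    // Fanout routing: every bound queue gets its own copy of the message.
    public void publish(String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
    }

    // Messages currently held by the named queue (empty if it is not bound).
    public List<String> messagesIn(String queueName) {
        return boundQueues.getOrDefault(queueName, new ArrayList<>());
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("queue_publish_1");
        exchange.bind("queue_publish_2");
        for (int i = 0; i < 3; i++) {
            exchange.publish("message " + i);
        }
        // Both queues hold the same three messages.
        System.out.println(exchange.messagesIn("queue_publish_1"));
        System.out.println(exchange.messagesIn("queue_publish_2"));
    }
}
```

In the real broker the copies live in separate queues as well, which is why each consumer in the demo below declares its own queue.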

Applicable scenarios

  1. Sending and recording log information
  2. Notifying services of automatic configuration updates in Spring Cloud's Config component
  3. Cache synchronization
  4. WeChat subscription accounts

Demo

Producer

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

// ConnectionUtils is a helper class that is not shown in this article.
public class Producer {
    private static final String EXCHANGE_NAME = "exchange_publish_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // Send messages to the exchange
        for (int i = 0; i < 100; i++) {
            String message = "Message " + i + " of the publish-subscribe model";
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
        }
        // Close the resources
        channel.close();
        connection.close();
    }
}

Consumer

// Each class lives in its own file; both use the imports below.
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

// Consumer 1
public class Consumer {
    private static final String QUEUE_NAME = "queue_publish_1";
    private static final String EXCHANGE_NAME = "exchange_publish_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Declare the exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // Bind the queue to the exchange
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("The message received by queue 1 is: " + new String(body, StandardCharsets.UTF_8));
            }
        };
        // autoAck = true: messages are acknowledged automatically on delivery
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}

// Consumer 2
public class Consumer2 {
    private static final String QUEUE_NAME = "queue_publish_2";
    private static final String EXCHANGE_NAME = "exchange_publish_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Declare the exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // Bind the queue to the exchange
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("The message received by queue 2 is: " + new String(body, StandardCharsets.UTF_8));
            }
        };
        // autoAck = true: messages are acknowledged automatically on delivery
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}

Test

Start the two consumers first, then start the producer.

Consumer 1 and Consumer 2 receive exactly the same messages: each consumer gets every message sent by the producer.
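
The start order matters on a first run because the queues and their bindings are created by the consumers. A message that reaches an exchange with no bound queue has nowhere to go and is discarded by default. The toy model below illustrates this. It is a sketch under assumptions, not RabbitMQ client code: `StartOrderSketch`, `droppedCount`, and `size` are hypothetical names, and the real broker does not expose a drop counter like this.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model, NOT the RabbitMQ client API: shows why the queues
// should be bound before the producer publishes.
public class StartOrderSketch {
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();
    private int dropped = 0; // hypothetical counter, for illustration only

    // Corresponds to a consumer starting up and binding its queue.
    public void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    // With no bound queue the message is discarded; otherwise it is copied
    // to every bound queue.
    public void publish(String message) {
        if (boundQueues.isEmpty()) {
            dropped++;
            return;
        }
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
    }

    public int droppedCount() {
        return dropped;
    }

    // Number of messages waiting in the named queue (0 if it is not bound).
    public int size(String queueName) {
        List<String> queue = boundQueues.get(queueName);
        return queue == null ? 0 : queue.size();
    }

    public static void main(String[] args) {
        StartOrderSketch exchange = new StartOrderSketch();
        exchange.publish("sent before any consumer started"); // discarded
        exchange.bind("queue_publish_1");                      // consumer 1 starts
        exchange.publish("sent after consumer 1 started");     // delivered
        System.out.println("dropped=" + exchange.droppedCount()
                + ", queue_publish_1=" + exchange.size("queue_publish_1"));
    }
}
```

Once the queues and bindings from the demo exist on the broker, later runs are less sensitive to the order, since the messages then wait in the queues until a consumer connects.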

The publish-subscribe model introduces a new component: the exchange. The parameters of the related methods are explained below:

// Declare the exchange
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

// The overload with the most parameters is:
Exchange.DeclareOk exchangeDeclare(String exchange,
        BuiltinExchangeType type,
        boolean durable,
        boolean autoDelete,
        boolean internal,
        Map<String, Object> arguments) throws IOException;

/**
 * param1: exchange, the exchange name
 * param2: type, the exchange type; passing the type name as a string has the same effect. There are 4 built-in exchange types:
 *         direct (routing mode), fanout (publish-subscribe mode),
 *         topic (topic mode, wildcard matching on the routing key),
 *         headers (routes on message header attributes instead of the routing key; not commonly used)
 * param3: durable, whether the exchange survives a broker restart; false (the default) means not durable
 * param4: autoDelete, whether the exchange is deleted automatically once the last queue is unbound from it; false (the default) means it is kept
 * param5: internal, whether the exchange is internal; if true, clients cannot publish to it directly and messages can only reach it through another exchange; false (the default) allows clients to publish to it directly
 * param6: arguments, additional properties of the exchange; null by default
 */

// Bind the queue to the exchange
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
/**
 * param1: queue, the name of the destination queue
 * param2: exchange, the name of the source exchange
 * param3: routingKey, the routing key (a fanout exchange ignores it, so just pass "")
 */
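
To show why an empty routing key is enough here, the following sketch compares how a fanout exchange and a direct exchange would pick target queues. It is a simplified model under assumptions, not the broker's implementation: `RoutingKeySketch`, `Binding`, and `route` are hypothetical names, and only the fanout and direct types are modeled.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of binding matching; NOT the RabbitMQ client API.
public class RoutingKeySketch {
    // One binding: a queue name plus the routing key it was bound with.
    static final class Binding {
        final String queue;
        final String bindingKey;

        Binding(String queue, String bindingKey) {
            this.queue = queue;
            this.bindingKey = bindingKey;
        }
    }

    // Returns the queues that would receive a message with the given routing key.
    static List<String> route(String exchangeType, List<Binding> bindings, String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Binding binding : bindings) {
            boolean matches;
            if (exchangeType.equals("fanout")) {
                matches = true;                                  // routing key is ignored
            } else if (exchangeType.equals("direct")) {
                matches = binding.bindingKey.equals(routingKey); // exact match required
            } else {
                throw new IllegalArgumentException("type not modeled in this sketch: " + exchangeType);
            }
            if (matches) {
                targets.add(binding.queue);
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        List<Binding> bindings = new ArrayList<>();
        bindings.add(new Binding("queue_publish_1", ""));
        bindings.add(new Binding("queue_publish_2", "error")); // "error" is an arbitrary example key
        System.out.println(route("fanout", bindings, "anything")); // both queues
        System.out.println(route("direct", bindings, "error"));    // only queue_publish_2
    }
}
```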

Summary

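That is all for this article. We introduced the publish-subscribe model, one of RabbitMQ's communication models: the producer publishes to a fanout exchange, every bound queue receives a copy of each message, and the model is well suited to asynchronous communication between modules.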
