RabbitMQ communication models: the Work model

Hello everyone, I am Zhibeijun.

Today we continue learning RabbitMQ by looking at the Work model, one of RabbitMQ's five communication models. More RabbitMQ tutorials will follow in this series. If this helps you, remember to follow along~

Review

The previous article briefly introduced RabbitMQ, its installation, and a hello world example.

Some readers left messages saying they could not understand the method parameters, so the basic ones are explained here.

// Declare the queue
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/**
 * param1: queue, the queue name
 * param2: durable, whether the queue itself survives a broker restart.
 *         false: the queue (and the messages in it) is gone after a restart.
 *         true: the queue is still there after a restart; its messages survive only if they were also published as persistent.
 * param3: exclusive, whether the queue belongs exclusively to the connection that declared it. If true:
 *         1: other connections cannot use the queue
 *         2: the queue is deleted automatically when that connection closes
 * param4: autoDelete, whether the queue is deleted automatically once it is no longer in use,
 *         that is, when its last consumer unsubscribes (false: not deleted)
 * param5: arguments (Map), optional extra arguments for the queue, such as a message TTL
 */
// Send a message to the queue
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, "First queue message...".getBytes());
/**
 * param1: exchange, the exchange to publish to. "" means the default exchange.
 * param2: routingKey. With the default exchange, the routing key is the name of the target queue.
 * param3: props (BasicProperties), message properties sent along with the body; pass null to send none.
 *         MessageProperties.PERSISTENT_TEXT_PLAIN sets the content type to text/plain and marks the message as persistent (deliveryMode 2).
 * param4: body, the message data as a byte array
 */

Work Model

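The Work model is also called a work queue or the competing consumers pattern. Several consumers listen on the same queue, and each message is delivered to only one of them, so the messages consumed by all consumers together add up to exactly the messages in the queue. Because the queue buffers bursts while the consumers work through them at their own pace, it is well suited to traffic peak shaving.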
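To make the "each message goes to only one consumer" idea concrete before touching RabbitMQ, here is a minimal in-memory sketch. It is only an analogy: a java.util.concurrent BlockingQueue stands in for the RabbitMQ queue, and the class name CompetingConsumersSketch and the method consume are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory analogy only: a BlockingQueue stands in for the RabbitMQ queue.
class CompetingConsumersSketch {

    /** Drains one shared queue with several threads and returns how many messages each thread handled. */
    static int[] consume(int messageCount, int consumerCount) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < messageCount; i++) {
            queue.add("work model: " + i);
        }

        int[] handled = new int[consumerCount];
        List<Thread> workers = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            final int id = c;
            Thread worker = new Thread(() -> {
                // poll() hands each message to exactly one caller; null means the queue is empty
                while (queue.poll() != null) {
                    handled[id]++;
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for consumers", e);
            }
        }
        return handled;
    }

    public static void main(String[] args) {
        int[] handled = consume(100, 2);
        System.out.println("Consumer 1 handled " + handled[0] + ", Consumer 2 handled " + handled[1]
                + ", total " + (handled[0] + handled[1]));
    }
}
```

How the messages are split between the two threads varies from run to run, but the total always equals the number of messages put into the queue.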

Demo

Let's write a simple test:

Producer

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    private static final String QUEUE_NAME = "queue_work_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        // ConnectionUtils is the connection helper class from the previous article
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Publish 100 messages to the default exchange, routed by queue name
        for (int i = 0; i < 100; i++) {
            channel.basicPublish("", QUEUE_NAME, null, ("work model: " + i).getBytes());
        }
        channel.close();
        connection.close();
    }
}

Consumer

// Consumer 1
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    private static final String QUEUE_NAME = "queue_work_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // channel.basicQos(0, 1, false);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(System.currentTimeMillis() + " Consumer 1 received the message: " + new String(body));
                // Manual ack: tell the broker this message has been handled
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // Second argument is autoAck; false means we acknowledge manually
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
    }
}
// Consumer 2
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer2 {
    private static final String QUEUE_NAME = "queue_work_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // channel.basicQos(0, 1, false);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(System.currentTimeMillis() + " Consumer 2 received the message: " + new String(body));
                // A delay is added here to stand for the processing time of the business logic
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // Ack only after processing, so the broker does not push the next message while we are still busy
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
    }
}

Result


As you can see, the 100 messages are split evenly between the two consumers, because by default RabbitMQ hands messages to consumers in round-robin order regardless of how fast each one works. Consumer 1 finishes almost instantly, while Consumer 2 works through its share slowly. Consumer 1 then sits idle for a long time while Consumer 2 stays busy, which is clearly not what we want in real development.
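The even split can be reproduced with a few lines of plain Java. This is only a sketch of round-robin dispatch, not RabbitMQ code: the class RoundRobinSketch, its methods, and the per-message costs of 1 ms and 200 ms are assumptions chosen to mirror the fast and slow consumer in the demo.

```java
// Simulation only: models round-robin dispatch, does not talk to RabbitMQ.
class RoundRobinSketch {

    /** Message i goes to consumer i % consumerCount, whatever that consumer's speed. */
    static int[] dispatch(int messageCount, int consumerCount) {
        int[] counts = new int[consumerCount];
        for (int i = 0; i < messageCount; i++) {
            counts[i % consumerCount]++;
        }
        return counts;
    }

    /** Time each consumer needs for its share, given an assumed per-message cost in milliseconds. */
    static long[] busyMillis(int[] counts, long[] costMillis) {
        long[] busy = new long[counts.length];
        for (int c = 0; c < counts.length; c++) {
            busy[c] = counts[c] * costMillis[c];
        }
        return busy;
    }

    public static void main(String[] args) {
        long[] costMillis = {1, 200}; // assumed costs: Consumer 1 is fast, Consumer 2 is slow
        int[] counts = dispatch(100, costMillis.length);
        long[] busy = busyMillis(counts, costMillis);
        for (int c = 0; c < counts.length; c++) {
            System.out.println("Consumer " + (c + 1) + ": " + counts[c] + " messages, busy for " + busy[c] + " ms");
        }
    }
}
```

Under these assumed costs both consumers get 50 messages, but the slow one needs 10000 ms for its half while the fast one is done after 50 ms.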

We should follow one principle: those who are capable should do more work. Whoever consumes faster should consume more.

Now uncomment the line // channel.basicQos(0, 1, false); in the code of Consumers 1 and 2 and run again.

This time the result is much closer to that principle: the faster consumer handles most of the messages. Although you do more work, your salary stays the same.

One of the key methods in the Work model is basicQos(). Its parameters are explained here as well:

// Limit how many unacknowledged messages are pushed to a consumer
channel.basicQos(0, 1, false);
/**
 * param1: prefetchSize, the maximum total size (in bytes) of message content the server will deliver ahead of acknowledgement; 0 means no limit
 * param2: prefetchCount, tells RabbitMQ not to deliver more than N unacknowledged messages to a consumer at a time; more are sent only after earlier ones are acked
 * param3: global, false applies the limit to each consumer on the channel separately; true makes it one limit shared by the whole channel
 * Note: the limit only takes effect with manual acknowledgement (autoAck = false in basicConsume)
 */
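Why a prefetch count of 1 gives "those who are capable do more work" can be seen in a small simulation. This is a sketch under assumptions, not RabbitMQ code: it assumes each consumer acks only after it finishes a message, so the next message always goes to whichever consumer becomes free first. The class FairDispatchSketch and the per-message costs are hypothetical.

```java
// Simulation only: models dispatch with prefetchCount = 1 and ack-after-processing.
class FairDispatchSketch {

    /**
     * Gives each message to the consumer that becomes free earliest
     * (ties go to the lower index) and returns the per-consumer counts.
     */
    static int[] dispatch(int messageCount, long[] costMillis) {
        int n = costMillis.length;
        long[] freeAt = new long[n]; // simulated time at which each consumer can take its next message
        int[] counts = new int[n];
        for (int i = 0; i < messageCount; i++) {
            int next = 0;
            for (int c = 1; c < n; c++) {
                if (freeAt[c] < freeAt[next]) {
                    next = c;
                }
            }
            freeAt[next] += costMillis[next]; // busy until this message is processed and acked
            counts[next]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] costMillis = {50, 200}; // assumed costs: Consumer 1 is four times faster
        int[] counts = dispatch(100, costMillis);
        for (int c = 0; c < counts.length; c++) {
            System.out.println("Consumer " + (c + 1) + " handled " + counts[c] + " messages");
        }
    }
}
```

With equal costs the split stays even; as one consumer gets slower, its share shrinks roughly in proportion to its speed.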

Summary

That's all for this article. It introduced the Work model, one of RabbitMQ's communication models, which suits scenarios such as rate limiting and peak shaving.
