Hello everyone, I am Zhibeijun. Today we continue learning RabbitMQ and look at the Work model, one of RabbitMQ's five communication models. More RabbitMQ tutorials will follow in this series. If this one helps you, remember to follow~

Review

The previous article briefly introduced RabbitMQ, its installation, and a hello world example. Some readers left comments saying they could not understand the method parameters, so the basic ones are explained here.

// Declare a queue
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/**
 * param1: queue, the queue name
 * param2: durable, whether the queue itself is persistent
 *         false: the queue and the messages in it are lost when the RabbitMQ server restarts
 *         true: the queue survives a restart; its messages survive only if they were also published as persistent
 * param3: exclusive, whether the queue is exclusive to the connection that declares it. true means:
 *         1: only the declaring connection may use the queue
 *         2: the queue is deleted automatically when that connection closes
 *         (false: other connections may use the queue, and it is not deleted when the connection closes)
 * param4: autoDelete, whether the queue is deleted automatically
 *         once its last consumer unsubscribes (false: not deleted)
 * param5: arguments (Map), optional extra arguments for the queue declaration
 */

// Send a message to the queue
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, "First queue message...".getBytes());
/**
 * param1: exchange; "" selects the default exchange
 * param2: routingKey; with the default exchange, the queue name is used as the routing key
 * param3: props (BasicProperties), the message properties sent along with the body
 *         MessageProperties.PERSISTENT_TEXT_PLAIN marks the message as persistent (deliveryMode 2)
 *         with content type text/plain; pass null to send no properties
 * param4: body, the message data as a byte array
 */

Work Model

The work model is also called a work queue or the competing consumers model. Several consumers listen on the same queue and each message is delivered to only one of them, so the messages consumed by all consumers together add up to exactly the messages in the queue. This makes the model suitable for traffic peak shaving.

Demo

Let's write a simple test:

Producer

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

// ConnectionUtils is the author's connection helper (not shown in this article)
public class Producer {
    private static final String QUEUE_NAME = "queue_work_1";
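    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Publish 100 messages to the default exchange, routed by queue name
        for (int i = 0; i < 100; i++) {
            channel.basicPublish("", QUEUE_NAME, null, ("work model: " + i).getBytes());
        }
        channel.close();
        connection.close();
    }
}

Consumer

// Consumer 1
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    private static final String QUEUE_NAME = "queue_work_1";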
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // channel.basicQos(0, 1, false);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(System.currentTimeMillis() + " Consumer 1 received the message: " + new String(body));
                // Manual acknowledgement of this single message
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // autoAck = false: messages must be acknowledged manually
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
    }
}

// Consumer 2
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer2 {
    private static final String QUEUE_NAME = "queue_work_1";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // channel.basicQos(0, 1, false);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(System.currentTimeMillis() + " Consumer 2 received the message: " + new String(body));
                // A delay is added here to represent the processing time of the business logic
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // Acknowledge only after the work is done, so an unprocessed message is not lost
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // autoAck = false: messages must be acknowledged manually
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
    }
}

Result
It can be seen that the 100 messages are split evenly between the two consumers. This is because, by default, RabbitMQ dispatches messages to consumers in round-robin order without looking at how busy each one is. Consumer 1 finishes almost instantly, while Consumer 2 works through its share slowly. Consumer 1 then sits idle for a long time, while Consumer 2 is always busy.

This is obviously not suitable for actual development. We should follow one principle: those who are capable should work more. Whoever consumes faster should consume more.

Now we uncomment the line // channel.basicQos(0, 1, false); in the code of consumers 1 and 2 and run again. The result is now more in line with the principle that those who are capable should do more work. Although you work more, your salary is the same.

One of the main methods of the work model is basicQos(). Its parameters are explained here as well:

// Set the prefetch limit (flow control)
channel.basicQos(0, 1, false);
/**
 * param1: prefetchSize, the maximum amount of content (in bytes) the server will deliver; 0 means no limit
 * param2: prefetchCount, tells RabbitMQ not to push more than N unacknowledged messages to a consumer at one time
 * param3: global, true applies the settings to the entire channel, false applies them to each consumer separately
 */

Note that the prefetch limit only takes effect with manual acknowledgement (autoAck = false), as used in the consumers above.

Summary

This article ends here. It mainly introduced the work model among the RabbitMQ communication models, which is suitable for scenarios such as rate limiting and peak shaving.
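As a supplement to the result section, the difference between the two dispatch modes can be reproduced without a broker. The sketch below is not RabbitMQ code: it is a simplified model that assumes all 100 messages are already waiting in the queue, and the class and method names (DispatchSimulation, roundRobin, prefetchOne) are made up for illustration. The 200 ms cost for Consumer 2 comes from the demo; the 1 ms cost for Consumer 1 is an assumption standing in for "almost instantly".

```java
import java.util.Arrays;

public class DispatchSimulation {

    /** Default behaviour (no prefetch limit): message i is pushed to consumer i % n. */
    public static int[] roundRobin(int messages, int consumers) {
        int[] handled = new int[consumers];
        for (int i = 0; i < messages; i++) {
            handled[i % consumers]++;
        }
        return handled;
    }

    /**
     * Simplified model of prefetchCount = 1 with manual ack: the next message
     * goes to whichever consumer becomes free first (lowest index wins a tie).
     */
    public static int[] prefetchOne(int messages, long[] costMillis) {
        int[] handled = new int[costMillis.length];
        long[] freeAt = new long[costMillis.length]; // time at which each consumer is idle again
        for (int i = 0; i < messages; i++) {
            int next = 0;
            for (int c = 1; c < freeAt.length; c++) {
                if (freeAt[c] < freeAt[next]) {
                    next = c;
                }
            }
            handled[next]++;
            freeAt[next] += costMillis[next]; // busy until this message is acknowledged
        }
        return handled;
    }

    public static void main(String[] args) {
        long[] cost = {1, 200}; // assumed per-message cost of Consumer 1 and Consumer 2
        System.out.println("round robin: " + Arrays.toString(roundRobin(100, cost.length)));
        System.out.println("prefetch 1:  " + Arrays.toString(prefetchOne(100, cost)));
    }
}
```

Under these assumptions the program prints round robin: [50, 50] and prefetch 1: [99, 1]. A real broker will not split the messages exactly like this, since network latency and timing play a part, but the tendency is the same as in the demo: with a prefetch limit of 1, the fast consumer ends up doing most of the work.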