An article about NioEventLoopGroup source code analysis

An article about NioEventLoopGroup source code analysis

[[408806]]

This article is reprinted from the WeChat public account "Source Code Apprentice", the author is Huangfu Aoaojiao. Please contact the Source Code Apprentice public account to reprint this article.

NioEventLoopGroup initialization source code

1. The process of finding source code

As we mentioned earlier, we can almost regard NioEventLoopGroup as a thread pool, which will execute tasks one by one. There are two kinds of NioEventLoopGroup we commonly use, NioEventLoopGroup(int nThreads), NioEventLoopGroup(), one is to specify the number of threads, and the other is to specify the number of threads by default! Here we analyze it with the parameterless construction as the entry point!

  1. EventLoopGroup work = new NioEventLoopGroup();
  1. public NioEventLoopGroup() {
  2. this(0);
  3. }

When we use the default amount, it will pass a 0, so let's continue!

  1. public NioEventLoopGroup( int nThreads) {
  2. this(nThreads, (Executor) null );
  3. }

Note that the parameters passed here are: 0, null

  1. public NioEventLoopGroup( int nThreads, Executor executor) {
  2. //Each group maintains a SelectorProvider which is mainly used to obtain the selector selector
  3. this(nThreads, executor, SelectorProvider.provider());
  4. }

Here, an additional SelectorProvider.provider() is passed. This method is an API provided by JDK NIO and can mainly obtain NIO selectors or Channels, as shown in the following figure:

We return to the main thread and continue with:

  1. public NioEventLoopGroup(
  2. int nThreads, Executor executor, final SelectorProvider selectorProvider) {
  3. this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
  4. }

Here, a DefaultSelectStrategy selection strategy is passed, which will be explained in detail later when explaining NioEventLoop, so I won't elaborate on it!

  1. public NioEventLoopGroup( int nThreads, Executor executor, final SelectorProvider selectorProvider,
  2. final SelectStrategyFactory selectStrategyFactory) {
  3. super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
  4. }

We will find that a rejection strategy RejectedExecutionHandlers.reject() is passed by default. What is this rejection strategy for?

  1. @Override
  2. public void rejected(Runnable task, SingleThreadEventExecutor executor) {
  3. throw new RejectedExecutionException();
  4. }

We have come to a conclusion that when certain conditions trigger this rejection strategy, it will throw a RejectedExecutionException. The specific time of triggering will be explained in detail later. Just remember it here!

Let's go back to the main line. Here we start calling the parent class. Do you remember who is the parent class of NioEventLoopGroup that we analyzed in the last class? That's right: MultithreadEventLoopGroup. We will enter MultithreadEventLoopGroup:

  1. protected MultithreadEventLoopGroup( int nThreads, Executor executor, Object... args) {
  2. //When the number of threads is 0, use the default cpu * 2
  3. super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
  4. }

Do you remember what nThreads is? Is it 0? Here is a judgment. When the number of your threads is 0, DEFAULT_EVENT_LOOP_THREADS will be used as the number of thread pools. What is DEFAULT_EVENT_LOOP_THREADS?

  1. DEFAULT_EVENT_LOOP_THREADS = Math. max (1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads" , NettyRuntime.availableProcessors() * 2));

The default is twice the CPU, so we now come to a conclusion that when we use the default NioEventLoopGroup, the system will use the system CPU core number * 2 as the number of thread pools by default!

The selectorProvider, rejection strategy, and selectStrategyFactory we passed in the previous step are encapsulated as an array and placed at args[0], args[1], and args[2]!

Let's go back to the main thread and call the parent class again, MultithreadEventExecutorGroup:

  1. protected MultithreadEventExecutorGroup( int nThreads, Executor executor, Object... args) {
  2. this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
  3. }

Note that an extra parameter is passed here again: DefaultEventExecutorChooserFactory, a selector factory, which returns a selector of type DefaultEventExecutorChooserFactory. The specific analysis will be analyzed later! Let's go back to the main line:

  1. protected MultithreadEventExecutorGroup( int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
  2. ..........Subsequent source code supplement..........
  3. }

Here we finally see a large section of code. Here is the main logic of EventLoopGroup. Let's analyze it line by line:

2. Building a thread executor

1. Source code analysis

  1. //newDefaultThreadFactory builds a thread factory
  2. if (executor == null ) {
  3. //Create and save the thread executor. The default thread pool for executing tasks is DefaultThreadFactory.
  4. executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
  5. }

Here we will determine whether the executor we passed in is empty, otherwise we will create a new one. Do we remember what the value of executor is? It is null, right? So it will enter the logic here. Let's go into the newDefaultThreadFactory source code and take a look:

newDefaultThreadFactory() main logic

  1. protected ThreadFactory newDefaultThreadFactory() {
  2. return new DefaultThreadFactory(getClass());
  3. }

As you can see, a DefaultThreadFactory, a default thread factory, is passed into the executor!

ThreadPerTaskExecutor main logic:

  1. /**
  2. * io.netty.util.concurrent.DefaultThreadFactory#newThread(java.lang.Runnable)
  3. *
  4. * Execute a task Each time a task is executed, a thread entity object is created
  5. * @param command thread
  6. */
  7. @Override
  8. public void execute (Runnable command) {
  9. //Execute the task
  10. threadFactory.newThread(command).start();
  11. }

We found that a thread factory we passed in was called here, a new thread was created and the start method was called to start it. So how was it created? We go into the newThread source code to check. Since the thread factory we use by default is DefaultThreadFactory, we will go to DefaultThreadFactory#newThread

  1. @Override
  2. public Thread newThread(Runnable r) {
  3. //Create a thread. A thread entity will be created each time a task is executed.
  4. Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
  5. try {
  6. if (t.isDaemon() != daemon) {
  7. t.setDaemon(daemon);
  8. }
  9.  
  10. if (t.getPriority() != priority) {
  11. t.setPriority(priority);
  12. }
  13. } catch (Exception ignored) {
  14. // Doesn't matter even if failed to   set .
  15. }
  16. return t;
  17. }

There are not many operations here, just encapsulating a Runnable as a Thread and returning it. Let's focus on this Thread. Is it the same as the Thread we traditionally use? Let's follow up with the newThread method and take a look:

  1. protected Thread newThread(Runnable r, String name ) {
  2. //Netty's own encapsulated thread
  3. return new FastThreadLocalThread(threadGroup, r, name );
  4. }

The logic is very simple, that is, wrap a Thread as Netty's customized FastThreadLocalThread. As for why, we will not explain it further for now, and the subsequent chapters will explain it in detail!

2. Thread Executor Summary

Here we will create a thread executor ThreadPerTaskExecutor, using the default thread factory DefaultThreadFactory. The thread executor will wrap a task as a FastThreadLocalThread object, and then call the start method to start a new thread to execute the task!

3. Create the corresponding number of executors

  1. //Create an array of executors. The number is consistent with the preset number of threads.
  2. children = new EventExecutor[nThreads];
  3.  
  4. for ( int i = 0; i < nThreads; i ++) {
  5. boolean success = false ;
  6. try {
  7. //Create an executor Start creating an executor. The executor here is probably EventLoop, which is NioEventLoop
  8. children[i] = newChild(executor, args);
  9. success = true ;
  10. } catch (Exception e) {
  11. .....Omit unnecessary code
  12. finally
  13. .....Omit unnecessary code
  14. }
  15. }

1. Source code analysis

  1. children = new EventExecutor[nThreads];

First he will create an empty EventExecutor executor array, and then iterate and fill it!

Remember what nThreads is? The default is CPU*2, so CPU*2 executors will be created here! We found that the main logic filled in the for loop is newChild, so we enter the newChild method. Here is a hint, what kind of object is the Group object we created? It is a NioEventLoopGroup object, right? So here we will enter the NioEventLoopGroup#newChild method:

  1. @Override
  2. protected EventLoop newChild(Executor executor, Object... args) throws Exception {
  3. EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null ;
  4. return new NioEventLoop(this, executor, (SelectorProvider) args[0],
  5. ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
  6. }

The length of the args we passed is 3, which has been parsed before:

args[0] is selectorProvider, args[1] is rejection strategy, args[2] is selectStrategyFactory

So queueFactory is null. Then we focus on the NioEventLoop object. It can be seen that the newChild method returns NioEventLoop, so we can preliminarily determine that the NioEventLoop object exists in the EventExecutor array! So far, we will not go into it. I will analyze the initialization source code of NioEventLoop in the next lesson. Here we can be sure of one thing. The NioEventLoop object exists in the EventExecutor array! Let's go back to the main line:

2. Executor Array Summary

After the for loop is completed, the EventExecutor[nThreads] array is filled, and each element in it is a NioEventLoop object, and each NioEventLoop object contains a ThreadPerTaskExecutor thread executor object!

4. Create an executor selector

1. Source code analysis

  1. chooser = chooserFactory.newChooser(children);

Do you remember what type chooserFactory is? It is of type DefaultEventExecutorChooserFactory. If you forget, you can look up the code in the source code or debug it!

We enter the DefaultEventExecutorChooserFactory#newChooser source code logic and pass in the array we just looped and filled:

  1. @Override
  2. public EventExecutorChooser newChooser(EventExecutor[] executors) {
  3. //Judge the power of 2 isPowerOfTwo
  4. if (isPowerOfTwo(executors.length)) {
  5. return new PowerOfTwoEventExecutorChooser(executors);
  6. } else {
  7. //Simple
  8. return new GenericEventExecutorChooser(executors);
  9. }
  10. }

As you can see, there seem to be two cases here, and different strategy objects are returned. When the length of your array is an idempotent power of 2, the PowerOfTwoEventExecutorChooser object is returned, otherwise the GenericEventExecutorChooser object is returned. Let's analyze both cases:

I. PowerOfTwoEventExecutorChooser

  1. PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
  2. this.executors = executors;
  3. }
  4.  
  5. @Override
  6. public EventExecutor next () {
  7. //2's idempotence can also achieve cyclic access
  8. //executors is the NioEventLoop array. According to the power of 2, find out what the EventLoop obtained this time is.
  9. return executors[idx.getAndIncrement() & executors.length - 1];
  10. }

The main logic of this code is to take a self-increasing CAS class and perform an & operation with the array length, which will eventually result in a loop to obtain the number:

As can be seen from the above picture, this function can realize a cyclic function of fetching numbers. Every time the tail of the array is reached, it will go back to the head and fetch again!

Code example:

  1. public   static void main(String[] args) {
  2. String[] strings = { "first" , "second" , "third" , "fourth" };
  3. AtomicInteger idx = new AtomicInteger();
  4. for ( int i = 0; i < 9; i++) {
  5. System. out .println(strings[idx.getAndIncrement() & strings.length -1]);
  6. }
  7.  
  8. }

Result Set

  1. The first one
  2. The second
  3. The third
  4. The fourth
  5. The first one
  6. The second
  7. The third
  8. The fourth
  9. The first one

II. GenericEventExecutorChooser

When the number of your threads is not a power of 2, a general selector will be used. The specific implementation source code is as follows:

  1. GenericEventExecutorChooser(EventExecutor[] executors) {
  2. this.executors = executors;
  3. }
  4.  
  5. @Override
  6. public EventExecutor next () {
  7. //Self-increment and modulus to achieve the purpose of the loop
  8. //Assuming the length of executors is 5, continuous looping will continuously produce 0 1 2 3 4 0 1 2 3 4...
  9. return executors[Math. abs (idx.getAndIncrement() % executors.length)];
  10. }

I don't need to demonstrate this code. Its function is the same as the above one. It can achieve the function of cyclically obtaining numbers.

think

Why does Netty have to be implemented in two strategy classes? Can't we just use the second one?

Netty's official requirements for performance have reached the extreme. Everyone should know that the speed of bit operations is higher than that of direct modulus operations, so Netty has made an optimization even for this point!

2. Actuator Selector Summary

From the above, we can understand that a selector will be created through a selector factory and saved in NioEvenetLoopGroup. Calling the next method of the selector will return a NioEventLoop object, which is obtained by continuous looping and obtaining NioEventLoop objects in turn. This is also the basis for NioEventLoop to SocketChannel to be one-to-many! This is all later!

NioEventLoopGroup source code summary

  1. Create a thread executor. When the execute method of the thread executor is called, a Runable object will be packaged as a Thread object, and then the Thread object will be packaged as a FastThreadLocalThread object, and then started! In short, every time the execute method is called, a new thread is created and started to execute the task!
  2. Create an executor array. The length of the array is related to the number we pass. The default is CPU*2. Then loop to fill the empty array. The element in the array is a NioEventLoop object. Each NioEventLoop will hold a reference to a thread executor!
  3. Create an executor selector. Calling the next method of the executor selector can return a NioEventLoop object. Internally, it performs a loop to obtain data. Each NioEventLoop may be obtained multiple times!

<<:  The IPv6 in-depth promotion meeting was held in Beijing. Why does China want to promote IPv6 "desperately"?

>>:  Learn InnoDB tablespace

Recommend

Seven trends revealing the future of mobile app development

【51CTO.com Quick Translation】 Undoubtedly, mobile...

Design and analysis of weak current intelligent system in intelligent building

The intelligentization of weak-current electricit...

Juniper Networks' SD-WAN as a Service Reshapes Enterprise Branch Networks

On April 9, 2019, Juniper Networks, a provider of...

Huawei: Realizing a truly bright future for the Internet of Things

Huawei has always been an active promoter and pra...

Photos: 2017 Huawei Connect Conference leaders and guests' wise words

HUAWEI CONNECT 2017 opened on September 5 at the ...

Encyclopedia | What is structured cabling?

What is structured cabling? In short, it is a sta...

How cloud services enable a 5G-driven future

As high-speed cellular networks become mainstream...

CloudCone: $16.5/year - 1GB/50GB/3TB monthly traffic/Los Angeles data center

CloudCone has released a new promotional package,...