Flink 1.14 New Features Preview

This article was compiled by community volunteer Chen Zhengyu, based on "Flink 1.14 New Features Preview" shared by Alibaba technical expert Song Xintong (Wuzang) at the online Flink Meetup on August 7. It covers:

1. Introduction

2. Stream-Batch Integration

3. Checkpoint Mechanism

4. Performance and efficiency

5. Table / SQL / Python API

6. Summary

This article is a summary of the sharing on August 7; the latest progress of version 1.14 is explained in the notes at the end of the article.

1. Introduction

Version 1.14 originally planned 35 important new features and optimizations, of which 26 have been completed; for 5 it is uncertain whether they can be finished on time; and the remaining 4 will be deferred to subsequent versions for time or design reasons. [1]

Compared with previous versions, 1.14 contains fewer optimizations and new features. In fact, looking at the release rhythm, after one or two feature-heavy major versions a version with fewer changes is usually released, whose main purpose is to stabilize existing features.

Version 1.14 is positioned as a quality-improvement and maintenance release. Feature development is expected to stop on August 16, and the official release may come in September. If you are interested, you can follow the links below to track the progress of the release.

Wiki: https://cwiki.apache.org/confluence/display/FLINK/1.14+Release
Jira: https://issues.apache.org/jira/projects/FLINK/versions/12349614

2. Stream-Batch Integration

Stream-batch integration has in fact received continuous attention since Flink 1.9. As an important part of the community roadmap, it is an inevitable trend as big data processing moves toward real time. On the other hand, traditional offline computing needs will not be completely replaced by real-time jobs and will continue to exist for a long time.

When both real-time and offline requirements exist, the previous independent technical solutions for streaming and batch processing have some pain points, such as:

Two systems need to be maintained, which requires two groups of developers, resulting in high manpower investment costs;
In addition, maintaining two data pipelines that process similar content brings maintenance risk and redundancy;
Most importantly, if the stream and batch data are not processed by the same system, differences between the engines can lead to inconsistent data definitions, introducing errors into the business data that have a considerable impact on big data analysis.
Against this background, the Flink community has identified the unification of real-time and offline processing as an important technical trend and direction.

Over the past several versions, Flink has done a great deal of work on stream-batch integration. It can be said that Flink now truly uses the same mechanisms for streams and batches at the engine level, the API level, and the operator execution level. At the level of concrete task execution, however, there are still two different modes:

For unbounded data streams, the stream execution mode is uniformly adopted. In stream execution mode, all computing nodes are connected in a pipelined fashion: upstream and downstream tasks run at the same time, and as the upstream continuously produces data, the downstream continuously consumes it. With this fully pipelined execution, event time indicates when the data was generated, watermarks tell us up to which point in time the data has arrived, state holds the intermediate results of the computation, and checkpoints provide fault tolerance. The figure below shows the different execution modes:

For a bounded data set there are two choices: we can treat it as a bounded stream and process it in stream execution mode, or we can process it in batch execution mode. The batch execution mode also has event time, but its watermark only supports positive infinity. Because the data and the state can be sorted, batch mode has more options for task scheduling and shuffle.

The key difference between the two modes is that batch execution has an intermediate step of spilling shuffle data to disk: a downstream task is triggered only after the current task has finished, and fault tolerance relies on the persisted shuffle data.

Each mode has its own advantages. In stream execution mode there is no disk-spilling pressure; fault tolerance is based on splitting the data and recovering from continuously taken checkpoints. In batch execution mode, data must be spilled to disk after the shuffle, which puts pressure on the disk, but because the data is sorted, subsequent computation can be more efficient; tasks are executed stage by stage rather than all at once, and fault tolerance is performed per stage. Both have their pros and cons, and you can choose according to the specific scenario of the job.
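
Since Flink 1.12, the execution mode of a DataStream program can be chosen explicitly, which is how the two modes described above are selected in practice. The snippet below is a minimal sketch; the source and job name are placeholders, and the mode can also be set through the execution.runtime-mode configuration option instead of in code.

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExecutionModeDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // STREAMING: fully pipelined, with watermarks, state and checkpoints.
        // BATCH: stage-by-stage execution, sorted data, shuffle spilled to disk.
        // AUTOMATIC: let Flink choose based on whether all sources are bounded.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        env.fromElements("to", "be", "or", "not", "to", "be")
           .print();

        env.execute("execution-mode-demo");
    }
}
```
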
The optimization in Flink 1.14 mainly targets how finite data sets are processed in stream execution mode. Compared with processing an infinite data set, the biggest difference is the introduction of the notion that "tasks may finish". This raises some new problems, as shown in the figure below:

Checkpoint mechanism in stream execution mode

For an infinite stream, the checkpoint is triggered by all source nodes: the source nodes emit a Checkpoint Barrier, and as the barrier flows through the whole job, the current state of every task is snapshotted.

For a finite stream, tasks may finish early: an upstream task may complete and exit while downstream tasks are still running, and within the same stage some parallel subtasks may finish earlier than others because their data volumes differ. How should checkpoints be taken for the job in this case? In 1.14, the JobManager dynamically decides where the Checkpoint Barrier is triggered based on the current execution state of the tasks. In addition, after some tasks have finished, subsequent checkpoints only save the state of the tasks that are still running. In this way, checkpoints can continue after tasks finish, providing better fault tolerance for finite-stream execution.


Two-phase commit after task completion

When using certain sinks, such as the Kafka sink in the figure below, tasks need to rely on the checkpoint mechanism for a two-phase commit in order to guarantee exactly-once data consistency.

Specifically, during the checkpoint each operator only pre-commits: for example, data is written to an external temporary directory. Only after all tasks have completed the checkpoint do they receive a signal to perform the formal commit, transactionally committing all the distributed temporary files to the external system at once.
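
Conceptually, the two-phase commit protocol described above maps to a handful of hooks per transaction. The Java interface below is only an illustrative sketch with invented names (Flink's actual helper class, TwoPhaseCommitSinkFunction, has a very similar shape):

```java
// Hypothetical interface, illustrating the shape of a two-phase-commit sink.
// One transaction (e.g. a temp file or Kafka transaction) exists per checkpoint.
interface TwoPhaseCommittingSink<T, TXN> {
    TXN beginTransaction();        // open a new transaction / temp location
    void write(TXN txn, T record); // buffer records into the open transaction
    void preCommit(TXN txn);       // on checkpoint: flush, durable but not yet visible
    void commit(TXN txn);          // on "checkpoint complete": atomically publish
    void abort(TXN txn);           // on failure: discard the pre-committed data
}
```
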

In the finite-stream case, however, this algorithm could not guarantee that a checkpoint would still run after the job finished. So how can the last part of the data be committed?

In 1.14 this problem has been solved: after a task has processed all of its data, it waits for a checkpoint to complete before it formally exits. This is an improvement that stream-batch integration brings to the termination of finite-stream tasks.
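
Assuming the behavior is exposed as a configuration option (FLIP-147; the exact key below should be verified against the 1.14 documentation, as the feature is opt-in in this release), enabling checkpoints after tasks finish might look like this minimal sketch:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointsAfterTasksFinishDemo {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Key assumed from FLIP-147 -- verify against the 1.14 configuration docs.
        conf.setBoolean("execution.checkpointing.checkpoints-after-tasks-finish.enabled", true);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.enableCheckpointing(60_000); // checkpoints keep running after some tasks finish
    }
}
```
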

3. Checkpoint Mechanism

1. Pain points of the existing Checkpoint mechanism

Currently, Flink triggers checkpoints by flowing barriers between operators. Barriers travel downstream along with the data; when an operator receives a barrier it takes a snapshot and then forwards the barrier downstream. For operators with multiple input channels, the barriers are aligned: the channels whose barrier has already arrived are temporarily blocked, the snapshot is taken once the barriers from all channels have arrived, and the barrier is then forwarded downstream.

The existing Checkpoint mechanism has the following problems:

Checkpoints cannot be made under back pressure: under back pressure the barrier cannot flow downstream with the data, so a checkpoint cannot be completed. Yet it is precisely when back pressure occurs that we need checkpoints the most, because performance has hit a bottleneck and problems are more likely to occur;

Barrier alignment blocks data processing: Barrier alignment has a certain impact on performance;

Recovery performance is limited by the checkpoint interval: during recovery, the latency impact largely depends on the checkpoint interval. The larger the interval, the more data has to be replayed and the greater the impact of the interruption. However, the checkpoint interval is currently limited by how long the persistence operation takes, so it cannot be made very short.

2. Unaligned Checkpoint

To address these pain points, Flink has been optimizing continuously over recent versions, and Unaligned Checkpoint is one of the resulting mechanisms. When a barrier reaches the front of an operator's input buffer, the checkpoint is triggered immediately and the barrier is moved straight to the front of the operator's output buffer, which means it can be read by the downstream operator right away. In this way the barrier is not blocked behind the data, solving the problem that checkpoints cannot be taken under back pressure.

After the barrier is forwarded, the operator pauses briefly to mark its state and the data in its input and output buffers so that they can be uploaded later. For multiple channels, it waits until the barriers of the other channels arrive and marks their buffered data as well.

In this way, no barrier alignment is required during the checkpoint; the only pause needed is the time to mark all the buffers and the state. This effectively solves both the inability to checkpoint under back pressure and the performance impact of barrier alignment blocking the data.
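
As a minimal sketch of how this is switched on in user code, unaligned checkpoints are enabled through the CheckpointConfig; the alignment-timeout setter name should be checked against the 1.13/1.14 javadoc, and the interval and timeout values are arbitrary examples.

```java
import java.time.Duration;

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnalignedCheckpointDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // trigger a checkpoint every 60 s (exactly-once by default)

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.enableUnalignedCheckpoints(); // let barriers overtake buffered data

        // Optionally start aligned and fall back to unaligned only if alignment
        // takes too long (setter name as of 1.13/1.14 -- check the javadoc).
        checkpointConfig.setAlignedCheckpointTimeout(Duration.ofSeconds(30));
    }
}
```
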

3. Generalized Incremental Checkpoint [2]

Generalized Incremental Checkpoint is mainly aimed at reducing the checkpoint interval. As shown on the left of the figure, with an incremental checkpoint the operator first writes a state changelog, and only after that are the actual changes applied to the StateTable. The state changelog is continuously persisted to external storage. In this process we do not need to wait for the whole StateTable to be persisted; we only need to make sure the changelog entries belonging to the current checkpoint are persisted before the next checkpoint can start. The StateTable itself is persisted externally as an independent, periodic background process.

After these two processes are decoupled, the previous per-checkpoint full persistence becomes per-checkpoint incremental persistence plus periodic full persistence in the background, achieving the same fault-tolerance guarantee. Because the amount of data to persist for each checkpoint is reduced, the checkpoint interval can be greatly shortened.
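
The following is a purely conceptual sketch of the two decoupled persistence paths; the class and method names are invented for illustration and are not Flink's internal API:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: models "per-checkpoint changelog upload" decoupled from
// "periodic background materialization of the full StateTable".
class ChangelogStateSketch {
    private final List<String> stateTable = new ArrayList<>(); // in-memory working state
    private final List<String> changelog = new ArrayList<>();  // changes since last upload

    void update(String change) {
        changelog.add(change);  // 1. append to the changelog first
        stateTable.add(change); // 2. then apply the change to the StateTable
    }

    // Called on every checkpoint: only the small delta must become durable.
    void onCheckpoint() {
        uploadToDurableStorage(changelog);
        changelog.clear();
    }

    // Called periodically in the background, independent of checkpoints:
    // persists the full StateTable so old changelog segments can be truncated.
    void materialize() {
        uploadToDurableStorage(stateTable);
    }

    private void uploadToDurableStorage(List<String> data) { /* e.g. write to DFS */ }
}
```
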

In fact, RocksDB also supports incremental checkpoints (a snippet for enabling them follows this list). However, there are two problems:

The first problem is that RocksDB's incremental checkpoint relies on RocksDB's own implementation, which involves compaction of its data files; how long that takes and how effective it is are uncertain and depend on the data.
The second problem is that it can only be used with that specific StateBackend. The Generalized Incremental Checkpoint, by contrast, is designed to be independent of the StateBackend and to guarantee a relatively stable and smaller checkpoint interval through a runtime mechanism.
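
For reference, RocksDB's existing incremental checkpoints (the first approach discussed above) are enabled per job roughly as follows; this minimal sketch assumes the flink-statebackend-rocksdb dependency is on the classpath and uses a placeholder checkpoint path:

```java
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbIncrementalDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);

        // true enables RocksDB's own incremental checkpoints
        // (equivalent to state.backend.incremental: true in flink-conf.yaml).
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

        // Durable checkpoint storage; the path is only an example.
        env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");
    }
}
```
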
Unaligned Checkpoint was already released in Flink 1.13; in version 1.14 the focus is on bug fixes and polishing. As for Generalized Incremental Checkpoint, the community is making a final push and hopes to deliver it in 1.14. [2]

4. Performance and Efficiency

1. Optimization of large-scale job scheduling

Improved performance of building Pipeline Regions: a Pipeline Region is a subgraph whose tasks are connected by pipelined edges. In Flink task scheduling, Pipeline Regions must be identified to ensure that tasks connected by the same pipelined edge are scheduled at the same time; otherwise the upstream task might be scheduled while the downstream task is not yet running, so data already produced upstream could not be consumed by the downstream nodes, which may cause a deadlock.

Improved performance of the task deployment phase: each task needs to know which upstream results it reads from, and this information is generated as a Result Partition Deployment Descriptor.
In previous versions, both of these construction processes had a time complexity of O(n^2). The main problem was that every downstream node had to traverse every upstream node, for example to check whether each upstream connection is a pipelined edge, or to generate the corresponding Result Partition information for each upstream node.

Currently, by introducing the concept of a group, and given that the connections between upstream and downstream tasks are all-to-all, the Pipeline Region information and Result Partition information are combined at the group level. The downstream side only needs to know which upstream group it corresponds to, which reduces the complexity from O(n^2) to O(n) (see the sketch after the benchmark results below). We ran some tests with a WordCount job and compared the performance before and after the optimization.

From the table we can see that the build speed has been greatly improved: the time to build Pipeline Regions drops from seconds to milliseconds. For task deployment, we measure from deploying the first task until all tasks have started running; only streaming jobs are counted here, because in a batch job downstream tasks are scheduled only after the upstream has finished. Overall, the task initialization, scheduling and deployment phases can save time on the order of minutes.
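
To make the grouping idea above concrete, the sketch below contrasts the two construction strategies. It uses invented names and is purely illustrative, not Flink's internal scheduler code:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Illustrative sketch only; these are not Flink's internal classes.
public class PartitionGroupSketch {

    // Before: O(n^2) - every downstream task enumerates every upstream partition.
    static void buildNaively(int parallelism) {
        for (int downstream = 0; downstream < parallelism; downstream++) {
            for (int upstream = 0; upstream < parallelism; upstream++) {
                // record "downstream consumes upstream" individually
            }
        }
    }

    // After: O(n) - for an all-to-all edge, every downstream task consumes the
    // same set of upstream partitions, so that set is built once and shared.
    static void buildWithGroup(int parallelism) {
        List<Integer> sharedGroup =
                IntStream.range(0, parallelism).boxed().collect(Collectors.toList());
        for (int downstream = 0; downstream < parallelism; downstream++) {
            // downstream -> sharedGroup (one reference instead of n edges)
        }
    }
}
```
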

2. Fine-grained resource management

Fine-grained resource management has been worked on across many past versions. In Flink 1.14, this part of the API can finally be opened to users in the DataStream API: users can customize the division of SlotSharingGroups in their DataStream programs. As shown in the figure below, slot-level resource division is defined; the DataStream API supports custom SSG division and resource configuration, and TaskManagers deduct resources dynamically.

Each slot can be configured at a fairly fine granularity, and the runtime automatically performs dynamic resource slicing based on the user's resource configuration.

The advantage of this is that slots no longer have fixed sizes as before; instead resources are deducted dynamically. In this way we hope to achieve more refined resource management and better resource utilization.
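
A minimal sketch of how this might look with the DataStream API is shown below; the builder method names follow FLIP-169 and should be verified against the 1.14 javadoc, and the group name and resource figures are arbitrary examples:

```java
import org.apache.flink.api.common.operators.SlotSharingGroup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FineGrainedResourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Declare a slot sharing group together with its resource profile
        // (method names per FLIP-169 -- verify against the 1.14 javadoc).
        SlotSharingGroup heavyGroup = SlotSharingGroup.newBuilder("heavy")
                .setCpuCores(2.0)
                .setTaskHeapMemoryMB(512)
                .build();

        env.fromElements(1, 2, 3)
           .slotSharingGroup(heavyGroup) // operators in this group share the declared resources
           .print();

        env.execute("fine-grained-resources-demo");
    }
}
```
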

5. Table/SQL/Python API

1. Table API / SQL

Window Table-Valued Function supports more operators and window types. You can see the comparison in the following table:

From the table we can see that the original three window types have been enhanced, and a new Session window type has been added, which currently supports aggregate operations.
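
As a small illustration of the window TVF syntax (using a tumbling window here; the table name and columns are assumptions made for the example), an aggregation over a window table-valued function looks like this:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class WindowTvfDemo {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // A table `orders(item STRING, price DOUBLE, ts TIMESTAMP(3))` with a
        // watermark on `ts` is assumed to have been registered elsewhere.
        tEnv.executeSql(
            "SELECT window_start, window_end, SUM(price) AS total_price "
          + "FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR(ts), INTERVAL '10' MINUTES)) "
          + "GROUP BY window_start, window_end");
    }
}
```
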

1.1 Support declarative registration of Source/Sink

The Table API supports registering Sources/Sinks in a declarative way, aligning with SQL DDL;
It also supports the new FLIP-27 Source interface;
This new registration approach replaces the old connect() interface.

1.2 New code generator

The problem of generated code exceeding the Java method size limit has been solved: the new code generator splits the code, completely resolving the problem of overly long generated code.

1.3 Remove Flink Planner

In the new version, the legacy Flink Planner is removed and the Blink Planner becomes the only planner implementation.

2. Python API

In previous versions, if two Python UDFs were executed in sequence, the execution proceeded as shown on the left of the figure below: the Java operator in the JVM first sends the data to the Python process to execute the first UDF, the result is sent back to Java and passed to the downstream operator, and then another cross-process transfer to Python is performed for the second UDF. This results in a lot of redundant data transfer.

In version 1.14, as shown on the right of the figure, consecutive Python UDFs can be chained together so that only one round trip between Java and Python is needed for the data. By reducing the number of data transfers, a noticeable performance improvement is achieved.

3. Support for Loopback Mode

In the past, local execution ran the client program in a Python process, which submitted the job to a Java process that started a mini cluster to execute the Java code. The Java code would in turn start new Python processes to execute the Python UDFs, just as in the production environment. As the figure below shows, these extra processes are unnecessary for local debugging.

With loopback mode, the Java operator can run the UDFs directly in the same Python process in which the client program ran. This brings two benefits:

First, it avoids the overhead of starting additional processes;
More importantly, during local debugging everything runs in the same process, so debugging tools can be used much more conveniently, which is an improvement to the developer experience.

6. Summary

This article introduced the main new features of Flink 1.14.

First, we introduced the community's current work on stream-batch integration, the different stream and batch execution modes, and the improvements to how the JobManager triggers checkpoints for jobs with finished tasks, so that bounded jobs are better integrated.
Then, we analyzed the pain points of the existing checkpoint mechanism and how the new version improves on them, as well as the performance optimizations in large-scale job scheduling and fine-grained resource management.
Finally, we introduced the Table/SQL API changes and the related performance optimizations in the Python API.
You are welcome to keep following the latest developments of the release, as well as further technical sharing and special topics during the release process.

Notes

[1] As of August 31, 33 items have been confirmed to be included in the new version, and all have been completed.

[2] Generalized Incremental Checkpoint was ultimately not completed in 1.14.
