Kuaishou builds a real-time data warehouse scenario-based practice based on Flink

This article is compiled from the talk "Kuaishou's Scenario-Based Practice of Building a Real-Time Data Warehouse Based on Flink" given by Li Tianshuo, a data technology expert at Kuaishou, at the Flink Meetup in Beijing on May 22. The content includes:

1. Kuaishou real-time computing scenario

2. Kuaishou's real-time data warehouse architecture and safeguards

3. Kuaishou scenario problems and solutions

4. Future Planning

1. Kuaishou Real-time Computing Scenario

The real-time computing scenarios in Kuaishou’s business are mainly divided into four parts:

Company-level core data: this includes the company's overall business performance, real-time core daily reports, and mobile-side data. The team maintains company-wide performance indicators as well as indicators for each business line, such as video and live streaming, each with its own core real-time dashboard;

Real-time indicators for large-scale events: the core deliverable here is the real-time big screen. For Kuaishou's Spring Festival Gala event, for example, we build an overall big screen to monitor the event's global status. A large event is split into N modules, and we provide different real-time data dashboards for the different gameplay within each module;

Operational data: this mainly covers two aspects, creators and content. On the operations side, for example, when launching a big-V campaign, operations teams want to see real-time information such as the status of the live room and how much traffic the live room is pulling in across the platform. For this scenario we build various multi-dimensional real-time big screens as well as market-level data. Operational data also includes support for operational strategy: for example, we may detect hot content, hot creators, and current trending topics in real time, and strategies are produced based on these signals, which is another capability we need to provide. Finally, it includes C-end data display: Kuaishou now has a creator center and an anchor center, with pages such as the anchor's post-broadcast page, and part of the real-time data on those pages is also produced by us.

Real-time features: these include real-time features for search and recommendation as well as for advertising.

2. Kuaishou’s real-time data warehouse architecture and safeguards

1. Objectives and Difficulties

1.1 Objectives

First of all, since we are building a data warehouse, we expect every real-time indicator to have a corresponding offline indicator. We require the overall difference between real-time and offline indicators to be within 1%; this is the minimum standard.
The second objective is data latency. The SLA is that data latency in all core reporting scenarios during an event must not exceed 5 minutes, and these 5 minutes include the time from a job failure to its recovery. If the latency exceeds this, the SLA is not met.
Finally, stability: in scenarios such as a job restart, the output curve should remain normal, with no obvious anomalies in the indicators caused by the restart.

1.2 Difficulties

The first difficulty is the sheer data volume: overall daily ingress traffic is on the order of trillions of records, and during events such as the Spring Festival Gala the peak QPS can reach 100 million.
The second difficulty is that component dependencies are relatively complex: parts of the pipeline depend on Kafka, others on Flink, and still others on KV storage, RPC interfaces, OLAP engines, and so on. We need to think about how to lay these components out along the pipeline so that all of them keep working properly.
The third difficulty is the complexity of the pipelines. We currently have more than 200 core business jobs, more than 50 core data sources, and more than 1,000 jobs in total.

2. Real-time data warehouse - layered model

Based on the above three difficulties, let's take a look at the data warehouse architecture:

As shown above:

The bottom layer has three different data sources, namely client logs, server logs, and Binlog logs;
The public basic layer is divided into two layers: the DWD layer for detailed data and the DWS layer for common aggregated data, while DIM is what we usually call dimensions. On top of the offline data warehouse we have a topic-based pre-stratification, which may cover traffic, users, devices, video production and consumption, risk control, social, and so on. The core work of the DWD layer is standardized cleaning; the DWS layer joins dimension data onto the DWD layer and produces aggregations at common granularities.
The next layer is the application layer, which includes big-screen data, multi-dimensional analysis models, and business-specific data.
At the top are the scenarios.
The overall process can be divided into three steps:

The first step is to digitize the business, which is equivalent to bringing in the business data;
The second step is data assetization, which means cleaning the data and forming some regular and orderly data.
The third step is to turn data back into business value: at the real-time data level, data feeds back into the business and helps build out the business value of data.

3. Real-time data warehouse - safeguard measures

Based on the above hierarchical model, let's take a look at the overall safeguards:

The guarantee level is divided into three different parts: quality assurance, timeliness assurance and stability assurance.

Let's first look at quality assurance, the blue part. At the data-source stage we monitor the sources for out-of-order data, based on our own collection SDK, and we calibrate the sources for consistency against offline data. The computation process then has three stages: development, launch, and serving. The development stage provides a standardized model, and against this benchmark we run offline comparison and verification to make sure quality stays consistent; the launch stage focuses mainly on service monitoring and indicator monitoring; in the serving stage, if something abnormal happens, we first restore the job from Flink state, and if results still do not meet expectations, we repair the data offline as a whole.
The second is timeliness assurance. For data sources, we also include source latency in the monitoring. There are really two things in the development stage: the first is stress testing, where routine tasks are replayed against the peak traffic of the last 7 or 14 days to check for task delays; after stress testing comes an evaluation of task launch and restart performance, that is, what restart performance looks like after recovery from a checkpoint.
The last is stability assurance, which we invest in more heavily for large events, for example switchover drills and tiered guarantees. We apply rate limiting based on the earlier stress-test results, the aim being that when traffic exceeds the limit the job remains stable and we do not see widespread instability or checkpoint failures. Going forward we will have two different standards: cold-standby dual data centers and hot-standby dual data centers. Cold standby means that when a single data center goes down, we pull the jobs up in the other data center; hot standby means the same logic is deployed once in each of the two data centers.
The above are our overall safeguards.

3. Kuaishou scenario problems and solutions

1. PV/UV Standardization

1.1 Scenario

The first problem is PV/UV standardization; here are three screenshots:

The first picture shows a warm-up activity for the Spring Festival Gala event, essentially one form of gameplay. The second and third pictures are screenshots of the red-envelope distribution activity and the live room on the day of the gala.

During the activity, we found that 60~70% of the demand was to calculate the information on the page, such as:

How many people came to this page, or how many people clicked to enter this page;
How many people came to the event;
How many clicks and exposures a widget on the page received.

1.2 Solution

To abstract this scenario, the SQL is as follows:

Simply put, we apply a filter condition to a table, aggregate by the dimension level, and then produce Count or Sum results.
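As a rough illustration, the abstracted query could be written in Flink SQL along the following lines (wrapped here in the Java Table API). The table and column names, events, did, dim and event_type, are hypothetical placeholders rather than the actual production schema, and the datagen connector only stands in for the real Kafka source so the sketch runs on its own:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PvUvSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Hypothetical source table; 'datagen' just makes the sketch self-contained.
        tEnv.executeSql(
                "CREATE TABLE events (" +
                "  did STRING, dim STRING, event_type STRING" +
                ") WITH ('connector' = 'datagen')");

        // Filter one table, aggregate by the dimension, then COUNT / SUM.
        tEnv.executeSql(
                "SELECT dim," +
                "       COUNT(DISTINCT did) AS uv," +  // unique devices
                "       COUNT(1)            AS pv " +  // raw events
                "FROM events " +
                "WHERE event_type = 'page_view' " +    // the filter condition
                "GROUP BY dim").print();
    }
}
```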

Based on this scenario, our initial solution is shown on the right side of the above figure.

We used Flink SQL's Early Fire mechanism to read data from the source and then bucket it by DID; the purple part in the figure, for example, is bucketed first. The reason for bucketing first is to avoid hot spots on any single DID. After bucketing there is a Local Window Agg, which sums up records of the same type within each bucket, and after the Local Window Agg a Global Window Agg merges the buckets by dimension, producing the final result per dimension. The Early Fire mechanism amounts to opening a day-level window in the Local Window Agg and emitting output once a minute.
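For reference, the Early Fire behavior described above corresponds to a pair of experimental, undocumented options on the Flink table config; the exact keys below are an assumption about the planner version in use, so treat this purely as a sketch of how such a setup is usually enabled:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class EarlyFireConfigSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Experimental (undocumented) planner options: emit partial results of a
        // long day-level group window every minute instead of waiting for it to close.
        tEnv.getConfig().getConfiguration()
                .setString("table.exec.emit.early-fire.enabled", "true");
        tEnv.getConfig().getConfiguration()
                .setString("table.exec.emit.early-fire.delay", "1 min");

        // A day-level event-time window aggregation defined after this point would
        // then fire every minute, which is the behaviour the Local/Global Window
        // Agg jobs described above relied on.
    }
}
```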

We encountered some problems during this process, as shown in the lower left corner of the above picture.

There is no problem when the job runs normally. However, when the overall data is delayed or historical data is backfilled, problems appear: with Early Fire firing once a minute and a large volume of data during backfill, the job may, while backfilling from 14:00, read straight through to the 14:02 data, so the 14:01 point is never emitted. What happens when that point is lost?

In this scenario, the upper curve in the figure is the result of backfilling historical data with Early Fire. The horizontal axis is minutes and the vertical axis is the page UV up to the current moment. We found that some segments of the curve are flat, meaning no results were produced, followed by a sharp jump, then flat again, then another sharp jump. The expected result is actually the smooth curve at the bottom of the figure.

To solve this problem, we adopted a Cumulate Window solution; Flink 1.13 includes a Cumulate Window built on the same principle.

A large day-level window is opened for the data, and a small minute-level window is opened under the large window. The data falls into the minute-level window according to the Row Time of the data itself.

When the watermark advances past the end of a window, the window emits its result downstream. This solves the backfill problem: each record falls into its real window, the watermark advances, and the window fires once it ends.
In addition, this method mitigates the out-of-order problem to a certain extent: out-of-order data is not simply discarded, and it is still recorded into the latest accumulated result.
The last point is semantic consistency: because the window is based on event time, consistency with the offline calculation results is quite high when the out-of-orderness is mild.
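With the CUMULATE window TVF that ships in Flink 1.13, the same computation can be sketched as follows; the table and column names remain hypothetical, and the one-minute step inside a one-day maximum window mirrors the minute-level and day-level windows described above:

```java
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class CumulateUvSketch {
    // Assumes `events` is registered with an event-time column `event_time`
    // that has a watermark defined on it; table and column names are placeholders.
    public static Table cumulativeUv(TableEnvironment tEnv) {
        return tEnv.sqlQuery(
                "SELECT window_start, window_end, dim," +
                "       COUNT(DISTINCT did) AS uv " +
                "FROM TABLE(" +
                "  CUMULATE(TABLE events, DESCRIPTOR(event_time)," +
                "           INTERVAL '1' MINUTE," +    // step: emit an updated total every minute
                "           INTERVAL '1' DAY)) " +     // max size: the day-level window
                "GROUP BY window_start, window_end, dim");
    }
}
```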
The above is a standardized solution for PV/UV.

2. DAU calculation

2.1 Background

Here is an introduction to DAU calculation:

We mainly monitor active devices, new devices, and returning devices across the whole platform.

Active devices refer to devices that have visited that day;
New devices are devices seen on the current day that have never been seen before;
Returning devices are devices active on the current day that had not been active in the previous N days.
However, during the calculation process, we may need 5 to 8 different topics to calculate these indicators.

Let’s take a look at how the logic should be calculated during the offline process.

First, we compute the active devices from the various sources, union them together, and deduplicate them per day under one dimension. Then we join a dimension table that stores each device's first and last access time, that is, the time the device was first seen and last seen as of yesterday.

With this information we can do the classification, and it turns out that new and returning devices are really sub-labels of active devices: new devices are derived with one piece of logic, and returning devices with a 30-day piece of logic (a sketch of this classification follows below). Given this approach, can we simply write one SQL job to solve the problem?
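To make the classification concrete, here is a hedged sketch of the labeling step, assuming we already have each device's first and last access time as of yesterday from the dimension table; the 30-day returning window follows the description above, and the method and field names are made up for illustration:

```java
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;

public class DeviceLabeler {

    enum Label { NEW, RETURNING, ACTIVE }

    /**
     * Labels a device that was active on `today`.
     * firstAccess / lastAccess come from the device dimension table
     * (state as of yesterday); both are null for a brand-new device.
     */
    static Label label(LocalDate today, LocalDate firstAccess, LocalDate lastAccess) {
        if (firstAccess == null) {
            // Never seen before today -> new device.
            return Label.NEW;
        }
        long daysSinceLastVisit = ChronoUnit.DAYS.between(lastAccess, today);
        if (daysSinceLastVisit >= 30) {
            // Active today but silent for at least 30 days -> returning device.
            return Label.RETURNING;
        }
        return Label.ACTIVE;
    }
}
```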

In fact, we did this at first, but encountered some problems:

The first problem is that there are 6 to 8 data sources, and the metric definitions behind our overall dashboard are frequently fine-tuned. With a single job, every adjustment means changing that job, and the stability of that single job becomes very poor;
The second problem is that the data volume is at the trillion level, which causes two issues: first, a single job at this scale has very poor stability; second, the dimension tables are joined in real time against KV storage, and no RPC service interface can guarantee stable service at a trillion-record scale;
The third problem is our strict latency requirement of under one minute. The whole pipeline must avoid batch processing, and if any single task becomes a performance bottleneck we still have to guarantee high performance and easy scalability.

2.2 Technical Solution

In response to the above problems, here is how we do it:

As shown in the example above, the first step is to deduplicate the three data sources A, B, and C at the minute level by dimension and DID, which yields three minute-level deduplicated sources. These are then unioned together, and the same deduplication logic is applied once more at the day level.

This reduces the data entering the pipeline from trillions of records to tens of billions, and after deduplicating at the minute level and then at the day level, the resulting data volume drops further from tens of billions to billions.

At the scale of billions of records, associating the data with external services becomes feasible: we call the user-profile RPC interface to enrich the data and then write the result to the target topic. That topic is imported into the OLAP engine to serve several different downstream services, including the mobile version, big screens, and indicator dashboards. A structural sketch of the deduplication pipeline follows below.
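The sketch below is only meant to convey the shape of the computation: the source tables src_a/src_b/src_c, their event_time watermarks and the column names are assumptions, the real pipeline is split into separate jobs, and the day-level deduplication is actually done with the custom stateful logic described in the next section rather than a plain GROUP BY:

```java
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class DauPipelineSketch {

    /**
     * Structural sketch only. Assumes src_a/src_b/src_c are registered tables with
     * columns (did, dim, event_time) and a watermark on event_time.
     */
    public static Table dailyActiveDevices(TableEnvironment tEnv) {
        // Step 1: minute-level dedup per source, keyed by dimension + DID.
        String minuteDedup =
                "SELECT window_start AS wstart, dim, did "
              + "FROM TABLE(TUMBLE(TABLE %s, DESCRIPTOR(event_time), INTERVAL '1' MINUTE)) "
              + "GROUP BY window_start, window_end, dim, did";
        tEnv.createTemporaryView("a_min", tEnv.sqlQuery(String.format(minuteDedup, "src_a")));
        tEnv.createTemporaryView("b_min", tEnv.sqlQuery(String.format(minuteDedup, "src_b")));
        tEnv.createTemporaryView("c_min", tEnv.sqlQuery(String.format(minuteDedup, "src_c")));

        // Step 2: union the three deduplicated streams and dedup again per day.
        // The result is an updating stream, so any sink must accept upserts; in the
        // real pipeline it is enriched via the user-profile RPC before being written
        // to the target topic.
        return tEnv.sqlQuery(
                "SELECT DATE_FORMAT(wstart, 'yyyy-MM-dd') AS dt, dim, did "
              + "FROM (SELECT * FROM a_min "
              + "      UNION ALL SELECT * FROM b_min "
              + "      UNION ALL SELECT * FROM c_min) AS u "
              + "GROUP BY DATE_FORMAT(wstart, 'yyyy-MM-dd'), dim, did");
    }
}
```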

This solution has three advantages: stability, timeliness and accuracy.

The first is stability. Loose coupling means that when the logic for data source A and the logic for data source B need changes, they can be modified independently. Task scalability comes from splitting the logic at a very fine granularity: when one part runs into trouble, such as a traffic problem, it does not affect the parts downstream, so scaling out is relatively simple. In addition, service calls are pushed to the end of the pipeline and the state remains controllable.
The second is timeliness. We achieve millisecond latency and rich dimensions. Overall, there are 20+ dimensions for multi-dimensional aggregation.
Finally, regarding accuracy, we support data validation, real-time monitoring, unified model outputs, and so on.
At this point we ran into another problem: out-of-order data. For the three jobs above, any job restart introduces a delay of at least two minutes, and that delay makes the downstream data sources out of order when they are unioned together.

2.3 Delayed Calculation Scheme

How do we deal with the out-of-order situation described above?

We have three solutions in total:

The first solution deduplicates on "did + dimension + minute", with the value set to "seen or not". For the same DID, a message at 04:01 produces output, and messages at 04:02 and 04:04 also produce output; a second message for 04:01 is discarded, but a late message for 04:00 still produces output.
This solution has a problem: because state is kept per minute, storing 20 minutes of state takes twice the space of storing 10 minutes, and the state size eventually became hard to control, so we switched to solution 2.
The second solution rests on the assumption that the data source has no out-of-order data. The key stores "did + dimension" and the value is a timestamp, updated as shown in the figure above. A record at 04:01 produces output; a record at 04:02 for the same DID updates the timestamp and still produces output; the same happens at 04:04, after which the timestamp is 04:04. If a record for 04:01 then arrives and finds the timestamp already at 04:04, it is discarded. This approach greatly reduces the required state, but it has zero tolerance for out-of-order data, which is very hard for us to guarantee, so we came up with solution 3.
Solution 3 builds on solution 2's timestamp and adds a ring buffer, within which out-of-order data is tolerated.
For example, a record at 04:01 produces output; a record at 04:02 updates the timestamp to 04:02 while also remembering that the same device appeared at 04:01; and a record at 04:04 shifts the buffer by the corresponding time difference. This logic guarantees that a certain degree of out-of-order data can be tolerated, as sketched below.
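Below is a hedged sketch of solution 3 as a Flink KeyedProcessFunction keyed by "did + dimension". The 16-minute buffer length, the field names, and the exact emit-or-drop semantics are assumptions reconstructed from the description above, not the actual implementation, and state TTL/cleanup is omitted for brevity:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/** Keyed by "did + dimension"; emits a record only the first time that key is seen in a minute. */
public class MinuteDedupWithRingBuffer
        extends KeyedProcessFunction<String, MinuteDedupWithRingBuffer.Event, MinuteDedupWithRingBuffer.Event> {

    /** Hypothetical input record. */
    public static class Event {
        public String did;
        public String dim;
        public long eventTimeMillis;
    }

    private static final int BUFFER_MINUTES = 16;          // tolerated out-of-orderness

    private transient ValueState<Long> latestMinuteState;  // newest minute seen for this key
    private transient ValueState<boolean[]> seenState;     // ring buffer of "already emitted" flags

    @Override
    public void open(Configuration parameters) {
        latestMinuteState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("latestMinute", Long.class));
        seenState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("seenMinutes", boolean[].class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {
        long minute = event.eventTimeMillis / 60_000L;
        Long latest = latestMinuteState.value();
        boolean[] seen = seenState.value();

        if (latest == null) {                        // first record ever for this key
            seen = new boolean[BUFFER_MINUTES];
        } else if (minute > latest) {                // time moves forward: slide the buffer
            // Clear the slots for the minutes being skipped over (at most one full cycle).
            for (long m = latest + 1; m <= minute && m - latest <= BUFFER_MINUTES; m++) {
                seen[(int) (m % BUFFER_MINUTES)] = false;
            }
        } else if (latest - minute >= BUFFER_MINUTES) {
            return;                                  // too late: outside the tolerated window, drop
        }

        if (latest == null || minute > latest) {
            latestMinuteState.update(minute);
        }

        int slot = (int) (minute % BUFFER_MINUTES);
        if (!seen[slot]) {                           // first record of this key in this minute
            seen[slot] = true;
            out.collect(event);
        }                                            // else: duplicate within this minute, drop
        seenState.update(seen);
    }
}
```

The state sizes compared below (roughly 480 GB versus 30 GB) refer to this kind of per-key state accumulated across all devices and dimensions.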
Let’s take a look at these three options in general:

Solution 1: to tolerate 16 minutes of out-of-order data, the state of a single job is about 480 GB. This guarantees accuracy, but job recovery and stability become completely uncontrollable, so we abandoned this solution;
Solution 2 keeps the state at about 30 GB, but it cannot tolerate out-of-order data, so the results are inaccurate. Since our accuracy requirements are very high, we abandoned this solution as well;
Solution 3's state is larger than solution 2's, but the increase is modest, and the overall effect matches solution 1: it tolerates 16 minutes of out-of-order data, and a normal job update can be restarted well within 10 minutes, so we finally chose solution 3.

3. Operational scenarios

3.1 Background

The operation scenario can be divided into four parts:

The first is big-screen support, covering analysis data for a single live room and for the overall market, which requires minute-level latency and frequent updates;
The second is support for live-broadcast dashboards. Dashboard data is analyzed along specific dimensions for specific groups, so it requires rich dimensions;
The third is the data strategy list, which is mainly used to predict popular works and best-selling products. It requires hourly data and has relatively low update requirements.
The fourth is the real-time indicator display on the C-end. The query volume is relatively large, but the query mode is relatively fixed.
Below we analyze the different requirements behind these four scenarios.

The first three types are basically the same; in terms of query pattern, the difference is only that some serve specific business scenarios while others serve general business scenarios.

For the third and fourth types, the update requirements are relatively low and the throughput requirements are relatively high, and the curves along the way do not need to be consistent. The fourth type is mostly single-entity queries, for example looking up which indicators a piece of content has, and its QPS requirements are relatively high.

3.2 Technical Solution

How do we do it for the 4 different scenarios above?

First, let's look at the basic detail layer (the left side of the figure). The data source has two links: one is the consumption stream, such as live-stream consumption information and views/likes/comments. After a round of basic cleaning, dimension association is performed. The upstream dimension information comes from Kafka: content dimensions, user dimensions, and so on are written from Kafka into KV storage.
After these dimensions are joined, the results are finally written to the DWD fact layer in Kafka. To improve performance, we add a two-level cache in front of the dimension lookups.
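The dimension association could be expressed as a Flink SQL lookup join against a table backed by the KV storage; the sketch below is only an assumption about how such a join is typically written. The consume_log and dim_user tables, their columns, and the idea that the connector's own cache stands in for the two-level cache are all hypothetical:

```java
import org.apache.flink.table.api.TableEnvironment;

public class DwdDimensionJoinSketch {
    // Assumes `consume_log` has a processing-time attribute `proc_time`
    // (e.g. declared as `proc_time AS PROCTIME()` in its DDL) and `dim_user`
    // is a lookup-capable dimension table backed by the KV storage, whose
    // connector-level cache stands in for the two-level cache.
    public static void buildDwd(TableEnvironment tEnv) {
        tEnv.executeSql(
                "INSERT INTO dwd_consume " +
                "SELECT c.user_id, c.room_id, c.event_time, u.age_group, u.city " +
                "FROM consume_log AS c " +
                "LEFT JOIN dim_user FOR SYSTEM_TIME AS OF c.proc_time AS u " +
                "ON c.user_id = u.user_id");
    }
}
```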
As shown in the upper part of the figure, we read the DWD layer and perform basic aggregation. The core is windowed aggregation by dimension, which produces summary data at four different granularities: a multi-dimensional summary topic for the overall market, one for the live room, one for the author, and one for the user. These are all common-dimension data.
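Fanning one DWD stream out into several common-granularity summary topics can be sketched with a statement set, which bundles multiple INSERT statements into a single job; only two of the four granularities are shown, and every table and column name is a hypothetical placeholder:

```java
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class DwsSummarySketch {
    // Assumes dwd_consume carries event_time (with a watermark), room_id, author_id,
    // did and watch_sec; the dws_* sink tables would map to Kafka topics.
    public static void buildDws(TableEnvironment tEnv) {
        StatementSet set = tEnv.createStatementSet();

        // Live-room granularity: minute-level windowed summary.
        set.addInsertSql(
                "INSERT INTO dws_room_topic " +
                "SELECT window_start, room_id, COUNT(DISTINCT did) AS uv, SUM(watch_sec) AS watch_sec " +
                "FROM TABLE(TUMBLE(TABLE dwd_consume, DESCRIPTOR(event_time), INTERVAL '1' MINUTE)) " +
                "GROUP BY window_start, window_end, room_id");

        // Author granularity from the same DWD stream.
        set.addInsertSql(
                "INSERT INTO dws_author_topic " +
                "SELECT window_start, author_id, COUNT(DISTINCT did) AS uv " +
                "FROM TABLE(TUMBLE(TABLE dwd_consume, DESCRIPTOR(event_time), INTERVAL '1' MINUTE)) " +
                "GROUP BY window_start, window_end, author_id");

        set.execute();   // submits both inserts as one job
    }
}
```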
As shown in the lower part of the figure, on top of this common-dimension data we then process the personalized-dimension data, which forms the ADS layer. This involves dimension expansion, including content and operations dimensions, followed by further aggregation into topics such as the real-time e-commerce topic, the real-time institutional-service topic, and the real-time celebrity live-broadcast topic.
Splitting the pipeline into two links, one for common dimensions and one for personalized dimensions, has a clear advantage: the common dimensions carry higher guarantee requirements, while the personalized dimensions carry a lot of scenario-specific logic. If the two were coupled together, tasks would fail frequently, it would be unclear which task is responsible for what, and it would be impossible to build a stable layer.
As shown in the right figure, we ended up using three different engines. In simple terms, Redis queries were used in C-end scenarios, and OLAP queries were used in large screens and business dashboards.

4. Future Planning

The sections above covered three scenarios: the first is standardized PV/UV calculation, the second is the overall DAU solution, and the third is how we solve problems on the operations side. Based on this, we have some future plans, divided into four parts.

The first part is improving the real-time guarantee system. First, for large events, including the Spring Festival Gala and subsequent regular activities, we want a set of specifications and a platform-based approach to guaranteeing them; second, we will formulate tiered guarantee standards, with a standardized description of which jobs get which guarantee level and standard; third, we will push for engine- and platform-level capabilities, including the engines behind Flink tasks, and use the platform to drive norms and standards.
The second part is building out the content of the real-time data warehouse: on one hand, producing scenario-based solutions, such as general solutions for events rather than developing a new set for every event; on the other hand, consolidating the content data layers. The current content construction is still thin in some scenarios, including how the content layer can better serve upstream scenarios.
The third part is scenario-based adoption of Flink SQL, including the continued promotion of SQL, the stability of SQL jobs, and their resource utilization. When estimating resources we consider, for example, which kind of solution a SQL job uses and what QPS it can support in which scenarios. Flink SQL can greatly reduce the labor cost of development, and in this process we want to make things even simpler for the business.
The fourth part is exploring batch-stream integration. Real-time data warehouse scenarios can in fact be used to accelerate offline ETL: for our many hour-level tasks, part of the logic executed in each batch run can be moved into stream processing, which would be a huge improvement for the offline data warehouse SLA system.
