Building a streaming data lake using Flink Hudi

This article introduces how the Hudi community uses stream computing with Flink to keep optimizing and evolving its original mini-batch-based incremental computing model. Users can write CDC data to Hudi storage in real time with Flink SQL, and the upcoming Hudi 0.9 release natively supports the CDC format. The main contents are:

1. Background

2. Incremental ETL

3. Demonstration

1. Background

Near real-time

Since 2016, the Apache Hudi community has been exploring near-real-time use cases built on Hudi's UPSERT capability [1]. With the MR/Spark batch processing model, users can ingest data into HDFS/OSS at hourly granularity. In pure real-time scenarios, users can achieve end-to-end real-time analysis at second-level to 5-minute-level latency with an architecture that combines the Flink stream computing engine and KV/OLAP storage. However, a large number of use cases fall between these two ranges, from seconds (or 5 minutes) up to hours; we call these NEAR-REAL-TIME.

In practice, there are a large number of cases that fall into the category of near real-time:

Minute-level large-screen dashboards;
Various BI analyses (OLAP);
Minute-level feature extraction for machine learning.

Incremental calculation

There is currently no settled solution for near-real-time scenarios:

Stream processing has low latency, but its SQL patterns are relatively fixed and it lacks query-side capabilities (indexes, ad hoc queries);
Batch data warehouses have rich capabilities but high data latency.
Therefore, the Hudi community proposed a mini-batch-based incremental computing model:

Incremental data set => incremental results merged with existing results => external storage

This model pulls an incremental data set (the data between two commits) from the snapshots stored in the lake, computes incremental results (such as simple counts) with a batch processing framework such as Spark or Hive, and then merges them into the stored results.
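The pull, compute, merge loop can be sketched in a few lines of Python. This is a toy illustration under stated assumptions, not Hudi's API: the commit timeline is a hypothetical list of `(commit_time, records)` pairs, `pull_incremental` stands in for an incremental query between two commits, and the stored result is a plain dict of counts.

```python
from collections import Counter

# Hypothetical commit timeline: (commit_time, records written in that commit).
TIMELINE = [
    (1, [{"category": "hammer"}, {"category": "rocks"}]),
    (2, [{"category": "hammer"}]),
    (3, [{"category": "jacket"}, {"category": "hammer"}]),
]


def pull_incremental(timeline, begin, end):
    """Return records committed after `begin` and up to and including `end`."""
    out = []
    for commit_time, records in timeline:
        if begin < commit_time <= end:
            out.extend(records)
    return out


def incremental_count(records):
    """Batch step: count records per category over the increment only."""
    return Counter(r["category"] for r in records)


def merge_into(stored, delta):
    """Merge the incremental result into the stored result (counts simply add)."""
    merged = Counter(stored)
    merged.update(delta)
    return dict(merged)


# Round 1 covers commits (0, 2]; round 2 covers commits (2, 3].
stored = merge_into({}, incremental_count(pull_incremental(TIMELINE, 0, 2)))
stored = merge_into(stored, incremental_count(pull_incremental(TIMELINE, 2, 3)))
print(stored)  # {'hammer': 3, 'rocks': 1, 'jacket': 1}
```

Each round touches only the new commits, yet the merged counts equal a full recomputation, which is exactly why simple counts suit this model.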

Core Issues

The core issues that the incremental model needs to solve are:

UPSERT capability: like Kudu and Hive ACID, Hudi provides minute-level update capability;
Incremental consumption: Hudi supports incremental pulls across multiple snapshots of the lake storage.
The mini-batch-based incremental computing model can reduce latency in some scenarios and save computing costs, but it has a major limitation: it restricts the SQL patterns that can be used. Batch computations do not maintain state between runs, so the computed metrics must be easy to merge. Simple count and sum can be merged this way, but metrics such as avg and count distinct cannot be derived from the stored result alone and still require pulling the full data set for recalculation.
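A small numeric check shows why the SQL pattern matters. The two batches below are hypothetical values for a single metric, and the sketch assumes that, as in a stateless batch job, only each batch's final result is stored.

```python
# Two hypothetical mini-batches of values for one metric.
batch1 = [2, 4, 6]
batch2 = [6, 6]
both = batch1 + batch2

# count and sum: merging the stored result with the new batch's result is exact.
count = len(batch1) + len(batch2)
total = sum(batch1) + sum(batch2)
assert count == len(both) and total == sum(both)

# avg: if only each batch's average is stored, averaging the averages is wrong.
naive_avg = (sum(batch1) / len(batch1) + sum(batch2) / len(batch2)) / 2
true_avg = sum(both) / len(both)
print(naive_avg, true_avg)  # 5.0 4.8

# count distinct: adding per-batch distinct counts double-counts shared values.
naive_distinct = len(set(batch1)) + len(set(batch2))
true_distinct = len(set(both))
print(naive_distinct, true_distinct)  # 4 3
```

Keeping extra state (sum and count for avg, the set of seen values for count distinct) fixes both cases; that kind of state is what a stream computing engine maintains between inputs.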

With the growing popularity of stream computing and real-time data warehouses, the Hudi community has been actively embracing change, continuously optimizing and evolving the original mini-batch-based incremental computing model through stream computing: version 0.7 introduced streaming ingestion into the lake, and the upcoming 0.9 release adds native support for the CDC format.

2. Incremental ETL

DB data into the lake

As CDC technology matures, CDC tools such as Debezium are becoming increasingly popular, and the Hudi community has integrated both streaming write and streaming read capabilities. Users can write CDC data to Hudi storage in real time with Flink SQL:

Import DB data directly into Hudi through the Flink CDC connector;
Or first import the CDC data into Kafka, then import it into Hudi through the Kafka connector.
The second approach offers better fault tolerance and scalability.
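Whichever path is chosen, the lake table ends up applying each change message by primary key. The minimal sketch below assumes the Debezium JSON envelope used in this article's demo (`before`, `after`, `op`) and uses a plain dict as a hypothetical stand-in for the Hudi table; it ignores ordering, deduplication, and everything else a real writer handles.

```python
import json


def apply_debezium(table, message):
    """Apply one Debezium JSON change message to `table`, a dict keyed by `id`."""
    event = json.loads(message)
    op = event["op"]
    if op in ("c", "r", "u"):
        # create / snapshot read / update: upsert the new row image
        row = event["after"]
        table[row["id"]] = row
    elif op == "d":
        # delete: remove the row identified by the old row image
        table.pop(event["before"]["id"], None)
    else:
        raise ValueError(f"unsupported op: {op}")
    return table


# Trimmed-down messages (source metadata omitted) in the demo's shape.
messages = [
    '{"before":null,"after":{"id":111,"ts":13000,"weight":5.18},"op":"c"}',
    '{"before":{"id":111,"ts":13000,"weight":5.18},"after":{"id":111,"ts":15000,"weight":5.17},"op":"u"}',
    '{"before":null,"after":{"id":110,"ts":12000,"weight":0.2},"op":"c"}',
    '{"before":{"id":111,"ts":15000,"weight":5.17},"after":null,"op":"d"}',
]

table = {}
for m in messages:
    apply_debezium(table, m)
print(table)  # {110: {'id': 110, 'ts': 12000, 'weight': 0.2}}
```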

Data Lake CDC

In the upcoming 0.9 release, Hudi natively supports the CDC format, so every change to a record can be retained. This makes the combination of Hudi and stream computing systems more complete, because CDC data can be read as a stream [2].

Every change message of the source CDC stream is retained after it enters the lake and is available for streaming consumption. Flink's stateful computation accumulates results (state) in real time and synchronizes the changes to those results into Hudi lake storage through streaming writes; a downstream Flink job then consumes the Hudi changelog as a stream to perform the next layer of stateful computation. Chaining these layers yields a near-real-time, end-to-end ETL pipeline.
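To see why the next layer needs every change rather than only the latest rows, here is a minimal sketch of a stateful aggregation over a changelog. It mimics Flink's row kinds (`+I`, `-U`, `+U`, `-D`) with plain strings; the count-by-name query and the sample rows are hypothetical.

```python
from collections import defaultdict


def count_by_name(changelog):
    """Maintain COUNT(*) GROUP BY name over a changelog of (kind, row) pairs."""
    state = defaultdict(int)  # accumulated result, kept as operator state
    for kind, row in changelog:
        if kind in ("+I", "+U"):
            state[row["name"]] += 1  # accumulate the new row image
        elif kind in ("-U", "-D"):
            state[row["name"]] -= 1  # retract the old row image
            if state[row["name"]] == 0:
                del state[row["name"]]
        else:
            raise ValueError(f"unknown row kind: {kind}")
    return dict(state)


changelog = [
    ("+I", {"id": 110, "name": "jacket"}),
    ("+I", {"id": 111, "name": "scooter"}),
    ("-U", {"id": 111, "name": "scooter"}),
    ("+U", {"id": 111, "name": "bike"}),
    ("-D", {"id": 111, "name": "bike"}),
]
print(count_by_name(changelog))  # {'jacket': 1}

# Without the -U/-D messages, old values are never retracted and the result overcounts.
inserts_only = [(k, r) for k, r in changelog if k in ("+I", "+U")]
print(count_by_name(inserts_only))  # {'jacket': 1, 'scooter': 1, 'bike': 1}
```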

This architecture shortens end-to-end ETL latency to minutes, and the storage of each layer can be compacted into a columnar format (Parquet, ORC) to provide OLAP analysis capabilities. Because the data lake is open, the compacted files can be queried by a variety of engines: Flink, Spark, Presto, Hive, and so on.
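Conceptually, compaction folds the row-based log records of a MERGE_ON_READ table into a new columnar base file. The sketch below models only the merge logic, with dicts as hypothetical stand-ins for Parquet files and log blocks. Keeping the record with the larger `ts` imitates how an ordering (precombine) field is typically used; treat it as an assumption, not a description of Hudi's exact code path.

```python
def compact(base_file, log_records, key="id", ordering="ts"):
    """Fold log records into a base file, keeping the latest version of each key.

    base_file:   dict of key -> row (stand-in for a columnar base file)
    log_records: list of (op, row) pairs appended since the last compaction,
                 where op is "upsert" or "delete"
    """
    merged = dict(base_file)
    for op, row in log_records:
        k = row[key]
        current = merged.get(k)
        # Skip a change whose ordering value is older than the version already kept.
        if current is not None and current[ordering] > row[ordering]:
            continue
        if op == "delete":
            merged.pop(k, None)  # simplified: no tombstone is kept after a delete
        else:
            merged[k] = row
    return merged


base = {
    106: {"id": 106, "ts": 6000, "weight": 1.0},
    107: {"id": 107, "ts": 7000, "weight": 5.3},
}
log = [
    ("upsert", {"id": 106, "ts": 10000, "weight": 1.0}),
    ("upsert", {"id": 107, "ts": 11000, "weight": 5.1}),
    ("upsert", {"id": 107, "ts": 9000, "weight": 9.9}),  # stale version, skipped
    ("upsert", {"id": 111, "ts": 13000, "weight": 5.18}),
    ("delete", {"id": 111, "ts": 16000}),
]
result = compact(base, log)
print(sorted((k, v["ts"]) for k, v in result.items()))  # [(106, 10000), (107, 11000)]
```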

A Hudi data lake table has two forms:

Table format: queries return the latest snapshot, stored in an efficient columnar format;
Streaming mode: changes are consumed as a stream, and reading can start from the changelog after any specified point.
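The two forms can be contrasted over the same commit history. In this sketch the timeline is a hypothetical list of `(commit_time, row_kind, row)` changes; `snapshot` returns the latest row per key (the table-format view) and `changes_since` returns every change committed after a chosen point (the streaming view). Neither function is a Hudi API.

```python
TIMELINE = [
    (1, "+I", {"id": 106, "desc": "16oz carpenter's hammer"}),
    (2, "-U", {"id": 106, "desc": "16oz carpenter's hammer"}),
    (2, "+U", {"id": 106, "desc": "18oz carpenter hammer"}),
    (3, "+I", {"id": 111, "desc": "Big 2-wheel scooter"}),
    (4, "-D", {"id": 111, "desc": "Big 2-wheel scooter"}),
]


def snapshot(timeline):
    """Table format: the latest visible row per key."""
    rows = {}
    for _, kind, row in timeline:
        if kind in ("+I", "+U"):
            rows[row["id"]] = row
        elif kind == "-D":
            rows.pop(row["id"], None)
        # "-U" only retracts the old image; the following "+U" replaces it
    return rows


def changes_since(timeline, start_commit):
    """Streaming mode: every change committed after `start_commit`."""
    return [(kind, row["id"]) for t, kind, row in timeline if t > start_commit]


print(snapshot(TIMELINE))
# {106: {'id': 106, 'desc': '18oz carpenter hammer'}}
print(changes_since(TIMELINE, 1))
# [('-U', 106), ('+U', 106), ('+I', 111), ('-D', 111)]
```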

3. Demonstration

The following demo shows both forms of a Hudi table.

Environment Preparation

Flink SQL Client
hudi-flink-bundle jar built from the Hudi master branch
Flink 1.13.1
The following CDC data in debezium-json format was prepared in advance:

  {"before":null,"after":{"id":101,"ts":1000,"name":"scooter","description":"Small 2-wheel scooter","weight":3.140000104904175},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606100,"transaction":null}
  {"before":null,"after":{"id":102,"ts":2000,"name":"car battery","description":"12V car battery","weight":8.100000381469727},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
  {"before":null,"after":{"id":103,"ts":3000,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.800000011920929},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
  {"before":null,"after":{"id":104,"ts":4000,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
  {"before":null,"after":{"id":105,"ts":5000,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
  {"before":null,"after":{"id":106,"ts":6000,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
  {"before":null,"after":{"id":107,"ts":7000,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
  {"before":null,"after":{"id":108,"ts":8000,"name":"jacket","description":"water resistent black wind breaker","weight":0.10000000149011612},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
  {"before":null,"after":{"id":109,"ts":9000,"name":"spare tire","description":"24 inch spare tire","weight":22.200000762939453},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
  {"before":{"id":106,"ts":6000,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"after":{"id":106,"ts":10000,"name":"hammer","description":"18oz carpenter hammer","weight":1},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589361987000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":362,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589361987936,"transaction":null}
  {"before":{"id":107,"ts":7000,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"after":{"id":107,"ts":11000,"name":"rocks","description":"box of assorted rocks","weight":5.099999904632568},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362099000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362099505,"transaction":null}
  {"before":null,"after":{"id":110,"ts":12000,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362210000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1068,"row":0,"thread":2,"query":null},"op":"c","ts_ms":1589362210230,"transaction":null}
  {"before":null,"after":{"id":111,"ts":13000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362243000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1394,"row":0,"thread":2,"query":null},"op":"c","ts_ms":1589362243428,"transaction":null}
  {"before":{"id":110,"ts":12000,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"after":{"id":110,"ts":14000,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362293000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1707,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362293539,"transaction":null}
  {"before":{"id":111,"ts":13000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"after":{"id":111,"ts":15000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362330000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2090,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362330904,"transaction":null}
  {"before":{"id":111,"ts":16000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"after":null,"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362344000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2443,"row":0,"thread":2,"query":null},"op":"d","ts_ms":1589362344455,"transaction":null}

Create a table in the Flink SQL Client to read the CDC data file:

  Flink SQL> CREATE TABLE debezium_source(
  >   id INT NOT NULL,
  >   ts BIGINT,
  >   name STRING,
  >   description STRING,
  >   weight DOUBLE
  > ) WITH (
  >   'connector' = 'filesystem',
  >   'path' = '/Users/chenyuzhao/workspace/hudi-demo/source.data',
  >   'format' = 'debezium-json'
  > );
  [INFO] Execute statement succeed.

Run a SELECT and observe the results. There are 20 rows in total, including several UPDATEs in the middle (each shown as a -U/+U pair), and the last message is a DELETE:

  Flink SQL> select * from debezium_source;
  +----+-----+-------+--------------------+--------------------------------+---------------------+
  | op | id  | ts    | name               | description                    | weight              |
  +----+-----+-------+--------------------+--------------------------------+---------------------+
  | +I | 101 | 1000  | scooter            | Small 2-wheel scooter          | 3.140000104904175   |
  | +I | 102 | 2000  | car battery        | 12V car battery                | 8.100000381469727   |
  | +I | 103 | 3000  | 12-pack drill bits | 12-pack of drill bits with...  | 0.800000011920929   |
  | +I | 104 | 4000  | hammer             | 12oz carpenter's hammer        | 0.75                |
  | +I | 105 | 5000  | hammer             | 14oz carpenter's hammer        | 0.875               |
  | +I | 106 | 6000  | hammer             | 16oz carpenter's hammer        | 1.0                 |
  | +I | 107 | 7000  | rocks              | box of assorted rocks          | 5.300000190734863   |
  | +I | 108 | 8000  | jacket             | water resistent black wind ... | 0.10000000149011612 |
  | +I | 109 | 9000  | spare tire         | 24 inch spare tire             | 22.200000762939453  |
  | -U | 106 | 6000  | hammer             | 16oz carpenter's hammer        | 1.0                 |
  | +U | 106 | 10000 | hammer             | 18oz carpenter hammer          | 1.0                 |
  | -U | 107 | 7000  | rocks              | box of assorted rocks          | 5.300000190734863   |
  | +U | 107 | 11000 | rocks              | box of assorted rocks          | 5.099999904632568   |
  | +I | 110 | 12000 | jacket             | water resistent white wind ... | 0.20000000298023224 |
  | +I | 111 | 13000 | scooter            | Big 2-wheel scooter            | 5.179999828338623   |
  | -U | 110 | 12000 | jacket             | water resistent white wind ... | 0.20000000298023224 |
  | +U | 110 | 14000 | jacket             | new water resistent white w... | 0.5                 |
  | -U | 111 | 13000 | scooter            | Big 2-wheel scooter            | 5.179999828338623   |
  | +U | 111 | 15000 | scooter            | Big 2-wheel scooter            | 5.170000076293945   |
  | -D | 111 | 16000 | scooter            | Big 2-wheel scooter            | 5.170000076293945   |
  +----+-----+-------+--------------------+--------------------------------+---------------------+
  Received a total of 20 rows
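The sample file holds 16 Debezium messages, yet the query returns 20 rows. The difference comes from how the debezium-json format expands each message: a create (`c`) or snapshot read (`r`) becomes one `+I` row, an update (`u`) becomes a `-U` row for the old image plus a `+U` row for the new one, and a delete (`d`) becomes one `-D` row. The sketch below reproduces that expansion over just the `op` codes of the sample file; it is an illustration, not Flink's deserializer.

```python
def expand(op):
    """Return the Flink row kinds produced for one Debezium op code."""
    if op in ("c", "r"):
        return ["+I"]
    if op == "u":
        return ["-U", "+U"]  # old row image retracted, new row image emitted
    if op == "d":
        return ["-D"]
    raise ValueError(f"unknown Debezium op: {op}")


# The `op` field of the 16 messages in the sample file, in order.
ops = ["c"] * 9 + ["u", "u", "c", "c", "u", "u", "d"]
kinds = [k for op in ops for k in expand(op)]
print(len(ops), len(kinds))  # 16 20
print(kinds[-3:])            # ['-U', '+U', '-D']
```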

Create a Hudi table, set the table type to MERGE_ON_READ, and enable changelog mode with the changelog.enabled property:

  Flink SQL> CREATE TABLE hoodie_table(
  >   id INT NOT NULL PRIMARY KEY NOT ENFORCED,
  >   ts BIGINT,
  >   name STRING,
  >   description STRING,
  >   weight DOUBLE
  > ) WITH (
  >   'connector' = 'hudi',
  >   'path' = '/Users/chenyuzhao/workspace/hudi-demo/t1',
  >   'table.type' = 'MERGE_ON_READ',
  >   'changelog.enabled' = 'true',
  >   'compaction.async.enabled' = 'false'
  > );
  [INFO] Execute statement succeed.

Query

Import the data into Hudi with an INSERT statement, enable streaming read mode, and run a query to observe the results.
