This article introduces how Flink + Hudi uses stream computing to continuously optimize and evolve Hudi's original mini-batch-based incremental computing model. Users can write CDC data into Hudi storage in real time with Flink SQL, and the upcoming Hudi 0.9 release natively supports the CDC format. The main contents are:

1. Background
2. Incremental ETL
3. Demonstration

1. Background
Near real-time

Since 2016, the Apache Hudi community has been exploring near-real-time use cases through Hudi's UPSERT capability [1]. With the MR/Spark batch processing model, users can achieve hourly data ingestion into HDFS/OSS. For purely real-time scenarios, users can achieve end-to-end second-level (to 5-minute-level) real-time analysis with the architecture of a stream computing engine (Flink) plus KV/OLAP storage. However, a large number of use cases still fall in the range from seconds (5 minutes) to hours, which we call NEAR-REAL-TIME. In practice, many cases fall into the near-real-time category:

- Minute-level dashboards (large screens);
- Various BI analyses (OLAP);
- Minute-level feature extraction for machine learning.

Incremental calculation

The solutions for near real-time are currently rather open-ended. Stream processing has low latency, but its SQL patterns are relatively fixed and its query-side capabilities (indexes, ad hoc queries) are lacking; batch data warehouses have rich capabilities but high data latency. The Hudi community therefore proposed a mini-batch-based incremental computing model:

Incremental data set => incremental calculation result merged with the existing result => external storage

This model pulls the incremental data set (the data set between two commits) from the snapshots of the lake storage, computes the incremental result (such as a simple count) with a batch processing framework such as Spark or Hive, and then merges it into the stored result (a SQL sketch of this merge pattern appears at the end of this section).

Core Issues

The core issues the incremental model needs to solve are:

- UPSERT capability: like KUDU and Hive ACID, Hudi provides minute-level update capability;
- Incremental consumption: Hudi provides incremental pulls through the multiple snapshots of the lake storage.

The mini-batch-based incremental computing model can improve latency and save computing cost in some scenarios, but it has a big limitation: it constrains the SQL patterns. Because the calculation is done in batches, and batch calculations themselves do not maintain state, the computed metrics must be easy to merge. Simple count and sum work, but avg and count distinct still require pulling the full data set for recomputation.

With the popularity of stream computing and real-time data warehouses, the Hudi community is also actively embracing change and continuously optimizing and evolving the original mini-batch-based incremental computing model through stream computing: version 0.7 introduced streaming ingestion into the lake, and version 0.9 supports the native CDC format.
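Before moving on, here is a minimal SQL sketch of the merge pattern described above. It assumes a Spark SQL-style batch dialect and hypothetical tables: orders_delta holds only the records added between the last two commits (for example, pulled through a Hudi incremental query), and orders_cnt holds the previously stored result.

-- Compute the incremental result from the delta pulled between two commits.
CREATE TEMPORARY VIEW delta_cnt AS
SELECT category, COUNT(*) AS cnt
FROM orders_delta
GROUP BY category;

-- Merge the increment into the previously stored counts
-- (MERGE INTO requires a target table format that supports it, e.g. Hudi/Hive ACID).
MERGE INTO orders_cnt t
USING delta_cnt d
ON t.category = d.category
WHEN MATCHED THEN UPDATE SET t.cnt = t.cnt + d.cnt
WHEN NOT MATCHED THEN INSERT (category, cnt) VALUES (d.category, d.cnt);

Counts and sums merge this way because the old result never needs to be recomputed; avg and count distinct do not decompose like this, which is exactly the SQL-pattern limitation mentioned above.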
2. Incremental ETL

DB data into the lake

As CDC technology matures, CDC tools such as Debezium are becoming more and more popular, and the Hudi community has integrated stream-write and stream-read capabilities. Users can write CDC data to Hudi storage in real time with Flink SQL:

- Users can import DB data into Hudi directly through the Flink CDC connector;
- Users can also import CDC data into Kafka first and then into Hudi through the Kafka connector. The second solution has better fault tolerance and scalability.

Data Lake CDC

In the upcoming version 0.9, Hudi natively supports the CDC format, so all changes to a record can be preserved. Based on this, the combination of Hudi and stream computing systems becomes more complete, and CDC data can be read in a streaming manner [2]: all message changes of the source CDC stream are preserved after entering the lake and can be consumed as a stream. Flink's stateful computing accumulates the computing result (state) in real time and synchronizes the changes to Hudi lake storage through streaming writes; a downstream Flink job can then consume the Hudi changelog as a stream to implement the next level of stateful computing, forming a near-real-time end-to-end ETL pipeline (a Flink SQL sketch of such a pipeline appears at the end of this section).

This architecture shortens end-to-end ETL latency to minutes, and the storage of each layer can be compacted into columnar formats (Parquet, ORC) to provide OLAP analysis capabilities. Thanks to the openness of the data lake, the compacted format can be queried by various engines: Flink, Spark, Presto, Hive, etc.

A Hudi data lake table has two forms:

- Table form: query the latest snapshot result, backed by an efficient columnar storage format;
- Streaming form: consume changes as a stream; the changelog can be read starting from any specified point.
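As a rough illustration of this cascaded pattern, the following Flink SQL sketches a pipeline from a database CDC source into an ODS Hudi table and on into an aggregated DWS Hudi table. It is only a sketch under assumptions: the schemas, paths, and connection values are made up, the source uses the 'mysql-cdc' connector from the flink-cdc-connectors project, and the Hudi options follow the 0.9 Flink connector.

-- CDC source on the upstream database table (connection values are placeholders)
CREATE TABLE orders_cdc (
  order_id BIGINT,
  user_id BIGINT,
  amount DOUBLE,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'flink',
  'password' = 'flink_pw',
  'database-name' = 'shop',
  'table-name' = 'orders'
);

-- ODS layer: keep every change so that downstream jobs can stream-read the changelog
CREATE TABLE orders_ods (
  order_id BIGINT,
  user_id BIGINT,
  amount DOUBLE,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = '/tmp/hudi/orders_ods',
  'table.type' = 'MERGE_ON_READ',
  'changelog.enabled' = 'true',
  'read.streaming.enabled' = 'true'   -- reads from this table are streaming by default
);

-- DWS layer: the stateful aggregation result, again stored as a Hudi changelog table
CREATE TABLE user_amount_dws (
  user_id BIGINT,
  total_amount DOUBLE,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = '/tmp/hudi/user_amount_dws',
  'table.type' = 'MERGE_ON_READ',
  'changelog.enabled' = 'true'
);

-- Each INSERT runs as a continuous streaming job:
-- 1) mirror the CDC stream into the lake
INSERT INTO orders_ods SELECT * FROM orders_cdc;
-- 2) stream-read the ODS changelog and maintain the next-level aggregation
INSERT INTO user_amount_dws
SELECT user_id, SUM(amount) FROM orders_ods GROUP BY user_id;

Compaction turns the log files of each MERGE_ON_READ layer into columnar files, so the same tables can also be queried in batch by Spark, Presto, or Hive.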
3. Demonstration

We use a demo to demonstrate the two forms of a Hudi table.

Environment Preparation

- Flink SQL Client
- hudi-flink-bundle jar packaged from Hudi master
- Flink 1.13.1

Here is a section of CDC data in debezium-json format prepared in advance:

{"before":null,"after":{"id":101,"ts":1000,"name":"scooter","description":"Small 2-wheel scooter","weight":3.140000104904175},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606100,"transaction":null}
{"before":null,"after":{"id":102,"ts":2000,"name":"car battery","description":"12V car battery","weight":8.100000381469727},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":103,"ts":3000,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.800000011920929},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":104,"ts":4000,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":105,"ts":5000,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":106,"ts":6000,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":107,"ts":7000,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":108,"ts":8000,"name":"jacket","description":"water resistent black wind breaker","weight":0.10000000149011612},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":109,"ts":9000,"name":"spare tire","description":"24 inch spare tire","weight":22.200000762939453},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":{"id":106,"ts":6000,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"after":{"id":106,"ts":10000,"name":"hammer","description":"18oz carpenter hammer","weight":1},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589361987000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":362,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589361987936,"transaction":null}
{"before":{"id":107,"ts":7000,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"after":{"id":107,"ts":11000,"name":"rocks","description":"box of assorted rocks","weight":5.099999904632568},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362099000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362099505,"transaction":null}
{"before":null,"after":{"id":110,"ts":12000,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362210000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1068,"row":0,"thread":2,"query":null},"op":"c","ts_ms":1589362210230,"transaction":null}
{"before":null,"after":{"id":111,"ts":13000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362243000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1394,"row":0,"thread":2,"query":null},"op":"c","ts_ms":1589362243428,"transaction":null}
{"before":{"id":110,"ts":12000,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"after":{"id":110,"ts":14000,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362293000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1707,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362293539,"transaction":null}
{"before":{"id":111,"ts":13000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"after":{"id":111,"ts":15000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362330000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2090,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362330904,"transaction":null}
{"before":{"id":111,"ts":16000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"after":null,"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362344000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2443,"row":0,"thread":2,"query":null},"op":"d","ts_ms":1589362344455,"transaction":null}
Create a table through the Flink SQL Client to read the CDC data file:

Flink SQL> CREATE TABLE debezium_source(
>   id INT NOT NULL,
>   ts BIGINT,
>   name STRING,
>   description STRING,
>   weight DOUBLE
> ) WITH (
>   'connector' = 'filesystem',
>   'path' = '/Users/chenyuzhao/workspace/hudi-demo/source.data',
>   'format' = 'debezium-json'
> );
[INFO] Execute statement succeed.
Execute a SELECT and observe the results. You can see that there are 20 records in total, with some UPDATEs in the middle, and the last message is a DELETE:

Flink SQL> select * from debezium_source;
Create a Hudi table, set the table type to MERGE_ON_READ, and enable changelog mode via the changelog.enabled property:

Flink SQL> CREATE TABLE hoodie_table(
>   id INT NOT NULL PRIMARY KEY NOT ENFORCED,
>   ts BIGINT,
>   name STRING,
>   description STRING,
>   weight DOUBLE
> ) WITH (
>   'connector' = 'hudi',
>   'path' = '/Users/chenyuzhao/workspace/hudi-demo/t1',
>   'table.type' = 'MERGE_ON_READ',
>   'changelog.enabled' = 'true',
>   'compaction.async.enabled' = 'false'
> );
[INFO] Execute statement succeed.
Query

Import the data into Hudi with an INSERT statement, then enable streaming mode and execute queries to observe the results.
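The exact statements for this step are not shown above; the following is a minimal sketch of what they could look like, assuming the two tables defined earlier and the streaming-read options of the Hudi 0.9 Flink connector ('read.streaming.enabled', 'read.streaming.check-interval'); the OPTIONS hint additionally assumes that dynamic table options are enabled in the session.

-- load the prepared changelog into the Hudi table (runs as a continuous Flink job)
Flink SQL> INSERT INTO hoodie_table SELECT * FROM debezium_source;

-- table form: query the latest snapshot of the Hudi table
Flink SQL> SELECT * FROM hoodie_table;

-- streaming form: keep consuming the table's changelog
Flink SQL> SELECT * FROM hoodie_table
>   /*+ OPTIONS('read.streaming.enabled' = 'true', 'read.streaming.check-interval' = '4') */;

The table-form query returns the merged snapshot, while the streaming query keeps emitting changelog rows as new commits arrive, demonstrating the two forms of a Hudi table described in the previous section.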