A General Method for Calculating PV and UV with Flink


PV (Page View): the number of page views or clicks. It is incremented every time a user opens or refreshes a page.

UV (Unique Visitor): each client that visits your website counts as one visitor, and the same client is counted only once between 00:00 and 24:00 of a given day.

Calculating real-time PV and UV for websites and apps is a very common statistical requirement. This article presents a general calculation method that can be reused with only minor modifications to meet different business needs.

Requirement

Use Flink to compute, in real time, the PV and UV from 00:00 up to the current moment.

1. Requirement Analysis

Each record sent from Kafka contains a timestamp, a time string, a dimension (here, the product), and a user ID. PV and UV must be counted per dimension from 00:00 up to the current time, and counting starts over at 00:00 the next day.
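Before looking at the streaming job, it can help to pin down the expected result. The following plain-Java sketch computes PV and UV per (date, product) in memory, using a HashSet for exact deduplication. The class name DailyPvUvReference is hypothetical and not part of the project; it is a batch reference for checking results, not a replacement for the Flink job, and it assumes the date string has already been derived from the event time.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class DailyPvUvReference {
    // One counter per (date, product); a new date automatically starts from zero
    private final Map<String, Long> pv = new HashMap<>();
    private final Map<String, Set<Integer>> uids = new HashMap<>();

    private static String key(String date, String product) {
        return date + "|" + product;
    }

    void onClick(String date, String product, int uid) {
        String k = key(date, product);
        pv.merge(k, 1L, Long::sum);                              // every click counts toward PV
        uids.computeIfAbsent(k, x -> new HashSet<>()).add(uid);  // a Set keeps each uid once
    }

    long pv(String date, String product) {
        return pv.getOrDefault(key(date, product), 0L);
    }

    int uv(String date, String product) {
        Set<Integer> s = uids.get(key(date, product));
        return s == null ? 0 : s.size();
    }

    public static void main(String[] args) {
        DailyPvUvReference r = new DailyPvUvReference();
        r.onClick("2021-10-31", "Apple phone", 255420);
        r.onClick("2021-10-31", "Apple phone", 255420);
        r.onClick("2021-10-31", "MacBook Pro", 255421);
        r.onClick("2021-11-01", "Apple phone", 255420); // new day: counted from zero again
        System.out.println(r.pv("2021-10-31", "Apple phone") + " " + r.uv("2021-10-31", "Apple phone"));
        // prints "2 1"
    }
}
```

The exact Set grows with the number of distinct users, which is why the streaming job below replaces it with a bitmap.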

2. Technical Solution

Kafka data may arrive late and out of order, so watermarks are introduced;

Partition the data with keyBy and assign it to one-day tumbling windows, computing PV and UV in each window;

Because one day's worth of state must be kept, ValueState is used in the process function to store PV and UV;

UV is stored as a bitmap in ValueState, which takes up very little memory; this requires adding a dependency on a bitmap library;

Because state is kept, a state TTL (time-to-live) must be set so that the previous day's state expires on the following day, avoiding excessive memory usage.

3. Data Preparation

Here we assume that it is user order data, and the data format is as follows:

```json
{"time": "2021-10-31 22:00:01", "timestamp": "1635228001", "product": "Apple phone", "uid": 255420}
{"time": "2021-10-31 22:00:02", "timestamp": "1635228001", "product": "MacBook Pro", "uid": 255421}
```

4. Code Implementation

A screenshot of the entire project is shown below (some information that cannot be disclosed publicly has been erased):

[Screenshot: pvuv-project]

1. Environment

Kafka: 1.0.0;

Flink: 1.11.0;

2. Send test data

First, send test data to the Kafka test cluster. The Maven dependency is:

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>
```


Code for sending the data:

```java
import jodd.util.ThreadUtil;
import org.junit.Test;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.LineNumberReader;

public class SendDataToKafka {

    @Test
    public void sendData() throws IOException {
        String inpath = "E:\\My File\\click.txt";
        String topic = "click_test";
        int cnt = 0;
        String line;
        // try-with-resources closes the file even if sending fails
        try (LineNumberReader lnr = new LineNumberReader(
                new InputStreamReader(new FileInputStream(inpath)))) {
            // Each line of the file is one JSON record
            while ((line = lnr.readLine()) != null) {
                // KafkaUtil is a producer/consumer helper class that you implement yourself;
                // the running line counter is used as the record key
                KafkaUtil.sendDataToKafka(topic, String.valueOf(cnt), line);
                cnt = cnt + 1;
                // Pause 100 ms between records to simulate a live stream
                ThreadUtil.sleep(100);
            }
        }
    }
}
```

3. Main program

First, define a POJO:

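```java
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

// Lombok generates the constructors, getters/setters and toString()
@NoArgsConstructor
@AllArgsConstructor
@Data
@ToString
public class UserClickModel {
    private String date;
    private String product;
    private int uid;
    private int pv;
    private int uv;
}
```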

Next, use Flink to consume from Kafka, assign timestamps and watermarks, partition the data with keyBy, and feed it into a tumbling window whose process function keeps PV and UV in state.

```java
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.time.Duration;
import java.util.Map;
import java.util.Properties;

public class UserClickMain {

    // Configuration is a project-specific helper that loads commons.xml,
    // not org.apache.flink.configuration.Configuration
    private static final Map<String, String> config = Configuration.initConfig("commons.xml");

    public static void main(String[] args) throws Exception {

        // Initialize the environment and configure related properties
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        senv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        senv.setStateBackend(new FsStateBackend("hdfs://bigdata/flink/checkpoints/userClick"));

        // Read Kafka
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", config.get("kafka-ipport"));
        kafkaProps.setProperty("group.id", config.get("kafka-groupid"));
        // kafkaProps.setProperty("auto.offset.reset", "earliest");

        // Maximum out-of-orderness (delay) tolerated by the watermark
        long maxOutOfOrderness = 5 * 1000L;
        SingleOutputStreamOperator<UserClickModel> dataStream = senv.addSource(
                new FlinkKafkaConsumer<>(
                        config.get("kafka-topic"),
                        new SimpleStringSchema(),
                        kafkaProps
                ))
                // Set the watermark
                .assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofMillis(maxOutOfOrderness))
                        .withTimestampAssigner((element, recordTimestamp) -> {
                            // The "timestamp" field is in seconds; Flink needs milliseconds
                            return Long.valueOf(JSON.parseObject(element).getString("timestamp")) * 1000;
                        }))
                // FCClickMapFunction (not shown) maps each JSON string to a UserClickModel
                .map(new FCClickMapFunction())
                .returns(TypeInformation.of(new TypeHint<UserClickModel>() {
                }));

        // Group by (date, product)
        dataStream.keyBy(new KeySelector<UserClickModel, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> getKey(UserClickModel value) throws Exception {
                        return Tuple2.of(value.getDate(), value.getProduct());
                    }
                })
                // One-day windows shifted by -8 hours so each starts at 00:00 Beijing time (UTC+8)
                .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
                // Fire every 10 seconds (event time) to update the statistics
                .trigger(ContinuousEventTimeTrigger.of(Time.seconds(10)))
                // Calculate PV and UV
                .process(new MyProcessWindowFunctionBitMap())
                // Save the results to MySQL (FCClickSinkFunction is not shown)
                .addSink(new FCClickSinkFunction());

        senv.execute(UserClickMain.class.getSimpleName());
    }
}
```

The code is pretty standard, but there are a few things to note.

Notes

Watermarks are set with WatermarkStrategy, which was introduced in Flink 1.11; the older watermark assigner interfaces are deprecated.

The timestamp in this data is in seconds, so it must be multiplied by 1000: Flink expects the extracted event timestamp to be in milliseconds.
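To make the watermark arithmetic concrete, here is a small plain-Java model of bounded-out-of-orderness watermarks. It mirrors the logic of Flink's BoundedOutOfOrdernessWatermarks, which backs WatermarkStrategy.forBoundedOutOfOrderness: the watermark trails the highest event timestamp seen so far by the allowed delay, minus 1 ms. The class name BoundedOutOfOrdernessModel is hypothetical; this is an illustration, not Flink code.

```java
final class BoundedOutOfOrdernessModel {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessModel(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Start low enough that the first watermark is Long.MIN_VALUE without underflow
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // The Kafka "timestamp" field is in seconds; event time must be in milliseconds
    static long toEventTimeMillis(String secondsField) {
        return Long.parseLong(secondsField) * 1000;
    }

    void onEvent(long eventTimeMillis) {
        maxTimestamp = Math.max(maxTimestamp, eventTimeMillis);
    }

    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessModel wm = new BoundedOutOfOrdernessModel(5 * 1000L);
        wm.onEvent(toEventTimeMillis("1635228001"));
        wm.onEvent(toEventTimeMillis("1635227998")); // late record: does not move the watermark back
        System.out.println(wm.currentWatermark());   // prints 1635227995999
    }
}
```

Forgetting the `* 1000` would place every event near the start of 1970, so the day windows would be assigned to entirely the wrong dates.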

.window receives a single window assigner; here TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)) creates tumbling windows that are one day long. Because Beijing time (UTC+8) is 8 hours ahead of UTC, an offset is needed so that each window starts at local midnight instead of 00:00 UTC. The Javadoc of this method explains the offset:

"Rather than that, if you are living in somewhere which is not using UTC±00:00 time, such as China which is using UTC+08:00, and you want a time window with size of one day, and window begins at every 00:00:00 of local time, you may use {@code of(Time.days(1),Time.hours(-8))}. The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time."
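The effect of the -8 hour offset can be checked with plain Java. The sketch below reimplements the window-start formula used by Flink's TimeWindow.getWindowStartWithOffset and confirms that a Beijing-time event at 22:00:01 lands in a window that begins at local midnight. The class name DailyWindowStart is hypothetical, and the formula assumes non-negative (post-1970) timestamps.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

final class DailyWindowStart {
    // Start of the tumbling window containing timestampMillis, shifted by offsetMillis
    static long windowStart(long timestampMillis, long offsetMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis - offsetMillis + windowSizeMillis) % windowSizeMillis;
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;    // Time.days(1)
        long offset = -8L * 60 * 60 * 1000; // Time.hours(-8)
        ZoneId beijing = ZoneId.of("Asia/Shanghai");
        long ts = ZonedDateTime.of(2021, 10, 31, 22, 0, 1, 0, beijing).toInstant().toEpochMilli();
        long start = windowStart(ts, offset, day);
        System.out.println(Instant.ofEpochMilli(start).atZone(beijing));
        // prints 2021-10-31T00:00+08:00[Asia/Shanghai]
    }
}
```

Without the offset, the same formula would start each window at 00:00 UTC, which is 08:00 in Beijing.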

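With the default event-time trigger, a one-day window fires only once, when the watermark passes the end of the day, which is clearly unsuitable for real-time statistics. The trigger function is therefore used to set a firing interval of 10 seconds (in event time), so that PV and UV are updated every 10 seconds. Note that ContinuousEventTimeTrigger fires without purging the window, so each firing passes all of the window's elements received so far to the process function.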
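Because ContinuousEventTimeTrigger returns FIRE rather than FIRE_AND_PURGE, the window keeps its elements between firings, and each call to the process function sees the whole buffer. The plain-Java simulation below (class and method names are hypothetical) shows the consequence: adding each firing's element count to a running total overcounts, while recounting the buffer gives the correct PV.

```java
import java.util.ArrayList;
import java.util.List;

final class NonPurgingFiringDemo {
    // Returns {accumulatedPv, recountedPv} after one firing per batch,
    // where the window buffer is never purged between firings
    static int[] simulate(int[][] batches) {
        List<Integer> windowBuffer = new ArrayList<>();
        int accumulatedPv = 0; // adds the whole buffer to the previous total: overcounts
        int recountedPv = 0;   // recounts the whole buffer on each firing: correct
        for (int[] batch : batches) {
            for (int uid : batch) {
                windowBuffer.add(uid); // the window keeps every element
            }
            // Firing: the function receives the full buffer, not just the new batch
            accumulatedPv += windowBuffer.size();
            recountedPv = windowBuffer.size();
        }
        return new int[] {accumulatedPv, recountedPv};
    }

    public static void main(String[] args) {
        // uids arriving during two consecutive 10 s intervals
        int[] result = simulate(new int[][] {{1, 2, 3}, {4, 5}});
        System.out.println("accumulated=" + result[0] + ", recounted=" + result[1]);
        // prints "accumulated=8, recounted=5"
    }
}
```

If only the new elements should reach the function, Flink's PurgingTrigger.of(...) can wrap the continuous trigger so the window contents are cleared after each firing; in that setup, adding to a running total kept in state becomes the correct approach.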

4. Key code: calculating UV

Since the user ID here is an integer, a bitmap can be used for deduplication. The principle is simple: use the user ID as the bit offset and set that bit to 1 to mark a visit. 1 MB of space (8,388,608 bits) is enough to record the daily visit status of more than 8 million users.
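The bitmap principle can be sketched with java.util.BitSet from the standard library, used here as a stand-in for the RoaringBitmap library the project actually uses. BitSet is uncompressed, so its memory grows with the largest uid, whereas Roaring compresses sparse ID sets. The class name BitmapUvSketch is hypothetical, and uids are assumed to be non-negative (BitSet rejects negative indexes).

```java
import java.util.BitSet;

final class BitmapUvSketch {
    // One bit per user ID: bit i set to 1 means the user with uid i visited today
    private final BitSet visited = new BitSet();
    private long pv = 0;

    void onClick(int uid) {
        pv++;              // every click counts toward PV
        visited.set(uid);  // setting an already-set bit is a no-op, so UV is deduplicated
    }

    long pv() { return pv; }

    int uv() { return visited.cardinality(); }

    // Bytes needed to hold one bit for every uid in [0, maxUid]
    static long bytesFor(long maxUid) { return maxUid / 8 + 1; }

    public static void main(String[] args) {
        BitmapUvSketch s = new BitmapUvSketch();
        int[] clicks = {255420, 255421, 255420};
        for (int uid : clicks) {
            s.onClick(uid);
        }
        System.out.println("pv=" + s.pv() + ", uv=" + s.uv()); // prints "pv=3, uv=2"
        System.out.println(bytesFor(8_388_607L));              // prints 1048576 (1 MB)
    }
}
```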

Redis has a built-in bitmap data structure, but to depend as little as possible on external storage, we maintain the bitmap ourselves here with the RoaringBitmap library. Add the Maven dependency:

```xml
<dependency>
    <groupId>org.roaringbitmap</groupId>
    <artifactId>RoaringBitmap</artifactId>
    <version>0.8.0</version>
</dependency>
```

The PV and UV calculation code is general-purpose and can be quickly adapted to your actual business:

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

public class MyProcessWindowFunctionBitMap
        extends ProcessWindowFunction<UserClickModel, UserClickModel, Tuple2<String, String>, TimeWindow> {

    private transient ValueState<Integer> pvState;
    private transient ValueState<Roaring64NavigableMap> bitMapState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ValueStateDescriptor<Integer> pvStateDescriptor = new ValueStateDescriptor<>("pv", Integer.class);
        ValueStateDescriptor<Roaring64NavigableMap> bitMapStateDescriptor = new ValueStateDescriptor<>(
                "bitMap", TypeInformation.of(new TypeHint<Roaring64NavigableMap>() {}));

        // Clear expired state: entries expire one day after they were created or last written
        StateTtlConfig stateTtlConfig = StateTtlConfig
                .newBuilder(Time.days(1))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();
        // Enable TTL
        pvStateDescriptor.enableTimeToLive(stateTtlConfig);
        bitMapStateDescriptor.enableTimeToLive(stateTtlConfig);

        pvState = getRuntimeContext().getState(pvStateDescriptor);
        bitMapState = getRuntimeContext().getState(bitMapStateDescriptor);
    }

    @Override
    public void process(Tuple2<String, String> key, Context context,
                        Iterable<UserClickModel> elements, Collector<UserClickModel> out) throws Exception {

        // Current UV bitmap for this (date, product) key
        Roaring64NavigableMap bitMap = bitMapState.value();
        if (bitMap == null) {
            bitMap = new Roaring64NavigableMap();
        }

        // The ContinuousEventTimeTrigger used in UserClickMain fires without purging,
        // so `elements` holds every record of the window received so far. PV is
        // therefore counted from zero on each firing, not added to the previous total.
        int pv = 0;
        for (UserClickModel element : elements) {
            pv = pv + 1;
            // Works because the uid is numeric; adding an existing uid is a no-op
            bitMap.addLong(element.getUid());
        }

        // Update the state
        pvState.update(pv);
        bitMapState.update(bitMap);

        UserClickModel result = new UserClickModel();
        result.setDate(key.f0);
        result.setProduct(key.f1);
        result.setPv(pv);
        result.setUv(bitMap.getIntCardinality());

        out.collect(result);
    }
}
```

Notes

Since the previous day's data is no longer needed when calculating the next day's UV, the previous day's state must be cleared promptly; here it is expired through the state TTL mechanism;
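The TTL settings used in open() can be modeled in a few lines of plain Java. In the sketch below (a simplified model, not Flink's implementation; the class name TtlValueStore is hypothetical), writing a value restarts its TTL clock, as with UpdateType.OnCreateAndWrite, and an expired value is never returned, as with StateVisibility.NeverReturnExpired. The model removes expired entries only when they are read; Flink additionally offers background cleanup strategies, and its TTL is based on processing time.

```java
import java.util.HashMap;
import java.util.Map;

final class TtlValueStore<K, V> {
    private static final class Entry<V> {
        final V value;
        final long lastWriteMillis;

        Entry(V value, long lastWriteMillis) {
            this.value = value;
            this.lastWriteMillis = lastWriteMillis;
        }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final long ttlMillis;

    TtlValueStore(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Creating or writing a value (re)starts its TTL clock
    void update(K key, V value, long nowMillis) {
        entries.put(key, new Entry<>(value, nowMillis));
    }

    // Reading does not refresh the TTL; expired values are dropped and reported as absent
    V value(K key, long nowMillis) {
        Entry<V> e = entries.get(key);
        if (e == null) {
            return null;
        }
        if (nowMillis - e.lastWriteMillis >= ttlMillis) {
            entries.remove(key);
            return null;
        }
        return e.value;
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;
        TtlValueStore<String, Integer> pvState = new TtlValueStore<>(day);
        pvState.update("2021-10-31|Apple phone", 42, 0L);
        System.out.println(pvState.value("2021-10-31|Apple phone", day - 1)); // prints 42
        System.out.println(pvState.value("2021-10-31|Apple phone", day));     // prints null
    }
}
```

Because the key in this job includes the date, yesterday's key simply stops being written after midnight, and its state expires one day after its last update.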

The final results are saved in MySQL. If there are many result dimensions, writing every 10-second update puts considerable load on MySQL, so optimize the sink as needed (for example, by batching writes);

5. Other methods

Besides bitmap deduplication, you can use Flink SQL, which needs less code, or deduplicate with an external store such as Redis, using one of these approaches:

  • Set-based
  • Bitmap-based
  • HyperLogLog-based
  • Bloom-filter-based

The idea is to compute PV and UV in Redis and then read the values back to store the statistical results; this approach is also commonly used.
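As an in-memory illustration of the Bloom-filter approach from the list above (this is neither Redis nor code from the original project; the class name BloomUvCounter and its parameters are hypothetical), the sketch below counts a uid as new only if at least one of its k bit positions was still unset. A Bloom filter has no false negatives, so a repeated uid is never counted twice, but false positives can make it slightly undercount new visitors in exchange for fixed memory.

```java
import java.util.BitSet;

final class BloomUvCounter {
    private final BitSet bits;
    private final int numBits;
    private final int numHashes;
    private long pv = 0;
    private long uv = 0;

    BloomUvCounter(int numBits, int numHashes) {
        this.bits = new BitSet(numBits);
        this.numBits = numBits;
        this.numHashes = numHashes;
    }

    // SplitMix64 finalizer: spreads the bits of a uid over the whole 64-bit range
    private static long mix(long z) {
        z = (z ^ (z >>> 30)) * 0xBF58476D1CE4E5B9L;
        z = (z ^ (z >>> 27)) * 0x94D049BB133111EBL;
        return z ^ (z >>> 31);
    }

    void onClick(long uid) {
        pv++;
        long h = mix(uid);
        int h1 = (int) h;              // low 32 bits
        int h2 = (int) (h >>> 32) | 1; // high 32 bits, forced odd, used as the step
        boolean isNew = false;
        // Double hashing: position_i = h1 + i * h2 (mod numBits)
        for (int i = 0; i < numHashes; i++) {
            int pos = Math.floorMod(h1 + i * h2, numBits);
            if (!bits.get(pos)) {
                isNew = true;
                bits.set(pos);
            }
        }
        if (isNew) {
            uv++;
        }
    }

    long pv() { return pv; }

    long uv() { return uv; }

    public static void main(String[] args) {
        BloomUvCounter counter = new BloomUvCounter(1 << 20, 3);
        long[] clicks = {255420, 255421, 255420};
        for (long uid : clicks) {
            counter.onClick(uid);
        }
        System.out.println("pv=" + counter.pv() + ", uv=" + counter.uv());
    }
}
```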
