Preface

As the development of mobile terminals approaches saturation, the entire IT industry is looking forward to the Internet of Things era in which "everything is connected". In IoT scenarios, many different types of terminal devices are deployed in different locations to collect all kinds of data. Suppose a region has 100,000 IoT devices and each device reports a data point every 5 seconds: roughly 630.7 billion data points are generated per year (100,000 devices × 17,280 writes per day × 365 days). These data points are generated sequentially, the format produced by the devices is uniform, and there is no need to delete or modify them. Time series databases were created precisely for this scenario of continuous, massive, append-only writes. They pursue fast writes, high compression, and fast retrieval, assuming that data never needs to be inserted retroactively or updated and that the data structure is stable.

The labels (tags) of time series data are indexed to improve query performance, so that values matching all specified labels can be found quickly. When the number of distinct label values becomes too large (the high cardinality problem), this index runs into various problems. This article discusses some feasible solutions for InfluxDB when the written data exhibits high cardinality.

High Cardinality Problem (Timeline Expansion)

Time series databases mainly store metric data. Each piece of data is called a sample, and a sample consists of three parts:

- Metric (time series): the metric name and the labelsets describing the current sample;
- Timestamp: the time at which the sample was taken;
- Value: the sampled value.

```
node_cpu{cpu="cpu0",mode="idle"} @1627339366586 70
node_cpu{cpu="cpu0",mode="sys"}  @1627339366586 5
node_cpu{cpu="cpu0",mode="user"} @1627339366586 25
```

Usually the labelsets of a time series are finite and enumerable; in the example above, the possible values of mode are idle, sys, and user. The Prometheus documentation makes the following recommendation about labels:

CAUTION: Remember that every unique combination of key-value label pairs represents a new time series, which can dramatically increase the amount of data stored. Do not use labels to store dimensions with high cardinality (many different label values), such as user IDs, email addresses, or other unbounded sets of values.

Time series databases are likewise designed around the assumption that timeline cardinality is low. However, with the widespread use of metrics, timeline expansion is unavoidable in many scenarios. In cloud-native environments, for example, tags include pod and container IDs, some tags carry user IDs, and some are even URLs; once these tags are combined, the number of timelines explodes.

This contradiction cannot be avoided, so how should it be resolved? Should the writer limit the number of time series it produces, or should the time series database change its design to accommodate this workload? There is no perfect answer; a balance has to be struck. In practical terms, the goal is that when timelines expand, the database does not become unavailable and performance does not degrade exponentially: performance stays excellent while the timelines are not expanded, and remains good, or at least acceptable, after they expand. So how can a time series database hold up when timelines expand? Next, we will examine this question through the InfluxDB source code.
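Before doing that, here is a small back-of-the-envelope sketch of how combined tags multiply into timelines. The tag names and distinct-value counts below are hypothetical, chosen only to illustrate the multiplication effect behind the Prometheus caution above.

```go
package main

import "fmt"

// Every unique combination of tag values is a separate time series, so the
// series count is the product of the distinct value counts of each tag.
func main() {
	// Hypothetical tag dimensions for one measurement; the counts are made
	// up for illustration and are not taken from the article.
	tags := []struct {
		name     string
		distinct int
	}{
		{"region", 10},      // a handful of regions
		{"service", 50},     // a few dozen services
		{"pod_id", 5000},    // pod/container IDs churn constantly
		{"user_id", 100000}, // an unbounded dimension such as user ID
	}

	series := int64(1)
	for _, t := range tags {
		series *= int64(t.distinct)
		fmt.Printf("after adding tag %-8s -> %d series\n", t.name, series)
	}
	// 10 * 50 * 5000 * 100000 = 250,000,000,000 series: the series key index,
	// not the data itself, becomes the bottleneck.
}
```

The last, unbounded dimension dominates the product, which is exactly why the documentation warns against user IDs, email addresses, and URLs as label values.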
Timeline processing logic

The TSM engine of InfluxDB follows a main processing flow similar to an LSM tree. Reported data is first appended to the cache and to the log file (WAL). To speed up retrieval and improve the compression ratio, the reported data is later compacted (data files are merged and the index is rebuilt). Retrieval involves three parts: the TSI (Time Series Index), which indexes measurement, tag, tag value, and time; the TSM data files; and the Series Segment Index, the forward index for series keys. When timelines expand, the retrieval performance of TSI and TSM does not deteriorate badly; the problem mainly lies in the Series Segment Index. In this section we discuss InfluxDB's forward index for timelines (series key -> id, id -> series key). The SeriesFile exists at the database (bucket) level. The relevant code (InfluxDB 2.0.7) is as follows:

tsdb/series_partition.go:30

```go
// SeriesPartition represents a subset of series file data.
type SeriesPartition struct {
	...
	segments []*SeriesSegment
	index    *SeriesIndex
	seq      uint64 // series id sequence
	...
}
```

tsdb/series_index.go:36

```go
// SeriesIndex represents an index of key-to-id & id-to-offset mappings.
type SeriesIndex struct {
	path string
	...
	data         []byte // mmap data
	keyIDData    []byte // key/id mmap data
	idOffsetData []byte // id/offset mmap data

	// In-memory data since rebuild.
	keyIDMap    *rhh.HashMap
	idOffsetMap map[uint64]int64
	tombstones  map[uint64]struct{}
}
```

When looking up a series key, InfluxDB first searches the in-memory map and then falls back to the on-disk map. The implementation is as follows:

tsdb/series_index.go:185

```go
func (idx *SeriesIndex) FindIDBySeriesKey(segments []*SeriesSegment, key []byte) uint64 {
	// Search the in-memory map first.
	if v := idx.keyIDMap.Get(key); v != nil {
		if id, _ := v.(uint64); id != 0 && !idx.IsDeleted(id) {
			return id
		}
	}
	if len(idx.data) == 0 {
		return 0
	}

	hash := rhh.HashKey(key)
	for d, pos := int64(0), hash&idx.mask; ; d, pos = d+1, (pos+1)&idx.mask {
		// Search the on-disk map: read the slot at this position.
		elem := idx.keyIDData[(pos * SeriesIndexElemSize):]
		elemOffset := int64(binary.BigEndian.Uint64(elem[:8]))
		if elemOffset == 0 {
			return 0
		}

		// Read the series key of this slot from the segments via its offset.
		elemKey := ReadSeriesKeyFromSegments(segments, elemOffset+SeriesEntryHeaderSize)
		elemHash := rhh.HashKey(elemKey)
		if d > rhh.Dist(elemHash, pos, idx.capacity) {
			return 0
		} else if elemHash == hash && bytes.Equal(elemKey, key) {
			id := binary.BigEndian.Uint64(elem[8:])
			if idx.IsDeleted(id) {
				return 0
			}
			return id
		}
	}
}
```

A side note on how an in-memory hashmap is turned into an on-disk hashmap: the storage behind a hashmap is an array. InfluxDB maps disk space into memory with mmap (see keyIDData in SeriesIndex) and addresses that array by hash, using Robin Hood hashing, which has good memory locality (the lookup logic is the series_index.go code above). The developers went to considerable effort to hand-port a Robin Hood hashtable onto disk.

How are the in-memory map and the on-disk map generated, and why are two maps needed? InfluxDB first puts newly added series keys into the in-memory hashmap. When the in-memory hashmap grows beyond a threshold, it is merged with the on-disk hashmap (all SeriesSegments are traversed and deleted series keys are filtered out) to generate a new on-disk hashmap; this process is called compaction. After compaction, the in-memory hashmap is cleared and continues to accumulate newly added series keys.
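To make the probe loop in FindIDBySeriesKey easier to follow, here is a minimal, self-contained sketch of Robin Hood probing over a plain in-memory slice. It is not InfluxDB's rhh package: the names (rhMap, entry, dist), the string keys, and the FNV hash are simplifications, and resizing and tombstones are omitted. The invariant is the same, though: an element is never farther from its home slot than necessary, so a lookup can stop as soon as it has probed farther than the resident element's own displacement.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// entry is one slot of the sketch table.
type entry struct {
	key string
	id  uint64
	ok  bool
}

// rhMap is a toy Robin Hood hash table; capacity must be a power of two so
// that hash & mask picks a slot, mirroring pos = hash & idx.mask above.
type rhMap struct {
	slots []entry
	mask  uint64
}

func newRHMap(capacity uint64) *rhMap {
	return &rhMap{slots: make([]entry, capacity), mask: capacity - 1}
}

func hashKey(key string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(key))
	return h.Sum64()
}

// dist is the displacement of a slot position from the key's home slot.
func (m *rhMap) dist(hash, pos uint64) uint64 {
	return (pos + uint64(len(m.slots)) - (hash & m.mask)) & m.mask
}

// insert adds or overwrites a key; no resizing, so keep the table sparse.
func (m *rhMap) insert(key string, id uint64) {
	cur := entry{key: key, id: id, ok: true}
	for d, pos := uint64(0), hashKey(key)&m.mask; ; d, pos = d+1, (pos+1)&m.mask {
		e := m.slots[pos]
		if !e.ok || e.key == cur.key {
			m.slots[pos] = cur
			return
		}
		// Robin Hood rule: if the resident element is closer to its home
		// slot than the element being inserted, evict it and carry it on.
		if ed := m.dist(hashKey(e.key), pos); ed < d {
			m.slots[pos], cur = cur, e
			d = ed
		}
	}
}

// get mirrors the early-exit logic of FindIDBySeriesKey: once we have probed
// farther than the resident element's displacement, the key is not present.
func (m *rhMap) get(key string) (uint64, bool) {
	hash := hashKey(key)
	for d, pos := uint64(0), hash&m.mask; ; d, pos = d+1, (pos+1)&m.mask {
		e := m.slots[pos]
		if !e.ok {
			return 0, false
		}
		if d > m.dist(hashKey(e.key), pos) {
			return 0, false
		}
		if e.key == key {
			return e.id, true
		}
	}
}

func main() {
	m := newRHMap(16)
	m.insert("node_cpu,cpu=cpu0,mode=idle", 1)
	m.insert("node_cpu,cpu=cpu0,mode=sys", 2)
	fmt.Println(m.get("node_cpu,cpu=cpu0,mode=idle")) // 1 true
	fmt.Println(m.get("node_cpu,cpu=cpu0,mode=user")) // 0 false
}
```

The on-disk version works the same way, except that the slot array is an mmap'ed byte slice and the keys are read back from the series segment files, as shown in the code above.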
tsdb/series_partition.go:200

```go
// Check if we've crossed the compaction threshold.
if p.compactionsEnabled() && !p.compacting &&
	p.CompactThreshold != 0 && p.index.InMemCount() >= uint64(p.CompactThreshold) &&
	p.compactionLimiter.TryTake() {
	p.compacting = true
	log, logEnd := logger.NewOperation(context.TODO(), p.Logger, "Series partition compaction", "series_partition_compaction", zap.String("path", p.path))

	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		defer p.compactionLimiter.Release()

		compactor := NewSeriesPartitionCompactor()
		compactor.cancel = p.closing
		if err := compactor.Compact(p); err != nil {
			log.Error("series partition compaction failed", zap.Error(err))
		}
		logEnd()

		// Clear compaction flag.
		p.mu.Lock()
		p.compacting = false
		p.mu.Unlock()
	}()
}
```

tsdb/series_partition.go:569

```go
func (c *SeriesPartitionCompactor) compactIndexTo(index *SeriesIndex, seriesN uint64, segments []*SeriesSegment, path string) error {
	hdr := NewSeriesIndexHeader()
	hdr.Count = seriesN
	hdr.Capacity = pow2((int64(hdr.Count) * 100) / SeriesIndexLoadFactor)

	// Allocate space for maps.
	keyIDMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize))
	idOffsetMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize))

	// Reindex all partitions.
	var entryN int
	for _, segment := range segments {
		errDone := errors.New("done")

		if err := segment.ForEachEntry(func(flag uint8, id uint64, offset int64, key []byte) error {
			...
			// Save max series identifier processed.
			hdr.MaxSeriesID, hdr.MaxOffset = id, offset

			// Ignore entry if tombstoned.
			if index.IsDeleted(id) {
				return nil
			}

			// Insert into maps.
			c.insertIDOffsetMap(idOffsetMap, hdr.Capacity, id, offset)
			return c.insertKeyIDMap(keyIDMap, hdr.Capacity, segments, key, offset, id)
		}); err == errDone {
			break
		} else if err != nil {
			return err
		}
	}
```

This design has two flaws:

1. Every compaction reads all the SeriesSegment files from disk and rebuilds the entire hashtable, which is then written back to disk via mmap.
2. All series keys are loaded into memory to build the new hashtable, so once the number of series keys reaches tens of millions or more, memory may run short and OOM can occur.

Possible solutions

1. Add partitions or databases

InfluxDB's forward index lives at the database level. There are two ways to reduce the memory used during compaction: increase the number of partitions, or spread measurements across multiple databases. The problem is that for an InfluxDB instance that already holds data, adjusting either of these is difficult.

2. Modify the timeline storage strategy

A hash index gives O(1) lookups and is very efficient, but for continuously growing data it has an expansion (rehashing) problem. A compromise is possible: when a partition grows beyond a certain threshold, convert the hash index into a B+tree index. A B+tree degrades only mildly as the data grows, is better suited to high cardinality, and no longer requires a global compaction.

3. Sink the series key forward index to the shard level

Each shard in InfluxDB covers a time interval, and the number of timelines within one interval is not large. For example, a database may hold 180 days of series keys while a shard usually spans only a day or even an hour, so the series key counts of the two differ by one to two orders of magnitude. In addition, sinking the series key forward index to the shard level is friendlier to deletion: when an expired shard is dropped, its series keys can be diffed against the other shards, and any key that exists in no other shard is deleted.

4. Modify the timeline storage strategy based on measurement

In real production environments, timeline expansion is closely tied to measurements: usually only a few measurements suffer timeline expansion, while most do not. When compacting the series key forward index, we can collect per-measurement timeline statistics. If a measurement's timelines have expanded, all of its series keys are switched to a B+tree, while series keys of measurements that have not expanded keep using the hash index (a rough sketch of this idea follows below). This performs better than the second solution, but the development cost is higher.
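The following is a rough sketch of the per-measurement statistics idea from solutions 2 and 4. It is not InfluxDB code: the names (measurementStats, indexKind), the threshold, and the assumption that a series key starts with "measurement," are all hypothetical, and a real implementation would hook into the series partition compaction shown above and build an actual on-disk B+tree for the flagged measurements instead of just returning a flag.

```go
package main

import (
	"bytes"
	"fmt"
)

// indexKind says which index structure a measurement's series keys would use.
type indexKind int

const (
	indexHash  indexKind = iota // low cardinality: keep the Robin Hood hash index
	indexBTree                  // expanded timelines: switch to a B+tree index
)

// measurementStats counts series keys per measurement during a compaction
// pass. Everything here is a hypothetical sketch, not InfluxDB's API.
type measurementStats struct {
	threshold uint64
	counts    map[string]uint64
}

func newMeasurementStats(threshold uint64) *measurementStats {
	return &measurementStats{threshold: threshold, counts: make(map[string]uint64)}
}

// observe would be called for every live series key seen while traversing the
// SeriesSegments (e.g. inside ForEachEntry). The key is assumed to look like
// "measurement,tag1=v1,tag2=v2", so the measurement is the part before the
// first comma.
func (s *measurementStats) observe(seriesKey []byte) {
	name := seriesKey
	if i := bytes.IndexByte(seriesKey, ','); i >= 0 {
		name = seriesKey[:i]
	}
	s.counts[string(name)]++
}

// kind decides, after the pass, which index structure a measurement gets.
func (s *measurementStats) kind(measurement string) indexKind {
	if s.counts[measurement] > s.threshold {
		return indexBTree
	}
	return indexHash
}

func main() {
	stats := newMeasurementStats(2) // tiny threshold, just for the demo

	keys := [][]byte{
		[]byte("node_cpu,cpu=cpu0,mode=idle"),
		[]byte("node_cpu,cpu=cpu0,mode=sys"),
		[]byte("http_requests,url=/a,user_id=1"),
		[]byte("http_requests,url=/b,user_id=2"),
		[]byte("http_requests,url=/c,user_id=3"),
	}
	for _, k := range keys {
		stats.observe(k)
	}

	for _, m := range []string{"node_cpu", "http_requests"} {
		fmt.Printf("%s -> %d series, index kind %v\n", m, stats.counts[m], stats.kind(m))
	}
}
```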
At present, the high cardinality problem shows up mainly in the series key forward index. My personal view is that, in the short term, the second solution can serve as a transition toward the fourth: this handles timeline growth well, with little performance degradation and low cost. The third solution is a larger change with a more reasonable design and can serve as the long-term fix.

Summary

This article used InfluxDB to explain the high cardinality problem of time series databases and some feasible solutions. The explosion of metric dimensions makes the stored data expand with it. Many people regard this as misuse or abuse of a time series database; however, with today's explosion of information, forcing data dimensions to converge rather than diverge is very costly, often far more costly than storing the data. Personally, I think we need to take a divide-and-conquer approach and improve the tolerance of time series databases to dimension explosion. In other words, after timelines expand the database should not crash: metrics whose timelines have not expanded should keep running efficiently, while metrics whose timelines have expanded may see performance degradation, but not a linear one. Improving tolerance to timeline expansion and containing its blast radius will become core capabilities of time series databases.

I am new to Golang and used the InfluxDB source code for practice. Special thanks to Bo Shu, Li Zi, and Ren Jie for helping me understand InfluxDB and for discussing this problem.