前言
- influxdb安装和使用
- influxdb概念详解1
- influxdb概念详解2
- influxdb源码编译
- influxdb启动分析
- influxdb源码分析-meta部分
- infludb源码分析-数据写入
- influxdb数据写入细节
- influxdb源码解析-series
- influxdb源码解析-inmem index
这是一个influxdb 源码分析的系列文章,在此之前,已经分析过了meta,数据写入,series,index的inmem 实现。阅读本篇文章需要首先阅读上一篇influxdb源码解析-inmem index 。因为上一章文章里面有一些上下文在里面,但是篇幅和内容的关系,分为了两章来写。
上一章提到了一些基本的信息,在这里再回顾一下。
- Index在influxdb里面被Shard等结构依赖。
- influxdb 在tsdb/index.go中定义了一个index应该实现的方法,是一个顶层的抽象。
- 在influxdb的tsdb/index模块下,有两种具体的实现,分别是inmem index和tsi index
上一章的下半部分,分析了inmem index的结构和一些常用的方法,可以看到inmem index主要功能是对series和measurement做了索引和cache。但是从整个索引功能角度来说,有一些缺点:
- 不能持久化,索引信息是一种结构化了的信息,每次启动需要重建。不能利用上次停机前的信息。
- 功能有限,比如在查询是,按照tagk,或者tagv来做匹配,这时候inmem index就比较费力。
- cache 大小不容易控制。
基于以上几个缺点,influxdb 实现了一个可以持久化,并且更加功能强大的索引:TSI(Time Series Index).这个实现支持了持久化,倒排索引,以及索引自身的一些compact,来控制占用的内存大小。
TSI
对于TSI 的处理也是一个LSM-Tree类型的存储系统。那么就会有常见的组件,比如WAL,Cache,Memtable,SSTable等。从这些组件出来,能够更好地理解整个TSI 的设计。首先看WAL的部分
TSI WAL
这里如果你不是很清楚WAL是什么,那么建议先去看看LSM-Tree相关的资料(当然了,WAL并不是和LSM-Tree强绑定的,只是大部分都有)。WAL的抽象在tsi index中位于tsd/tsi1/log_file.go中。PS:部分结构删减了,详细的可以去翻一下源码。
// LogFile represents an on-disk write-ahead log file.
type LogFile struct {
id int // file sequence identifier
data []byte // mmap
file *os.File // writer
w *bufio.Writer // buffered writer
buf []byte // marshaling buffer
keyBuf []byte
sfile *tsdb.SeriesFile // series lookup
size int64 // tracks current file size
modTime time.Time // tracks last time write occurred
// In-memory series existence/tombstone sets.
seriesIDSet, tombstoneSeriesIDSet *tsdb.SeriesIDSet
// In-memory index.
mms logMeasurements
// Filepath to the log file.
path string
}
在这个结构中,有几个比较重要的:
- id,data,file,这是当前的log file的基本信息。
- sfile 是log file对应的series 信息
- mms logMeasurements。代表了这个log-file写入的所有的measurement相关信息。
logMeasurement:倒排索引
从上面的结构中,可以看到一个log-file主要提供了series和measurement相关信息。这里的logMeasurement 是一个关于measurement->tagk->tagv->seriesId的倒排索引。可以从代码中看到:
type logMeasurement struct {
name []byte
tagSet map[string]logTagKey
deleted bool
series map[uint64]struct{}
seriesSet *tsdb.SeriesIDSet
}
type logTagKey struct {
name []byte
deleted bool
tagValues map[string]logTagValue
}
type logTagValue struct {
name []byte
deleted bool
// series id set
series map[uint64]struct{}
seriesSet *tsdb.SeriesIDSet
}
这个结构可以支持根据tagk,tagv,measurement 快速匹配seriesId的功能。
NewLogFile和Open
NewLogFile是从给定的参数新建LogFile结构,这个没啥好说的。
func NewLogFile(sfile *tsdb.SeriesFile, path string) *LogFile {
return &LogFile{
sfile: sfile,
path: path,
mms: make(logMeasurements),
seriesIDSet: tsdb.NewSeriesIDSet(),
tombstoneSeriesIDSet: tsdb.NewSeriesIDSet(),
}
}
Open是把这个结构的服务启动。读过之前的文章的人可能都明白,Influxdb 的代码风格非常一致,NewXX是初始化结构里面的字段,Open是启动服务。看一下这个函数做了啥:
func (f *LogFile) open() error {
f.id, _ = ParseFilename(f.path)
// Open file for appending.
file, err := os.OpenFile(f.Path(), os.O_WRONLY|os.O_CREATE, 0666)
if err != nil {
return err
}
f.file = file
if f.bufferSize == 0 {
// 4k
f.bufferSize = defaultLogFileBufferSize
}
f.w = bufio.NewWriterSize(f.file, f.bufferSize)
fi, err := file.Stat()
f.size = fi.Size()
f.modTime = fi.ModTime()
data, err := mmap.Map(f.Path(), 0)
f.data = data
var n int64
for buf := f.data; len(buf) > 0; {
var e LogEntry
if err := e.UnmarshalBinary(buf); err == io.ErrShortBuffer || err == ErrLogEntryChecksumMismatch {
break
} else if err != nil {
return err
}
f.execEntry(&e)
n += int64(e.Size)
buf = buf[e.Size:]
}
f.size = n
_, err = file.Seek(n, io.SeekStart)
return err
}
这个函数主要做了这些事情:
- 打开文件,做mmap,把数据映射进来。
- 遍历数据,反序列化成每个LogEntry
- 执行Entry
- 文件offset reset
前两点都很好理解,核心的逻辑在读取和执行。这里也说明了,LogFile文件是以LogEntry写入的。这里我们暂时先不关心LogEntry的具体格式和execEntry的逻辑,还是按照上一章的逻辑,因为index的核心成员是series和measurement结构,就看看是怎么处理这两个结构的。
createMeasurementIfNotExists
创建不存在的measurement,在inmem-index里,这里的逻辑相对简单,就是往map里面加东西。在这里也不例外:
func (f *LogFile) createMeasurementIfNotExists(name []byte) *logMeasurement {
mm := f.mms[string(name)]
if mm == nil {
mm = &logMeasurement{
name: name,
tagSet: make(map[string]logTagKey),
series: make(map[uint64]struct{}),
}
f.mms[string(name)] = mm
}
return mm
}
这里创建出来的logMeasurement是一个不完整的,只有name,tag和series相关的信息是没有填充的,只是做了初始化。
AddSeriesList
新增series是,在inmem-index里面是委托给了seriesFile结构,然后更新自己的map信息。那么在tsi index怎么实现的呢?这个逻辑在AddSeriesList:
func (f *LogFile) AddSeriesList(seriesSet *tsdb.SeriesIDSet, names [][]byte, tagsSlice []models.Tags) ([]uint64, error) {
seriesIDs, err := f.sfile.CreateSeriesListIfNotExists(names, tagsSlice)
if err != nil {
return nil, err
}
var writeRequired bool
entries := make([]LogEntry, 0, len(names))
seriesSet.RLock()
for i := range names {
if seriesSet.ContainsNoLock(seriesIDs[i]) {
// We don't need to allocate anything for this series.
seriesIDs[i] = 0
continue
}
writeRequired = true
entries = append(entries, LogEntry{SeriesID: seriesIDs[i], name: names[i], tags: tagsSlice[i], cached: true, batchidx: i})
}
seriesSet.RUnlock()
// Exit if all series already exist.
if !writeRequired {
return seriesIDs, nil
}
f.mu.Lock()
defer f.mu.Unlock()
seriesSet.Lock()
defer seriesSet.Unlock()
for i := range entries { // NB - this doesn't evaluate all series ids returned from series file.
entry := &entries[i]
if seriesSet.ContainsNoLock(entry.SeriesID) {
// We don't need to allocate anything for this series.
seriesIDs[entry.batchidx] = 0
continue
}
if err := f.appendEntry(entry); err != nil {
return nil, err
}
f.execEntry(entry)
seriesSet.AddNoLock(entry.SeriesID)
}
// Flush buffer and sync to disk.
if err := f.FlushAndSync(); err != nil {
return nil, err
}
return seriesIDs, nil
}
这个函数有点长,来解释一下。
- 首先在前五行,也是把创建的任务委托给了SeriesFile,返回对应的seriesId
- 遍历所有的seriesID,检查之前是不是存在。这里判断存在的依据是这个seriesId是不是ContainsNoLock,和下面的是对应上的。
- 如果是需要被新建的,那么构建LogEntry结构。
- 遍历所有的entry,写入WAL并且调用execEntry
- 强制刷盘
前面三个步骤都很显而易见,注意第四步,entry是一条一条被写入的,并且是先写入到WAL,然后execEntry 为什么要这样做呢?这里留下个疑问,我们后面来解答。
所以到这里对series 信息的添加剂结束了。可以看出来比inmem-index还是复杂了不少的,主要在写WAL日志,execEntry等。
LogEntry
上面留下了个疑问,或者说到这里已经有很多疑问,首先LogEntry到底是个啥?我们先解决这个问题。LogFile是一条条LogEntry组成的。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ii2n3j2l-1651548844975)(/Users/bytedance/Library/Application Support/typora-user-images/image-20220501192045908.png)]
每次写入都是写入一个log_entry的结构,这个结构被定义在了tsdb/index/log_file/LogEntry中
type LogEntry struct {
Flag byte // flag
SeriesID uint64 // series id
Name []byte // measurement name
Key []byte // tag key
Value []byte // tag value
Checksum uint32 // checksum of flag/name/tags.
Size int // total size of record, in bytes.
cached bool // Hint to LogFile that series data is already parsed
name []byte // series naem, this is a cached copy of the parsed measurement name
tags models.Tags // series tags, this is a cached copied of the parsed tags
batchidx int // position of entry in batch.
}
主要有flag,seriesId ,name,key,value等结构。LogEntry有多种类型,默认是正常写入类型,由于LSM-Tree这种不支持乱序写入,所以更新和删除就变成了追加一个其他flag的log进来:
LogEntrySeriesTombstoneFlag = 0x01
LogEntryMeasurementTombstoneFlag = 0x02
LogEntryTagKeyTombstoneFlag = 0x04
LogEntryTagValueTombstoneFlag = 0x08
例如删除某个series,就会追加一个flag=LogEntrySeriesTombstoneFlag的logEntry到wal中,然后再compact的时候执行删除。LogEntry在文件中是有格式的,具体的格式从appendLogEntry可看出来。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FOOMnWpn-1651548844976)(/Users/bytedance/Library/Application Support/typora-user-images/image-20220501192844536.png)]
这里面seriesId做了varInt编码,其他的类似,详细的可以看一下代码。
说完LogEntry,那么接下来就是execEntry了。
execEntry
**execEntry的核心逻辑是,使用logEntry来更新LogFile的cache部分,也就是LogMeasurements那个倒排索引。**上面说到了LogEntry其实有多种类型,使用flag来区分。所以对于不痛的LogEntry有不同的逻辑,这里看一下插入类型的LogEntry执行。
func (f *LogFile) execEntry(e *LogEntry) {
switch e.Flag {
case LogEntryMeasurementTombstoneFlag:
f.execDeleteMeasurementEntry(e)
case LogEntryTagKeyTombstoneFlag:
f.execDeleteTagKeyEntry(e)
case LogEntryTagValueTombstoneFlag:
f.execDeleteTagValueEntry(e)
default:
f.execSeriesEntry(e)
}
}
重点看一下execSeriesEntry:
func (f *LogFile) execSeriesEntry(e *LogEntry) {
var seriesKey []byte
// Check if deleted.
deleted := e.Flag == LogEntrySeriesTombstoneFlag
// Read key size.
_, remainder := tsdb.ReadSeriesKeyLen(seriesKey)
// Read measurement name.
name, remainder := tsdb.ReadSeriesKeyMeasurement(remainder)
mm := f.createMeasurementIfNotExists(name)
mm.deleted = false
if !deleted {
mm.addSeriesID(e.SeriesID)
} else {
mm.removeSeriesID(e.SeriesID)
}
// Read tag count.
tagN, remainder := tsdb.ReadSeriesKeyTagN(remainder)
// Save tags.
var k, v []byte
for i := 0; i < tagN; i++ {
k, v, remainder = tsdb.ReadSeriesKeyTag(remainder)
ts := mm.createTagSetIfNotExists(k)
tv := ts.createTagValueIfNotExists(v)
// Add/remove a reference to the series on the tag value.
if !deleted {
tv.addSeriesID(e.SeriesID)
} else {
tv.removeSeriesID(e.SeriesID)
}
ts.tagValues[string(v)] = tv
mm.tagSet[string(k)] = ts
}
// Add/remove from appropriate series id sets.
if !deleted {
f.seriesIDSet.Add(e.SeriesID)
f.tombstoneSeriesIDSet.Remove(e.SeriesID)
} else {
f.seriesIDSet.Remove(e.SeriesID)
f.tombstoneSeriesIDSet.Add(e.SeriesID)
}
}
代码不是很长,有删减,主要的逻辑是:
- 创建不存在的Measurement,并且确实entry的类型,更新seriesId。
- 遍历tagkv,更新logMeasurement
- 更新tomb标记。
看下来发现,这里的exec逻辑主要在更新LogFile对应的cache。也就是那个倒排索引。回到上面的问题:“entry是一条一条被写入的,并且是先写入到WAL,然后execEntry **” 这里就可以回到问题了。
- execEnty是在更新LogFile持有的倒排索引
- 先写入WAL是为了保证数据能被持久化,只有被持久化好的数据,才会被更新到cache里面,开始使用。
其实这种逻辑在LSM-Tree类型的存储里很常见,这就是WAL的作用。
TSI Cache
上面说完了WAL,接下来是Cache模块。Cache不是在tsdb/tsi1/cache.go中这个模块相对比较简单:
type TagValueSeriesIDCache struct {
sync.RWMutex
// 这个map是name->tagKey->tagValue->seriesId的映射
cache map[string]map[string]map[string]*list.Element
evictor *list.List
capacity int
}
核心的结构是一个多层map,维护了name-> tagk->tagV->seriesIDList的映射。而且这个cache是一个LRU Cache。
New和Get
主要看一下New和Get方法吧。
func NewTagValueSeriesIDCache(c int) *TagValueSeriesIDCache {
return &TagValueSeriesIDCache{
cache: map[string]map[string]map[string]*list.Element{},
evictor: list.New(),
capacity: c,
}
}
New 比较简单,就是初始化。
Get:
// exists.
func (c *TagValueSeriesIDCache) Get(name, key, value []byte) *tsdb.SeriesIDSet {
c.Lock()
defer c.Unlock()
return c.get(name, key, value)
}
// 这个cache是个lru cache
func (c *TagValueSeriesIDCache) get(name, key, value []byte) *tsdb.SeriesIDSet {
if mmap, ok := c.cache[string(name)]; ok {
if tkmap, ok := mmap[string(key)]; ok {
if ele, ok := tkmap[string(value)]; ok {
c.evictor.MoveToFront(ele) // This now becomes most recently used.
return ele.Value.(*seriesIDCacheElement).SeriesIDSet
}
}
}
return nil
}
Get方法也不难,通过信息找具体的值,注意c.evictor.MoveToFront(ele) 这个是把命中的元素放到最前面,这就是LRU的做法。
其他的操作都不是很难,可以看看。Cache到这里就结束了。
Index File
Index file结构是管理倒排索引的模块。算是LSM-Tree中的SSTable,这部分是会被持久化道磁盘上的。首先看一下index file的结构。
type IndexFile struct {
data []byte
sfile *tsdb.SeriesFile
tblks map[string]*TagBlock // tag blocks by measurement name
mblk MeasurementBlock
seriesIDSetData []byte
tombstoneSeriesIDSetData []byte
sketchData, tSketchData []byte
// Sortable identifier & filepath to the log file.
level int
id int
mu sync.RWMutex
// Compaction tracking.
compacting bool
// Path to data file.
path string
}
一个indexFile结构在磁盘上对应一个文件,可以看到具体的字段:
- data表示文件内容
- TagBlock和MeasurementBlock代表Tag和Measurement信息。
- level代表当前文件的等级,这个和leveldb很像,每次做compact时,合并同一级别的文件。
- compacting 表示是不是在compating中
TagBlock和MeasurementBlock
一个tsi 文件由文件头+block 信息+文件尾部构成。这个详细的可以去查阅一下源码,不再细说。block顾名思义就是块,TagBlock是存储Tag信息的,Measurement是存储Measurement信息的。这两种block都是按照一定格式编码的。比如从一段byte 连decode TagKey
func (blk *TagBlock) DecodeTagKeyElem(key []byte, elem *TagBlockKeyElem) bool {
keyN := int64(binary.BigEndian.Uint64(blk.hashData[:TagKeyNSize]))
hash := rhh.HashKey(key)
pos := hash % keyN
// Track current distance
var d int64
for {
// Find offset of tag key.
offset := binary.BigEndian.Uint64(blk.hashData[TagKeyNSize+(pos*TagKeyOffsetSize):])
if offset == 0 {
return false
}
// Parse into element.
elem.unmarshal(blk.data[offset:], blk.data)
// Return if keys match.
if bytes.Equal(elem.key, key) {
return true
}
// Check if we've exceeded the probe distance.
if d > rhh.Dist(rhh.HashKey(elem.key), pos, keyN) {
return false
}
// Move position forward.
pos = (pos + 1) % keyN
d++
if d > keyN {
return false
}
}
}
这部分的核心在于文件格式,其他的不是很重要。有了格式做encode和decode都是显而易见。
Partition和Index
上面分析完了tsi index的重要组成部分:cache,wal,SSTable等。index是一个database粒度的,这里对index也做了下分段,上层抽象了Partition结构,Index包含多个partition,每个partition 包含一部分index。
这部分后面再单独分析吧,文章有点长了。