0
点赞
收藏
分享

微信扫一扫

Index_2

前言

  • influxdb安装和使用
  • influxdb概念详解1
  • influxdb概念详解2
  • influxdb源码编译
  • influxdb启动分析
  • influxdb源码分析-meta部分
  • infludb源码分析-数据写入
  • influxdb数据写入细节
  • influxdb源码解析-series
  • influxdb源码解析-inmem index

​ 这是一个influxdb 源码分析的系列文章,在此之前,已经分析过了meta,数据写入,series,index的inmem 实现。阅读本篇文章需要首先阅读上一篇influxdb源码解析-inmem index 。因为上一章文章里面有一些上下文在里面,但是篇幅和内容的关系,分为了两章来写。

​ 上一章提到了一些基本的信息,在这里再回顾一下。

  • Index在influxdb里面被Shard等结构依赖。
  • influxdb 在tsdb/index.go中定义了一个index应该实现的方法,是一个顶层的抽象。
  • 在influxdb的tsdb/index模块下,有两种具体的实现,分别是inmem index和tsi index

​ 上一章的下半部分,分析了inmem index的结构和一些常用的方法,可以看到inmem index主要功能是对series和measurement做了索引和cache。但是从整个索引功能角度来说,有一些缺点:

  • 不能持久化,索引信息是一种结构化了的信息,每次启动需要重建。不能利用上次停机前的信息。
  • 功能有限,比如在查询是,按照tagk,或者tagv来做匹配,这时候inmem index就比较费力。
  • cache 大小不容易控制。

基于以上几个缺点,influxdb 实现了一个可以持久化,并且更加功能强大的索引:TSI(Time Series Index).这个实现支持了持久化,倒排索引,以及索引自身的一些compact,来控制占用的内存大小。

TSI

​ 对于TSI 的处理也是一个LSM-Tree类型的存储系统。那么就会有常见的组件,比如WAL,Cache,Memtable,SSTable等。从这些组件出来,能够更好地理解整个TSI 的设计。首先看WAL的部分

TSI WAL

​ 这里如果你不是很清楚WAL是什么,那么建议先去看看LSM-Tree相关的资料(当然了,WAL并不是和LSM-Tree强绑定的,只是大部分都有)。WAL的抽象在tsi index中位于tsd/tsi1/log_file.go中。PS:部分结构删减了,详细的可以去翻一下源码。

// LogFile represents an on-disk write-ahead log file.
type LogFile struct {
    id         int            // file sequence identifier
    data       []byte         // mmap
    file       *os.File       // writer
    w          *bufio.Writer  // buffered writer
    buf        []byte         // marshaling buffer
    keyBuf     []byte

    sfile   *tsdb.SeriesFile // series lookup
    size    int64            // tracks current file size
    modTime time.Time        // tracks last time write occurred
    // In-memory series existence/tombstone sets.
    seriesIDSet, tombstoneSeriesIDSet *tsdb.SeriesIDSet
    // In-memory index.
    mms logMeasurements
    // Filepath to the log file.
    path string
}

​ 在这个结构中,有几个比较重要的:

  • id,data,file,这是当前的log file的基本信息。
  • sfile 是log file对应的series 信息
  • mms logMeasurements。代表了这个log-file写入的所有的measurement相关信息。

logMeasurement:倒排索引

​ 从上面的结构中,可以看到一个log-file主要提供了series和measurement相关信息。这里的logMeasurement 是一个关于measurement->tagk->tagv->seriesId的倒排索引。可以从代码中看到:

type logMeasurement struct {
	name      []byte
	tagSet    map[string]logTagKey
	deleted   bool
	series    map[uint64]struct{}
	seriesSet *tsdb.SeriesIDSet
}


type logTagKey struct {
	name      []byte
	deleted   bool
	tagValues map[string]logTagValue
}


type logTagValue struct {
	name      []byte
	deleted   bool
	// series id set
	series    map[uint64]struct{}
	seriesSet *tsdb.SeriesIDSet
}

这个结构可以支持根据tagk,tagv,measurement 快速匹配seriesId的功能。

NewLogFile和Open

​ NewLogFile是从给定的参数新建LogFile结构,这个没啥好说的。

func NewLogFile(sfile *tsdb.SeriesFile, path string) *LogFile {
	return &LogFile{
		sfile: sfile,
		path:  path,
		mms:   make(logMeasurements),

		seriesIDSet:          tsdb.NewSeriesIDSet(),
		tombstoneSeriesIDSet: tsdb.NewSeriesIDSet(),
	}
}

​ Open是把这个结构的服务启动。读过之前的文章的人可能都明白,Influxdb 的代码风格非常一致,NewXX是初始化结构里面的字段,Open是启动服务。看一下这个函数做了啥:


func (f *LogFile) open() error {
    f.id, _ = ParseFilename(f.path)
    // Open file for appending.
    file, err := os.OpenFile(f.Path(), os.O_WRONLY|os.O_CREATE, 0666)
    if err != nil {
        return err
    }
    f.file = file
    if f.bufferSize == 0 {
        // 4k
        f.bufferSize = defaultLogFileBufferSize
    }
    f.w = bufio.NewWriterSize(f.file, f.bufferSize)
    fi, err := file.Stat()
    f.size = fi.Size()
    f.modTime = fi.ModTime()
    data, err := mmap.Map(f.Path(), 0)
    f.data = data
    var n int64
    for buf := f.data; len(buf) > 0; {
        var e LogEntry
        if err := e.UnmarshalBinary(buf); err == io.ErrShortBuffer || err == ErrLogEntryChecksumMismatch {
            break
        } else if err != nil {
            return err
        }

        f.execEntry(&e)
        n += int64(e.Size)
        buf = buf[e.Size:]
    }
    f.size = n
    _, err = file.Seek(n, io.SeekStart)
    return err
}

这个函数主要做了这些事情:

  • 打开文件,做mmap,把数据映射进来。
  • 遍历数据,反序列化成每个LogEntry
  • 执行Entry
  • 文件offset reset

前两点都很好理解,核心的逻辑在读取和执行。这里也说明了,LogFile文件是以LogEntry写入的。这里我们暂时先不关心LogEntry的具体格式和execEntry的逻辑,还是按照上一章的逻辑,因为index的核心成员是series和measurement结构,就看看是怎么处理这两个结构的。

createMeasurementIfNotExists

​ 创建不存在的measurement,在inmem-index里,这里的逻辑相对简单,就是往map里面加东西。在这里也不例外:

func (f *LogFile) createMeasurementIfNotExists(name []byte) *logMeasurement {
	mm := f.mms[string(name)]
	if mm == nil {
		mm = &logMeasurement{
			name:   name,
			tagSet: make(map[string]logTagKey),
			series: make(map[uint64]struct{}),
		}
		f.mms[string(name)] = mm
	}
	return mm
}

​ 这里创建出来的logMeasurement是一个不完整的,只有name,tag和series相关的信息是没有填充的,只是做了初始化。

AddSeriesList

​ 新增series是,在inmem-index里面是委托给了seriesFile结构,然后更新自己的map信息。那么在tsi index怎么实现的呢?这个逻辑在AddSeriesList:

func (f *LogFile) AddSeriesList(seriesSet *tsdb.SeriesIDSet, names [][]byte, tagsSlice []models.Tags) ([]uint64, error) {
    seriesIDs, err := f.sfile.CreateSeriesListIfNotExists(names, tagsSlice)
    if err != nil {
        return nil, err
    }

    var writeRequired bool
    entries := make([]LogEntry, 0, len(names))
    seriesSet.RLock()
    for i := range names {
        if seriesSet.ContainsNoLock(seriesIDs[i]) {
            // We don't need to allocate anything for this series.
            seriesIDs[i] = 0
            continue
        }
        writeRequired = true
        entries = append(entries, LogEntry{SeriesID: seriesIDs[i], name: names[i], tags: tagsSlice[i], cached: true, batchidx: i})
    }
    seriesSet.RUnlock()

    // Exit if all series already exist.
    if !writeRequired {
        return seriesIDs, nil
    }

    f.mu.Lock()
    defer f.mu.Unlock()

    seriesSet.Lock()
    defer seriesSet.Unlock()

    for i := range entries { // NB - this doesn't evaluate all series ids returned from series file.
        entry := &entries[i]
        if seriesSet.ContainsNoLock(entry.SeriesID) {
            // We don't need to allocate anything for this series.
            seriesIDs[entry.batchidx] = 0
            continue
        }
        if err := f.appendEntry(entry); err != nil {
            return nil, err
        }
        f.execEntry(entry)
        seriesSet.AddNoLock(entry.SeriesID)
    }

    // Flush buffer and sync to disk.
    if err := f.FlushAndSync(); err != nil {
        return nil, err
    }
    return seriesIDs, nil
}

​ 这个函数有点长,来解释一下。

  • 首先在前五行,也是把创建的任务委托给了SeriesFile,返回对应的seriesId
  • 遍历所有的seriesID,检查之前是不是存在。这里判断存在的依据是这个seriesId是不是ContainsNoLock,和下面的是对应上的。
  • 如果是需要被新建的,那么构建LogEntry结构。
  • 遍历所有的entry,写入WAL并且调用execEntry
  • 强制刷盘

​ 前面三个步骤都很显而易见,注意第四步,entry是一条一条被写入的,并且是先写入到WAL,然后execEntry 为什么要这样做呢?这里留下个疑问,我们后面来解答。

​ 所以到这里对series 信息的添加剂结束了。可以看出来比inmem-index还是复杂了不少的,主要在写WAL日志,execEntry等。

LogEntry

上面留下了个疑问,或者说到这里已经有很多疑问,首先LogEntry到底是个啥?我们先解决这个问题。LogFile是一条条LogEntry组成的。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ii2n3j2l-1651548844975)(/Users/bytedance/Library/Application Support/typora-user-images/image-20220501192045908.png)]

每次写入都是写入一个log_entry的结构,这个结构被定义在了tsdb/index/log_file/LogEntry

type LogEntry struct {
	Flag     byte   // flag
	SeriesID uint64 // series id
	Name     []byte // measurement name
	Key      []byte // tag key
	Value    []byte // tag value
	Checksum uint32 // checksum of flag/name/tags.
	Size     int    // total size of record, in bytes.

	cached   bool        // Hint to LogFile that series data is already parsed
	name     []byte      // series naem, this is a cached copy of the parsed measurement name
	tags     models.Tags // series tags, this is a cached copied of the parsed tags
	batchidx int         // position of entry in batch.
}

主要有flag,seriesId ,name,key,value等结构。LogEntry有多种类型,默认是正常写入类型,由于LSM-Tree这种不支持乱序写入,所以更新和删除就变成了追加一个其他flag的log进来:

  LogEntrySeriesTombstoneFlag      = 0x01
	LogEntryMeasurementTombstoneFlag = 0x02
	LogEntryTagKeyTombstoneFlag      = 0x04
	LogEntryTagValueTombstoneFlag    = 0x08

例如删除某个series,就会追加一个flag=LogEntrySeriesTombstoneFlag的logEntry到wal中,然后再compact的时候执行删除。LogEntry在文件中是有格式的,具体的格式从appendLogEntry可看出来。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FOOMnWpn-1651548844976)(/Users/bytedance/Library/Application Support/typora-user-images/image-20220501192844536.png)]

这里面seriesId做了varInt编码,其他的类似,详细的可以看一下代码。

说完LogEntry,那么接下来就是execEntry了。

execEntry

**execEntry的核心逻辑是,使用logEntry来更新LogFile的cache部分,也就是LogMeasurements那个倒排索引。**上面说到了LogEntry其实有多种类型,使用flag来区分。所以对于不痛的LogEntry有不同的逻辑,这里看一下插入类型的LogEntry执行。

func (f *LogFile) execEntry(e *LogEntry) {
	switch e.Flag {
	case LogEntryMeasurementTombstoneFlag:
		f.execDeleteMeasurementEntry(e)
	case LogEntryTagKeyTombstoneFlag:
		f.execDeleteTagKeyEntry(e)
	case LogEntryTagValueTombstoneFlag:
		f.execDeleteTagValueEntry(e)
	default:
		f.execSeriesEntry(e)
	}
}

重点看一下execSeriesEntry:

func (f *LogFile) execSeriesEntry(e *LogEntry) {
    var seriesKey []byte
    // Check if deleted.
    deleted := e.Flag == LogEntrySeriesTombstoneFlag
    // Read key size.
    _, remainder := tsdb.ReadSeriesKeyLen(seriesKey)
    // Read measurement name.
    name, remainder := tsdb.ReadSeriesKeyMeasurement(remainder)
    mm := f.createMeasurementIfNotExists(name)
    mm.deleted = false
    if !deleted {
        mm.addSeriesID(e.SeriesID)
    } else {
        mm.removeSeriesID(e.SeriesID)
    }
    // Read tag count.
    tagN, remainder := tsdb.ReadSeriesKeyTagN(remainder)
    // Save tags.
    var k, v []byte
    for i := 0; i < tagN; i++ {
        k, v, remainder = tsdb.ReadSeriesKeyTag(remainder)
        ts := mm.createTagSetIfNotExists(k)
        tv := ts.createTagValueIfNotExists(v)
        // Add/remove a reference to the series on the tag value.
        if !deleted {
            tv.addSeriesID(e.SeriesID)
        } else {
            tv.removeSeriesID(e.SeriesID)
        }

        ts.tagValues[string(v)] = tv
        mm.tagSet[string(k)] = ts
    }
    // Add/remove from appropriate series id sets.
    if !deleted {
        f.seriesIDSet.Add(e.SeriesID)
        f.tombstoneSeriesIDSet.Remove(e.SeriesID)
    } else {
        f.seriesIDSet.Remove(e.SeriesID)
        f.tombstoneSeriesIDSet.Add(e.SeriesID)
    }
}

代码不是很长,有删减,主要的逻辑是:

  • 创建不存在的Measurement,并且确实entry的类型,更新seriesId。
  • 遍历tagkv,更新logMeasurement
  • 更新tomb标记。

看下来发现,这里的exec逻辑主要在更新LogFile对应的cache。也就是那个倒排索引。回到上面的问题:“entry是一条一条被写入的,并且是先写入到WAL,然后execEntry **” 这里就可以回到问题了。

  • execEnty是在更新LogFile持有的倒排索引
  • 先写入WAL是为了保证数据能被持久化,只有被持久化好的数据,才会被更新到cache里面,开始使用。

其实这种逻辑在LSM-Tree类型的存储里很常见,这就是WAL的作用。

TSI Cache

上面说完了WAL,接下来是Cache模块。Cache不是在tsdb/tsi1/cache.go中这个模块相对比较简单:

type TagValueSeriesIDCache struct {
	sync.RWMutex
	// 这个map是name->tagKey->tagValue->seriesId的映射
	cache   map[string]map[string]map[string]*list.Element
	evictor *list.List

	capacity int
}

核心的结构是一个多层map,维护了name-> tagk->tagV->seriesIDList的映射。而且这个cache是一个LRU Cache。

New和Get

主要看一下New和Get方法吧。

func NewTagValueSeriesIDCache(c int) *TagValueSeriesIDCache {
	return &TagValueSeriesIDCache{
		cache:    map[string]map[string]map[string]*list.Element{},
		evictor:  list.New(),
		capacity: c,
	}
}

New 比较简单,就是初始化。

Get:

// exists.
func (c *TagValueSeriesIDCache) Get(name, key, value []byte) *tsdb.SeriesIDSet {
	c.Lock()
	defer c.Unlock()
	return c.get(name, key, value)
}
// 这个cache是个lru cache
func (c *TagValueSeriesIDCache) get(name, key, value []byte) *tsdb.SeriesIDSet {
	if mmap, ok := c.cache[string(name)]; ok {
		if tkmap, ok := mmap[string(key)]; ok {
			if ele, ok := tkmap[string(value)]; ok {
				c.evictor.MoveToFront(ele) // This now becomes most recently used.
				return ele.Value.(*seriesIDCacheElement).SeriesIDSet
			}
		}
	}
	return nil
}

Get方法也不难,通过信息找具体的值,注意c.evictor.MoveToFront(ele) 这个是把命中的元素放到最前面,这就是LRU的做法。

其他的操作都不是很难,可以看看。Cache到这里就结束了。

Index File

​ Index file结构是管理倒排索引的模块。算是LSM-Tree中的SSTable,这部分是会被持久化道磁盘上的。首先看一下index file的结构。

type IndexFile struct {
    data []byte
    sfile *tsdb.SeriesFile
    tblks map[string]*TagBlock // tag blocks by measurement name
    mblk  MeasurementBlock
    seriesIDSetData          []byte
    tombstoneSeriesIDSetData []byte
    sketchData, tSketchData []byte
    // Sortable identifier & filepath to the log file.
    level int
    id    int
    mu sync.RWMutex
    // Compaction tracking.
    compacting bool
    // Path to data file.
    path string
}

一个indexFile结构在磁盘上对应一个文件,可以看到具体的字段:

  • data表示文件内容
  • TagBlock和MeasurementBlock代表Tag和Measurement信息。
  • level代表当前文件的等级,这个和leveldb很像,每次做compact时,合并同一级别的文件。
  • compacting 表示是不是在compating中

TagBlock和MeasurementBlock

​ 一个tsi 文件由文件头+block 信息+文件尾部构成。这个详细的可以去查阅一下源码,不再细说。block顾名思义就是块,TagBlock是存储Tag信息的,Measurement是存储Measurement信息的。这两种block都是按照一定格式编码的。比如从一段byte 连decode TagKey

func (blk *TagBlock) DecodeTagKeyElem(key []byte, elem *TagBlockKeyElem) bool {
	keyN := int64(binary.BigEndian.Uint64(blk.hashData[:TagKeyNSize]))
	hash := rhh.HashKey(key)
	pos := hash % keyN

	// Track current distance
	var d int64
	for {
		// Find offset of tag key.
		offset := binary.BigEndian.Uint64(blk.hashData[TagKeyNSize+(pos*TagKeyOffsetSize):])
		if offset == 0 {
			return false
		}

		// Parse into element.
		elem.unmarshal(blk.data[offset:], blk.data)

		// Return if keys match.
		if bytes.Equal(elem.key, key) {
			return true
		}

		// Check if we've exceeded the probe distance.
		if d > rhh.Dist(rhh.HashKey(elem.key), pos, keyN) {
			return false
		}

		// Move position forward.
		pos = (pos + 1) % keyN
		d++

		if d > keyN {
			return false
		}
	}
}

​ 这部分的核心在于文件格式,其他的不是很重要。有了格式做encode和decode都是显而易见。

Partition和Index

​ 上面分析完了tsi index的重要组成部分:cache,wal,SSTable等。index是一个database粒度的,这里对index也做了下分段,上层抽象了Partition结构,Index包含多个partition,每个partition 包含一部分index。

​ 这部分后面再单独分析吧,文章有点长了。

举报

相关推荐

0 条评论