dataCoord的Compaction分析2
milvus版本:2.3.2
流程图:

compaction用来合并对象存储的小文件,将小的segment合并为大的segment。
Compaction 有一个配置项来控制是否启用自动压缩。此配置是全局的,会影响系统中的所有集合。
dataCoord.enableCompaction = true
dataCoord.compaction.enableAutoCompaction = true
enableAutoCompaction生效的前提是enableCompaction为true。
此外,该版本还增加了collection级别的控制:可以通过collection属性collection.autocompaction.enabled单独设置(见下文)。
compaction相关参数(全局):
dataCoord.enableCompaction = true
dataCoord.compaction.enableAutoCompaction = true
dataCoord.compaction.indexBasedCompaction = true
dataCoord.compaction.global.interval = 60 #默认60秒,触发compaction信号
dataCoord.compaction.check.interval = 10 #默认10秒,更新状态
dataCoord.segment.smallProportion = 0.5 #默认0.5
dataCoord.compaction.max.segment = 30 #默认30
dataCoord.compaction.min.segment = 3 #默认3
在collection级别设置属性:
collection.autocompaction.enabled = true
python设置代码:
hello_milvus.set_properties({"collection.autocompaction.enabled": True})
1.启动compaction触发器(compactionTrigger)并处理compaction信号的代码
// start initialises the quit channel and the global ticker, then launches
// two goroutines: the signal dispatch loop defined inline below and
// startGlobalCompactionLoop.
func (t *compactionTrigger) start() {
	t.quit = make(chan struct{})
	interval := Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second)
	t.globalTrigger = time.NewTicker(interval)
	t.wg.Add(2)

	// Dispatch loop: runs until t.quit fires, routing every received signal
	// to the global or the collection-level handler.
	go func() {
		defer logutil.LogPanic()
		defer t.wg.Done()
		for {
			select {
			case <-t.quit:
				log.Info("compaction trigger quit")
				return
			case sig := <-t.signals:
				if sig.isGlobal {
					// Global compaction signal.
					t.handleGlobalSignal(sig)
				} else {
					// Collection-level signal.
					t.handleSignal(sig)
					// shouldn't reset, otherwise a frequent flushed collection will affect other collections
					// t.globalTrigger.Reset(Params.DataCoordCfg.GlobalCompactionInterval)
				}
			}
		}
	}()

	// Loop that raises the global compaction signal.
	go t.startGlobalCompactionLoop()
}
t.handleGlobalSignal(signal) 用来处理全局信号。
t.handleSignal(signal) 用来处理collection级别的信号。
go t.startGlobalCompactionLoop() 定时触发全局信号。
collection信号在flush的时候触发。
2.触发全局compaction信号
// triggerCompaction allocates a signal ID and pushes a global, non-forced
// compaction signal onto t.signals.
// It returns the allocation error when no ID could be obtained, nil otherwise.
func (t *compactionTrigger) triggerCompaction() error {
	signalID, allocErr := t.allocSignalID()
	if allocErr != nil {
		return allocErr
	}

	t.signals <- &compactionSignal{
		id:       signalID,
		isForce:  false,
		isGlobal: true,
	}
	return nil
}
3.触发collection级别的compaction信号
// triggerSingleCompaction pushes a non-global, non-forced compaction signal
// bound to the given collection, partition, segment and channel onto t.signals.
// It does nothing and returns nil when the global EnableAutoCompaction
// setting is false; it returns the allocation error when no signal ID could
// be obtained.
func (t *compactionTrigger) triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string) error {
	// With auto compaction disabled, a flush request must not trigger compaction.
	autoEnabled := Params.DataCoordCfg.EnableAutoCompaction.GetAsBool()
	if !autoEnabled {
		return nil
	}

	signalID, allocErr := t.allocSignalID()
	if allocErr != nil {
		return allocErr
	}

	t.signals <- &compactionSignal{
		id:           signalID,
		isForce:      false,
		isGlobal:     false,
		collectionID: collectionID,
		partitionID:  partitionID,
		segmentID:    segmentID,
		channel:      channel,
	}
	return nil
}
调用堆栈:
SaveBinlogPaths()(internal\datacoord\services.go)
  |--s.compactionTrigger.triggerSingleCompaction()
当调用flush的时候触发。
// SaveBinlogPaths updates segment related binlog path
// works for Checkpoints and Flush
//
// NOTE(review): this is an excerpt; the "......" line stands for elided code.
// err, segment and segmentID are not declared in the visible part and are
// presumably set up in the elided section — confirm against the full source.
//
// In the visible part, a request marked as flushed drops the segment from the
// segment manager, publishes the segment ID on s.flushCh and, for non-import
// requests with compaction enabled, asks the compaction trigger for a
// collection-level compaction signal.
func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error) {
	......
	if req.GetFlushed() {
		s.segmentManager.DropSegment(ctx, req.SegmentID)
		s.flushCh <- req.SegmentID
		// Skip the trigger for importing requests or when EnableCompaction is off.
		if !req.Importing && Params.DataCoordCfg.EnableCompaction.GetAsBool() {
			err = s.compactionTrigger.triggerSingleCompaction(segment.GetCollectionID(), segment.GetPartitionID(),
				segmentID, segment.GetInsertChannel())
			// A failed trigger is only logged; the call still returns success below.
			if err != nil {
				log.Warn("failed to trigger single compaction")
			} else {
				log.Info("compaction triggered for segment")
			}
		}
	}
	return merr.Success(), nil
}
4.进入handleSignal()
// handleSignal processes segment flush caused partition-chan level compaction signal
//
// NOTE(review): this is an excerpt; each "......" line stands for elided code.
// coll, collectionID, segments, isDiskIndex and ct are not declared in the
// visible part and are presumably set up in the elided sections — confirm
// against the full source.
func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
	......
	// A non-forced signal is dropped when auto compaction is not enabled for
	// the collection; a forced signal skips this check.
	if !signal.isForce && !t.isCollectionAutoCompactionEnabled(coll) {
		// presumably a rate-limited log call — confirm RatedInfo's semantics
		log.RatedInfo(20, "collection auto compaction disabled",
			zap.Int64("collectionID", collectionID),
		)
		return
	}
	......
	// Build the compaction plans from the candidate segments, then process
	// each plan (the loop body is elided in this excerpt).
	plans := t.generatePlans(segments, signal.isForce, isDiskIndex, ct)
	for _, plan := range plans {
		......
	}
}
isCollectionAutoCompactionEnabled()判断该collection是否启用了自动compaction:优先读取collection级别的属性,未设置时使用全局配置;属性值无法解析时返回false。
// isCollectionAutoCompactionEnabled reports whether auto compaction is
// enabled for coll, based on getCollectionAutoCompactionEnabled applied to
// the collection's properties. If that lookup returns an error, a warning is
// logged and false is returned.
func (t *compactionTrigger) isCollectionAutoCompactionEnabled(coll *collectionInfo) bool {
	autoEnabled, lookupErr := getCollectionAutoCompactionEnabled(coll.Properties)
	if lookupErr == nil {
		return autoEnabled
	}

	log.Warn("collection properties auto compaction not valid, returning false", zap.Error(lookupErr))
	return false
}
进入getCollectionAutoCompactionEnabled():
// getCollectionAutoCompactionEnabled resolves the auto compaction switch for
// a collection from its properties.
//
// When the properties contain common.CollectionAutoCompactionKey, its value
// is parsed as a boolean and returned; a value that cannot be parsed yields
// (false, err). When the key is absent, the global EnableAutoCompaction
// setting is returned with a nil error.
func getCollectionAutoCompactionEnabled(properties map[string]string) (bool, error) {
	raw, found := properties[common.CollectionAutoCompactionKey]
	if !found {
		// No collection-level override: fall back to the global setting.
		return Params.DataCoordCfg.EnableAutoCompaction.GetAsBool(), nil
	}

	parsed, parseErr := strconv.ParseBool(raw)
	if parseErr != nil {
		return false, parseErr
	}
	return parsed, nil
}
common.CollectionAutoCompactionKey=collection.autocompaction.enabled










