diff --git a/internal/querynode/meta_replica.go b/internal/querynode/meta_replica.go index cd1983ea3d1c0ff58f5553c47dc9858902ee7d18..e8ceccd6c12c87f4666347b681b95d5f49cc7ebc 100644 --- a/internal/querynode/meta_replica.go +++ b/internal/querynode/meta_replica.go @@ -603,7 +603,7 @@ func (replica *metaReplica) removeSegmentPrivate(segmentID UniqueID, segType seg deleteSegment(segment) } default: - panic("unsupported segment type") + panic(fmt.Sprintf("unsupported segment type %s", segType.String())) } metrics.QueryNodeNumSegments.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Dec() diff --git a/internal/querynode/partition.go b/internal/querynode/partition.go index 627e348fa2e1dda170e55e0d9d9257725118c2e3..1ec631c5a9f7e2ab1d020afab6354726be47f4df 100644 --- a/internal/querynode/partition.go +++ b/internal/querynode/partition.go @@ -97,7 +97,11 @@ func (p *Partition) removeSegmentID(segmentID UniqueID, segType segmentType) { default: return } - log.Info("remove a segment from replica", zap.Int64("collectionID", p.collectionID), zap.Int64("partitionID", p.partitionID), zap.Int64("segmentID", segmentID)) + log.Info("remove a segment from replica", + zap.Int64("collectionID", p.collectionID), + zap.Int64("partitionID", p.partitionID), + zap.Int64("segmentID", segmentID), + zap.String("segmentType", segType.String())) } // newPartition returns a new Partition diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go index 81a0ff6c8cdb9a578a41500ba0def9d679b977e3..8095a932a8225bca0a9c9dc7df8ea6081d225063 100644 --- a/internal/querynode/segment.go +++ b/internal/querynode/segment.go @@ -84,7 +84,7 @@ type Segment struct { rmMutex sync.RWMutex // guards recentlyModified recentlyModified bool - typeMu sync.Mutex // guards builtIndex + typeMu sync.RWMutex // guards segmentType segmentType segmentType idBinlogRowSizes []int64 @@ -127,8 +127,8 @@ func (s *Segment) setType(segType segmentType) { } func (s *Segment) getType() segmentType { - s.typeMu.Lock() - defer s.typeMu.Unlock() + s.typeMu.RLock() + defer s.typeMu.RUnlock() return s.segmentType } @@ -178,7 +178,7 @@ func newSegment(collection *Collection, segmentID UniqueID, partitionID UniqueID zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Int64("segmentID", segmentID), - zap.Int32("segment type", int32(segType)), + zap.String("segmentType", segType.String()), zap.Error(err)) return nil, err } @@ -187,7 +187,7 @@ func newSegment(collection *Collection, segmentID UniqueID, partitionID UniqueID zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Int64("segmentID", segmentID), - zap.Int32("segmentType", int32(segType))) + zap.String("segmentType", segType.String())) var segment = &Segment{ segmentPtr: segmentPtr, @@ -217,7 +217,11 @@ func deleteSegment(segment *Segment) { C.DeleteSegment(cPtr) segment.segmentPtr = nil - log.Info("delete segment from memory", zap.Int64("collectionID", segment.collectionID), zap.Int64("partitionID", segment.partitionID), zap.Int64("segmentID", segment.ID())) + log.Info("delete segment from memory", + zap.Int64("collectionID", segment.collectionID), + zap.Int64("partitionID", segment.partitionID), + zap.Int64("segmentID", segment.ID()), + zap.String("segmentType", segment.getType().String())) } func (s *Segment) getRowCount() int64 { @@ -758,7 +762,8 @@ func (s *Segment) segmentLoadDeletedRecord(primaryKeys []primaryKey, timestamps log.Info("load deleted record done", zap.Int64("row count", rowCount), - zap.Int64("segmentID", s.ID())) + zap.Int64("segmentID", s.ID()), + zap.String("segmentType", s.getType().String())) return nil } diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go index 7f52eb0f2257743d4d5e76d6f5c32dac93e64999..a0c23ab2eb8594e3db13f9447439833ddf5e682b 100644 --- a/internal/querynode/segment_loader.go +++ b/internal/querynode/segment_loader.go @@ -86,7 +86,7 @@ func (loader *segmentLoader) LoadSegment(req *querypb.LoadSegmentsRequest, segme log.Info("segmentLoader start loading...", zap.Any("collectionID", req.CollectionID), zap.Any("segmentNum", segmentNum), - zap.Any("loadType", segmentType)) + zap.Any("segmentType", segmentType.String())) // check memory limit concurrencyLevel := loader.cpuPool.Cap() @@ -134,7 +134,7 @@ func (loader *segmentLoader) LoadSegment(req *querypb.LoadSegmentsRequest, segme zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Int64("segmentID", segmentID), - zap.Int32("segment type", int32(segmentType)), + zap.String("segmentType", segmentType.String()), zap.Error(err)) segmentGC() return err @@ -157,7 +157,7 @@ func (loader *segmentLoader) LoadSegment(req *querypb.LoadSegmentsRequest, segme zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Int64("segmentID", segmentID), - zap.Int32("segment type", int32(segmentType)), + zap.String("segmentType", segmentType.String()), zap.Error(err)) return err } @@ -206,7 +206,8 @@ func (loader *segmentLoader) loadSegmentInternal(segment *Segment, log.Info("start loading segment data into memory", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), - zap.Int64("segmentID", segmentID)) + zap.Int64("segmentID", segmentID), + zap.String("segmentType", segment.getType().String())) pkFieldID, err := loader.metaReplica.getPKFieldIDByCollectionID(collectionID) if err != nil { @@ -310,7 +311,8 @@ func (loader *segmentLoader) loadGrowingSegmentFields(segment *Segment, fieldBin log.Info("log field binlogs done", zap.Int64("collection", segment.collectionID), zap.Int64("segment", segment.segmentID), - zap.Any("field", fieldBinlogs)) + zap.Any("field", fieldBinlogs), + zap.String("segmentType", segmentType.String())) _, _, insertData, err := iCodec.Deserialize(blobs) if err != nil { @@ -337,7 +339,7 @@ func (loader *segmentLoader) loadGrowingSegmentFields(segment *Segment, fieldBin return loader.loadGrowingSegments(segment, rowIDData.(*storage.Int64FieldData).Data, utss, insertData) default: - err := fmt.Errorf("illegal segmentType=%v when load segment, collectionID=%v", segmentType, segment.collectionID) + err := fmt.Errorf("illegal segmentType=%s when load segment, collectionID=%v", segmentType.String(), segment.collectionID) return err } } @@ -359,7 +361,8 @@ func (loader *segmentLoader) loadSealedSegmentFields(segment *Segment, fields [] log.Info("log field binlogs done", zap.Int64("collection", segment.collectionID), zap.Int64("segment", segment.segmentID), - zap.Any("fields", fields)) + zap.Any("fields", fields), + zap.String("segmentType", segment.getType().String())) return nil }