Unverified commit 74fd28df authored by SimFG, committed by GitHub

Refine some datanode code (#18351)

Signed-off-by: NSimFG <bang.fu@zilliz.com>
Parent a6e1ddb5
......@@ -168,12 +168,12 @@ func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelT
dbuff.delData.Append(pk, ts)
if Timestamp(ts) < dbuff.TimestampFrom {
dbuff.TimestampFrom = Timestamp(ts)
if ts < dbuff.TimestampFrom {
dbuff.TimestampFrom = ts
}
if Timestamp(ts) > dbuff.TimestampTo {
dbuff.TimestampTo = Timestamp(ts)
if ts > dbuff.TimestampTo {
dbuff.TimestampTo = ts
}
}
}
......@@ -181,7 +181,7 @@ func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelT
dbuff.updateSize(dbuff.delData.RowCount)
log.Debug("mergeDeltalogs end", zap.Int64("PlanID", t.getPlanID()),
zap.Int("number of pks to compact in insert logs", len(pk2ts)),
zap.Any("elapse in ms", nano2Milli(time.Since(mergeStart))))
zap.Float64("elapse in ms", nano2Milli(time.Since(mergeStart))))
return pk2ts, dbuff, nil
}
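Several hunks in this commit swap zap.Any for typed fields (zap.Float64, zap.Int32, zap.Strings, zap.Int64s). The nano2Milli helper itself is not part of the diff; the sketch below assumes it simply converts a time.Duration into fractional milliseconds and is not the actual datanode implementation.

```go
package main

import (
	"time"

	"go.uber.org/zap"
)

// Assumed shape of nano2Milli: convert a duration to fractional milliseconds.
// The real helper lives elsewhere in the datanode package.
func nano2Milli(d time.Duration) float64 {
	return float64(d) / float64(time.Millisecond)
}

func main() {
	logger, _ := zap.NewDevelopment()
	mergeStart := time.Now()

	// Typed fields (Float64, Strings, Int64s) avoid the reflection-based
	// encoding that zap.Any falls back to when the entry is written.
	logger.Debug("mergeDeltalogs end",
		zap.Int64("PlanID", 1),
		zap.Float64("elapse in ms", nano2Milli(time.Since(mergeStart))),
		zap.Strings("channel names", []string{"by-dev-ch-1"}),
		zap.Int64s("working dataNodes", []int64{7, 9}))
}
```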
......@@ -310,7 +310,7 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[primaryKey]Timestamp
log.Debug("merge end", zap.Int64("planID", t.getPlanID()), zap.Int64("remaining insert numRows", numRows),
zap.Int64("expired entities", expired),
zap.Any("elapse in ms", nano2Milli(time.Since(mergeStart))))
zap.Float64("elapse in ms", nano2Milli(time.Since(mergeStart))))
return iDatas, numRows, nil
}
......@@ -347,7 +347,7 @@ func (t *compactionTask) compact() error {
targetSegID = t.plan.GetSegmentBinlogs()[0].GetSegmentID()
}
log.Debug("compaction start", zap.Int64("planID", t.plan.GetPlanID()), zap.Any("timeout in seconds", t.plan.GetTimeoutInSeconds()))
log.Debug("compaction start", zap.Int64("planID", t.plan.GetPlanID()), zap.Int32("timeout in seconds", t.plan.GetTimeoutInSeconds()))
segIDs := make([]UniqueID, 0, len(t.plan.GetSegmentBinlogs()))
for _, s := range t.plan.GetSegmentBinlogs() {
segIDs = append(segIDs, s.GetSegmentID())
......@@ -370,7 +370,7 @@ func (t *compactionTask) compact() error {
<-ti.Injected()
injectEnd := time.Now()
defer func() {
log.Debug("inject elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Any("elapse", nano2Milli(injectEnd.Sub(injectStart))))
log.Debug("inject elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(injectEnd.Sub(injectStart))))
}()
var (
......@@ -463,7 +463,7 @@ func (t *compactionTask) compact() error {
err = g.Wait()
downloadEnd := time.Now()
defer func() {
log.Debug("download elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Any("elapse", nano2Milli(downloadEnd.Sub(downloadStart))))
log.Debug("download elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(downloadEnd.Sub(downloadStart))))
}()
if err != nil {
......@@ -493,7 +493,7 @@ func (t *compactionTask) compact() error {
uploadEnd := time.Now()
defer func() {
log.Debug("upload elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Any("elapse", nano2Milli(uploadEnd.Sub(uploadStart))))
log.Debug("upload elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(uploadEnd.Sub(uploadStart))))
}()
for _, fbl := range segPaths.deltaInfo {
......@@ -526,7 +526,7 @@ func (t *compactionTask) compact() error {
}
rpcEnd := time.Now()
defer func() {
log.Debug("rpc elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Any("elapse", nano2Milli(rpcEnd.Sub(rpcStart))))
log.Debug("rpc elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(rpcEnd.Sub(rpcStart))))
}()
// Compaction I: update pk range.
......@@ -550,7 +550,7 @@ func (t *compactionTask) compact() error {
ti.injectDone(true)
uninjectEnd := time.Now()
defer func() {
log.Debug("uninject elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Any("elapse", nano2Milli(uninjectEnd.Sub(uninjectStart))))
log.Debug("uninject elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(uninjectEnd.Sub(uninjectStart))))
}()
log.Info("compaction done",
......@@ -561,7 +561,7 @@ func (t *compactionTask) compact() error {
zap.Int("num of delta paths", len(segPaths.deltaInfo)),
)
log.Info("overall elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Any("elapse", nano2Milli(time.Since(compactStart))))
log.Info("overall elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(time.Since(compactStart))))
metrics.DataNodeCompactionLatency.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID())).Observe(float64(t.tr.ElapseSpan().Milliseconds()))
return nil
......
......@@ -457,9 +457,9 @@ func (node *DataNode) Start() error {
},
Count: 1,
})
if err != nil {
log.Warn("fail to alloc timestamp", zap.Error(err))
return err
if err != nil || rep.Status.ErrorCode != commonpb.ErrorCode_Success {
log.Warn("fail to alloc timestamp", zap.Any("rep", rep), zap.Error(err))
return errors.New("DataNode fail to alloc timestamp")
}
connectEtcdFn := func() error {
......@@ -480,10 +480,6 @@ func (node *DataNode) Start() error {
node.chunkManager = chunkManager
if rep.Status.ErrorCode != commonpb.ErrorCode_Success || err != nil {
return errors.New("DataNode fail to start")
}
go node.BackGroundGC(node.clearSignal)
go node.compactionExecutor.start(node.ctx)
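The two Start() hunks above fold the rep.Status.ErrorCode check into the error branch right after the timestamp allocation, removing the later duplicate check. A minimal, self-contained sketch of that combined check, using stand-in types rather than the real commonpb/rootcoordpb definitions:

```go
package main

import (
	"errors"
	"fmt"
)

const errorCodeSuccess int32 = 0 // stand-in for commonpb.ErrorCode_Success

// allocTimestampResponse is a stand-in for the rootcoord allocation reply.
type allocTimestampResponse struct {
	errorCode int32
	timestamp uint64
}

// allocTimestamp pretends to be the RPC issued during DataNode.Start().
func allocTimestamp() (*allocTimestampResponse, error) {
	return &allocTimestampResponse{errorCode: errorCodeSuccess, timestamp: 42}, nil
}

func start() error {
	rep, err := allocTimestamp()
	// A transport error and a non-success status are treated as one failure;
	// short-circuit evaluation keeps rep from being dereferenced when err != nil.
	if err != nil || rep.errorCode != errorCodeSuccess {
		return errors.New("DataNode fail to alloc timestamp")
	}
	return nil
}

func main() {
	fmt.Println(start()) // <nil>
}
```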
......@@ -509,8 +505,7 @@ func (node *DataNode) GetStateCode() internalpb.StateCode {
}
func (node *DataNode) isHealthy() bool {
code := node.State.Load().(internalpb.StateCode)
return code == internalpb.StateCode_Healthy
return node.GetStateCode() == internalpb.StateCode_Healthy
}
// WatchDmChannels is not in use
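isHealthy now goes through GetStateCode rather than repeating the atomic.Value type assertion. A runnable sketch of that pattern with a simplified StateCode standing in for the internalpb enum:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// StateCode stands in for internalpb.StateCode in this sketch.
type StateCode int32

const (
	StateCodeAbnormal StateCode = iota
	StateCodeHealthy
)

type DataNode struct {
	State atomic.Value // holds a StateCode
}

// GetStateCode is the single place that knows how the state is stored.
func (node *DataNode) GetStateCode() StateCode {
	return node.State.Load().(StateCode)
}

// isHealthy delegates to GetStateCode instead of repeating the assertion
// at every call site (ReadyToFlush, FlushSegments, ...).
func (node *DataNode) isHealthy() bool {
	return node.GetStateCode() == StateCodeHealthy
}

func main() {
	node := &DataNode{}
	node.State.Store(StateCodeHealthy)
	fmt.Println(node.isHealthy()) // true
}
```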
......@@ -543,9 +538,9 @@ func (node *DataNode) GetComponentStates(ctx context.Context) (*internalpb.Compo
return states, nil
}
// ReadyToFlush tells wether DataNode is ready for flushing
// ReadyToFlush tells whether DataNode is ready for flushing
func (node *DataNode) ReadyToFlush() error {
if node.State.Load().(internalpb.StateCode) != internalpb.StateCode_Healthy {
if !node.isHealthy() {
return errors.New("DataNode not in HEALTHY state")
}
return nil
......@@ -565,7 +560,7 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
ErrorCode: commonpb.ErrorCode_UnexpectedError,
}
if node.State.Load().(internalpb.StateCode) != internalpb.StateCode_Healthy {
if !node.isHealthy() {
errStatus.Reason = "dataNode not in HEALTHY state"
return errStatus, nil
}
......@@ -653,7 +648,6 @@ func (node *DataNode) ResendSegmentStats(ctx context.Context, req *datapb.Resend
return &datapb.ResendSegmentStatsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
SegResent: segResent,
}, nil
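This hunk and the ones that follow drop explicit `Reason: ""`, `Value: ""`, and `Response: ""` assignments. They are safe to remove because omitted struct fields take their zero value; a tiny sketch with a stand-in Status type:

```go
package main

import "fmt"

// Status is a stand-in for commonpb.Status, only to show the zero-value point.
type Status struct {
	ErrorCode int32
	Reason    string
}

func main() {
	// Omitted fields default to their zero value, so both literals are equal.
	explicit := Status{ErrorCode: 0, Reason: ""}
	implicit := Status{ErrorCode: 0}
	fmt.Println(explicit == implicit) // true
}
```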
......@@ -689,9 +683,7 @@ func (node *DataNode) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringR
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
Value: "",
}, nil
}
......@@ -700,9 +692,7 @@ func (node *DataNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.Strin
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
Value: "",
}, nil
}
......@@ -724,7 +714,6 @@ func (node *DataNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: msgDataNodeIsUnhealthy(Params.DataNodeCfg.GetNodeID()),
},
Response: "",
}, nil
}
......@@ -740,7 +729,6 @@ func (node *DataNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
},
Response: "",
}, nil
}
......@@ -770,7 +758,6 @@ func (node *DataNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: metricsinfo.MsgUnimplementedMetric,
},
Response: "",
}, nil
}
......@@ -818,8 +805,8 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
zap.Int64("task ID", req.GetImportTask().GetTaskId()),
zap.Int64("collection ID", req.GetImportTask().GetCollectionId()),
zap.Int64("partition ID", req.GetImportTask().GetPartitionId()),
zap.Any("channel names", req.GetImportTask().GetChannelNames()),
zap.Any("working dataNodes", req.WorkingNodes))
zap.Strings("channel names", req.GetImportTask().GetChannelNames()),
zap.Int64s("working dataNodes", req.WorkingNodes))
defer func() {
log.Info("DataNode finish import request", zap.Int64("task ID", req.GetImportTask().GetTaskId()))
}()
......@@ -956,6 +943,7 @@ func (node *DataNode) AddSegment(ctx context.Context, req *datapb.AddSegmentRequ
return &commonpb.Status{
// TODO: Add specific error code.
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}, nil
}
}
......@@ -972,7 +960,7 @@ func importFlushReqFunc(node *DataNode, req *datapb.ImportTaskRequest, res *root
log.Error("import task returns invalid shard number",
zap.Int("shard num", shardNum),
zap.Int("# of channels", len(req.GetImportTask().GetChannelNames())),
zap.Any("channel names", req.GetImportTask().GetChannelNames()),
zap.Strings("channel names", req.GetImportTask().GetChannelNames()),
)
return fmt.Errorf("syncSegmentID Failed: invalid shard number %d", shardNum)
}
......
......@@ -131,14 +131,14 @@ func newParallelConfig() parallelConfig {
return parallelConfig{Params.DataNodeCfg.FlowGraphMaxQueueLength, Params.DataNodeCfg.FlowGraphMaxParallelism}
}
// start starts the flowgraph in datasyncservice
// start starts the flow graph in datasyncservice
func (dsService *dataSyncService) start() {
if dsService.fg != nil {
log.Info("dataSyncService starting flowgraph", zap.Int64("collectionID", dsService.collectionID),
log.Info("dataSyncService starting flow graph", zap.Int64("collectionID", dsService.collectionID),
zap.String("vChanName", dsService.vchannelName))
dsService.fg.Start()
} else {
log.Warn("dataSyncService starting flowgraph is nil", zap.Int64("collectionID", dsService.collectionID),
log.Warn("dataSyncService starting flow graph is nil", zap.Int64("collectionID", dsService.collectionID),
zap.String("vChanName", dsService.vchannelName))
}
}
......@@ -351,7 +351,6 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
// getSegmentInfos return the SegmentInfo details according to the given ids through RPC to datacoord
func (dsService *dataSyncService) getSegmentInfos(segmentIDs []int64) ([]*datapb.SegmentInfo, error) {
var segmentInfos []*datapb.SegmentInfo
infoResp, err := dsService.dataCoord.GetSegmentInfo(dsService.ctx, &datapb.GetSegmentInfoRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SegmentInfo,
......@@ -371,6 +370,5 @@ func (dsService *dataSyncService) getSegmentInfos(segmentIDs []int64) ([]*datapb
log.Error("Fail to get datapb.SegmentInfo by ids from datacoord", zap.Error(err))
return nil, err
}
segmentInfos = infoResp.Infos
return segmentInfos, nil
return infoResp.Infos, nil
}
......@@ -46,31 +46,13 @@ func newMetaService(rc types.RootCoord, collectionID UniqueID) *metaService {
}
}
// TODO: Replace with getCollectionInfo below.
// getCollectionSchema get collection schema with provided collection id at specified timestamp.
func (mService *metaService) getCollectionSchema(ctx context.Context, collID UniqueID, timestamp Timestamp) (*schemapb.CollectionSchema, error) {
req := &milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DescribeCollection,
MsgID: 0, //GOOSE TODO
Timestamp: 0, // GOOSE TODO
SourceID: Params.DataNodeCfg.GetNodeID(),
},
DbName: "default", // GOOSE TODO
CollectionID: collID,
TimeStamp: timestamp,
response, err := mService.getCollectionInfo(ctx, collID, timestamp)
if response != nil {
return response.GetSchema(), err
}
response, err := mService.rootCoord.DescribeCollection(ctx, req)
if err != nil {
return nil, fmt.Errorf("grpc error when describe collection %v from rootcoord: %s", collID, err.Error())
}
if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return nil, fmt.Errorf("describe collection %v from rootcoord wrong: %s", collID, response.GetStatus().GetReason())
}
return response.GetSchema(), nil
return nil, err
}
// getCollectionInfo get collection info with provided collection id at specified timestamp.
......
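The metaService hunk above collapses getCollectionSchema into a thin wrapper around getCollectionInfo. A self-contained sketch of that shape, with stand-in types instead of the real milvuspb/schemapb ones:

```go
package main

import "fmt"

// Stand-ins for schemapb.CollectionSchema and the DescribeCollection response.
type CollectionSchema struct{ Name string }

type DescribeCollectionResponse struct{ Schema *CollectionSchema }

// GetSchema mirrors the nil-safe getter style of generated protobuf code.
func (r *DescribeCollectionResponse) GetSchema() *CollectionSchema {
	if r == nil {
		return nil
	}
	return r.Schema
}

// getCollectionInfo pretends to be the RPC wrapper the refactored helper
// now delegates to.
func getCollectionInfo() (*DescribeCollectionResponse, error) {
	return &DescribeCollectionResponse{Schema: &CollectionSchema{Name: "demo"}}, nil
}

// getCollectionSchema only unwraps the schema; when the response is nil,
// the error (possibly nil) is returned as-is.
func getCollectionSchema() (*CollectionSchema, error) {
	response, err := getCollectionInfo()
	if response != nil {
		return response.GetSchema(), err
	}
	return nil, err
}

func main() {
	schema, err := getCollectionSchema()
	fmt.Println(schema.Name, err) // demo <nil>
}
```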
......@@ -298,14 +298,7 @@ func (replica *SegmentReplica) listCompactedSegmentIDs() map[UniqueID][]UniqueID
compactedTo2From := make(map[UniqueID][]UniqueID)
for segID, seg := range replica.compactedSegments {
var from []UniqueID
from, ok := compactedTo2From[seg.compactedTo]
if !ok {
from = []UniqueID{}
}
from = append(from, segID)
compactedTo2From[seg.compactedTo] = from
compactedTo2From[seg.compactedTo] = append(compactedTo2From[seg.compactedTo], segID)
}
return compactedTo2From
......
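listCompactedSegmentIDs now appends straight into the map entry. This works because indexing a missing key yields a nil slice and append accepts nil, so the explicit ok-check and empty-slice initialization were redundant. A minimal sketch with plain int64 IDs:

```go
package main

import "fmt"

func main() {
	compactedTo2From := make(map[int64][]int64)
	pairs := [][2]int64{{100, 1}, {100, 2}, {200, 3}}
	for _, p := range pairs {
		to, from := p[0], p[1]
		// Appending to a missing key is fine: the zero value of a slice is
		// nil, and append handles nil slices transparently.
		compactedTo2From[to] = append(compactedTo2From[to], from)
	}
	fmt.Println(compactedTo2From) // map[100:[1 2] 200:[3]]
}
```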