From 084424d636aa504b75c5bb3482ea8ada9d5ca89d Mon Sep 17 00:00:00 2001 From: congqixia Date: Thu, 11 May 2023 13:51:20 +0800 Subject: [PATCH] Make data node start only once (#24031) Signed-off-by: Congqi Xia --- internal/datanode/data_node.go | 87 +++++++++++++++++++--------------- 1 file changed, 48 insertions(+), 39 deletions(-) diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 643d6c066..e453f81ee 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -116,6 +116,7 @@ type DataNode struct { //call once initOnce sync.Once + startOnce sync.Once sessionMu sync.Mutex // to fix data race session *sessionutil.Session watchKv kv.MetaKv @@ -485,54 +486,62 @@ func (node *DataNode) BackGroundGC(vChannelCh <-chan string) { // Start will update DataNode state to HEALTHY func (node *DataNode) Start() error { - if err := node.allocator.Start(); err != nil { - log.Error("failed to start id allocator", zap.Error(err), zap.String("role", typeutil.DataNodeRole)) - return err - } - log.Info("start id allocator done", zap.String("role", typeutil.DataNodeRole)) - - rep, err := node.rootCoord.AllocTimestamp(node.ctx, &rootcoordpb.AllocTimestampRequest{ - Base: commonpbutil.NewMsgBase( - commonpbutil.WithMsgType(commonpb.MsgType_RequestTSO), - commonpbutil.WithMsgID(0), - commonpbutil.WithSourceID(paramtable.GetNodeID()), - ), - Count: 1, - }) - if err != nil || rep.Status.ErrorCode != commonpb.ErrorCode_Success { - log.Warn("fail to alloc timestamp", zap.Any("rep", rep), zap.Error(err)) - return errors.New("DataNode fail to alloc timestamp") - } + var startErr error + node.startOnce.Do(func() { + if err := node.allocator.Start(); err != nil { + log.Error("failed to start id allocator", zap.Error(err), zap.String("role", typeutil.DataNodeRole)) + startErr = err + return + } + log.Info("start id allocator done", zap.String("role", typeutil.DataNodeRole)) + + rep, err := node.rootCoord.AllocTimestamp(node.ctx, &rootcoordpb.AllocTimestampRequest{ + Base: commonpbutil.NewMsgBase( + commonpbutil.WithMsgType(commonpb.MsgType_RequestTSO), + commonpbutil.WithMsgID(0), + commonpbutil.WithSourceID(paramtable.GetNodeID()), + ), + Count: 1, + }) + if err != nil || rep.Status.ErrorCode != commonpb.ErrorCode_Success { + log.Warn("fail to alloc timestamp", zap.Any("rep", rep), zap.Error(err)) + startErr = errors.New("DataNode fail to alloc timestamp") + return + } - connectEtcdFn := func() error { - etcdKV := etcdkv.NewEtcdKV(node.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue()) - node.watchKv = etcdKV - return nil - } - err = retry.Do(node.ctx, connectEtcdFn, retry.Attempts(ConnectEtcdMaxRetryTime)) - if err != nil { - return errors.New("DataNode fail to connect etcd") - } + connectEtcdFn := func() error { + etcdKV := etcdkv.NewEtcdKV(node.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue()) + node.watchKv = etcdKV + return nil + } + err = retry.Do(node.ctx, connectEtcdFn, retry.Attempts(ConnectEtcdMaxRetryTime)) + if err != nil { + startErr = errors.New("DataNode fail to connect etcd") + return + } - chunkManager, err := node.factory.NewPersistentStorageChunkManager(node.ctx) + chunkManager, err := node.factory.NewPersistentStorageChunkManager(node.ctx) - if err != nil { - return err - } + if err != nil { + startErr = err + return + } - node.chunkManager = chunkManager + node.chunkManager = chunkManager - go node.BackGroundGC(node.clearSignal) + go node.BackGroundGC(node.clearSignal) - go node.compactionExecutor.start(node.ctx) + go node.compactionExecutor.start(node.ctx) - // Start node watch node - go node.StartWatchChannels(node.ctx) + // Start node watch node + go node.StartWatchChannels(node.ctx) - go node.flowgraphManager.start() + go node.flowgraphManager.start() - node.UpdateStateCode(commonpb.StateCode_Healthy) - return nil + node.UpdateStateCode(commonpb.StateCode_Healthy) + + }) + return startErr } // UpdateStateCode updates datanode's state code -- GitLab