From fbb5d32cb68da69cd2c2bbfeb9120e3f1334dd6a Mon Sep 17 00:00:00 2001 From: congqixia Date: Thu, 17 Aug 2023 15:44:18 +0800 Subject: [PATCH] Make write binlog in parallel (#26325) Signed-off-by: Congqi Xia --- internal/datanode/flush_manager.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/internal/datanode/flush_manager.go b/internal/datanode/flush_manager.go index c72054ed2..b921f9e71 100644 --- a/internal/datanode/flush_manager.go +++ b/internal/datanode/flush_manager.go @@ -27,6 +27,7 @@ import ( "github.com/samber/lo" "go.uber.org/atomic" "go.uber.org/zap" + "golang.org/x/sync/errgroup" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" @@ -670,7 +671,15 @@ func (t *flushBufferInsertTask) flushInsertData() error { defer cancel() if t.ChunkManager != nil && len(t.data) > 0 { tr := timerecord.NewTimeRecorder("insertData") - err := t.MultiWrite(ctx, t.data) + group, ctx := errgroup.WithContext(ctx) + for key, data := range t.data { + key := key + data := data + group.Go(func() error { + return t.Write(ctx, key, data) + }) + } + err := group.Wait() metrics.DataNodeSave2StorageLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.InsertLabel).Observe(float64(tr.ElapseSpan().Milliseconds())) if err == nil { for _, d := range t.data { -- GitLab