diff --git a/internal/datanode/binlog_meta.go b/internal/datanode/binlog_meta.go new file mode 100644 index 0000000000000000000000000000000000000000..7513f6feeb2cde5ec7eaa7a30c76167e99468829 --- /dev/null +++ b/internal/datanode/binlog_meta.go @@ -0,0 +1,152 @@ +package datanode + +import ( + "path" + "strconv" + + "github.com/golang/protobuf/proto" + "github.com/zilliztech/milvus-distributed/internal/kv" + "github.com/zilliztech/milvus-distributed/internal/proto/datapb" +) + +// ddl binlog meta key: +// ${prefix}/${collectionID}/${idx} +// segment binlog meta key: +// ${prefix}/${segmentID}/${fieldID}/${idx} +type binlogMeta struct { + client kv.TxnBase // etcd kv + idAllocator allocatorInterface +} + +func NewBinlogMeta(kv kv.TxnBase, idAllocator allocatorInterface) (*binlogMeta, error) { + mt := &binlogMeta{ + client: kv, + idAllocator: idAllocator, + } + return mt, nil +} + +// if alloc is true, the returned keys will have a generated-unique ID at the end. +// if alloc is false, the returned keys will only consist of provided ids. +func (bm *binlogMeta) genKey(alloc bool, ids ...UniqueID) (key string, err error) { + if alloc { + idx, err := bm.idAllocator.allocID() + if err != nil { + return "", err + } + ids = append(ids, idx) + } + + idStr := make([]string, len(ids)) + for _, id := range ids { + idStr = append(idStr, strconv.FormatInt(id, 10)) + } + + key = path.Join(idStr...) + return +} + +func (bm *binlogMeta) SaveSegmentBinlogMetaTxn(segmentID UniqueID, field2Path map[UniqueID]string) error { + + kvs := make(map[string]string, len(field2Path)) + for fieldID, p := range field2Path { + key, err := bm.genKey(true, segmentID, fieldID) + if err != nil { + return err + } + + v := proto.MarshalTextString(&datapb.SegmentFieldBinlogMeta{ + FieldID: fieldID, + BinlogPath: p, + }) + kvs[path.Join(Params.SegFlushMetaSubPath, key)] = v + } + return bm.client.MultiSave(kvs) +} + +func (bm *binlogMeta) getFieldBinlogMeta(segmentID UniqueID, + fieldID UniqueID) (metas []*datapb.SegmentFieldBinlogMeta, err error) { + + prefix, err := bm.genKey(false, segmentID, fieldID) + if err != nil { + return nil, err + } + + _, vs, err := bm.client.LoadWithPrefix(path.Join(Params.SegFlushMetaSubPath, prefix)) + if err != nil { + return nil, err + } + + for _, blob := range vs { + m := &datapb.SegmentFieldBinlogMeta{} + if err = proto.UnmarshalText(blob, m); err != nil { + return nil, err + } + + metas = append(metas, m) + } + + return +} + +func (bm *binlogMeta) getSegmentBinlogMeta(segmentID UniqueID) (metas []*datapb.SegmentFieldBinlogMeta, err error) { + + prefix, err := bm.genKey(false, segmentID) + if err != nil { + return nil, err + } + + _, vs, err := bm.client.LoadWithPrefix(path.Join(Params.SegFlushMetaSubPath, prefix)) + if err != nil { + return nil, err + } + + for _, blob := range vs { + m := &datapb.SegmentFieldBinlogMeta{} + if err = proto.UnmarshalText(blob, m); err != nil { + return nil, err + } + + metas = append(metas, m) + } + return +} + +// ddl binlog meta key: +// ${prefix}/${collectionID}/${idx} +// --- DDL --- +func (bm *binlogMeta) SaveDDLBinlogMetaTxn(collID UniqueID, tsPath string, ddlPath string) error { + + k, err := bm.genKey(true, collID) + if err != nil { + return err + } + v := proto.MarshalTextString(&datapb.DDLBinlogMeta{ + DdlBinlogPath: ddlPath, + TsBinlogPath: tsPath, + }) + + return bm.client.Save(path.Join(Params.DDLFlushMetaSubPath, k), v) +} + +func (bm *binlogMeta) getDDLBinlogMete(collID UniqueID) (metas []*datapb.DDLBinlogMeta, err error) { + prefix, err := 
bm.genKey(false, collID) + if err != nil { + return nil, err + } + + _, vs, err := bm.client.LoadWithPrefix(path.Join(Params.DDLFlushMetaSubPath, prefix)) + if err != nil { + return nil, err + } + + for _, blob := range vs { + m := &datapb.DDLBinlogMeta{} + if err = proto.UnmarshalText(blob, m); err != nil { + return nil, err + } + + metas = append(metas, m) + } + return +} diff --git a/internal/datanode/binlog_meta_test.go b/internal/datanode/binlog_meta_test.go new file mode 100644 index 0000000000000000000000000000000000000000..b460e50962cce414502949f10a982fbdd49d0721 --- /dev/null +++ b/internal/datanode/binlog_meta_test.go @@ -0,0 +1,124 @@ +package datanode + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + memkv "github.com/zilliztech/milvus-distributed/internal/kv/mem" +) + +func TestMetaTable_Basic(t *testing.T) { + + kvMock := memkv.NewMemoryKV() + allocMock := NewAllocatorFactory(22222) + meta, err := NewBinlogMeta(kvMock, allocMock) + require.NoError(t, err) + defer meta.client.Close() + + t.Run("TestBasic_genKey", func(t *testing.T) { + // 0/1 + alloc := true + k, err := meta.genKey(alloc, 0) + assert.NoError(t, err) + assert.True(t, strings.HasPrefix(k, "0/")) + + // rand int64 + _, err = meta.genKey(alloc) + assert.NoError(t, err) + + // 1/2/3/1 + k, err = meta.genKey(alloc, 1, 2, 3) + assert.NoError(t, err) + assert.True(t, strings.HasPrefix(k, "1/2/3/")) + + // 0 + alloc = false + k, err = meta.genKey(alloc, 0) + assert.NoError(t, err) + assert.Equal(t, "0", k) + + // "" + k, err = meta.genKey(alloc) + assert.NoError(t, err) + assert.Equal(t, "", k) + + // 1/2/3 + k, err = meta.genKey(alloc, 1, 2, 3) + assert.NoError(t, err) + assert.Equal(t, "1/2/3", k) + }) + + t.Run("TestBasic_SaveSegmentBinlogMetaTxn", func(t *testing.T) { + segID := UniqueID(999999) + fieldID2Path := map[UniqueID]string{ + 100: "a", + 200: "b", + 300: "c", + } + + err := meta.SaveSegmentBinlogMetaTxn(segID, fieldID2Path) + assert.NoError(t, err) + + metas, err := meta.getFieldBinlogMeta(segID, 100) + assert.NoError(t, err) + assert.Equal(t, 1, len(metas)) + assert.Equal(t, "a", metas[0].GetBinlogPath()) + + metas, err = meta.getFieldBinlogMeta(segID, 200) + assert.NoError(t, err) + assert.Equal(t, 1, len(metas)) + assert.Equal(t, "b", metas[0].GetBinlogPath()) + + metas, err = meta.getFieldBinlogMeta(segID, 300) + assert.NoError(t, err) + assert.Equal(t, 1, len(metas)) + assert.Equal(t, "c", metas[0].GetBinlogPath()) + + fieldID2Path2 := map[UniqueID]string{ + 100: "aa", + 200: "bb", + 300: "cc", + } + + err = meta.SaveSegmentBinlogMetaTxn(segID, fieldID2Path2) + assert.NoError(t, err) + + metas, err = meta.getSegmentBinlogMeta(segID) + assert.NoError(t, err) + assert.Equal(t, 6, len(metas)) + + paths := make([]string, 0, 6) + for _, meta := range metas { + paths = append(paths, meta.GetBinlogPath()) + } + + assert.ElementsMatch(t, []string{"a", "b", "c", "aa", "bb", "cc"}, paths) + }) + + t.Run("TestBasic_SaveDDLBinlogMetaTxn", func(t *testing.T) { + collID := UniqueID(888888) + tsPath := "a/b/c" + ddlPath := "c/b/a" + + err := meta.SaveDDLBinlogMetaTxn(collID, tsPath, ddlPath) + assert.NoError(t, err) + + metas, err := meta.getDDLBinlogMete(collID) + assert.NoError(t, err) + assert.Equal(t, 1, len(metas)) + assert.Equal(t, "a/b/c", metas[0].GetTsBinlogPath()) + assert.Equal(t, "c/b/a", metas[0].GetDdlBinlogPath()) + + err = meta.SaveDDLBinlogMetaTxn(collID, tsPath, ddlPath) + assert.NoError(t, err) + + metas, err = 
meta.getDDLBinlogMete(collID) + assert.NoError(t, err) + assert.Equal(t, 2, len(metas)) + assert.Equal(t, "a/b/c", metas[0].GetTsBinlogPath()) + assert.Equal(t, "c/b/a", metas[0].GetDdlBinlogPath()) + }) + +} diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 37b4babed6309bd714b129e885a1b70a724c8c05..85a1f782ebe750dc34ac6587556be79267f3674c 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -2,6 +2,7 @@ package datanode import ( "context" + "errors" "fmt" "io" "math/rand" @@ -10,8 +11,6 @@ import ( "go.uber.org/zap" - "errors" - "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/types" @@ -196,6 +195,7 @@ func (node *DataNode) GetComponentStates(ctx context.Context) (*internalpb.Compo } func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmentsRequest) (*commonpb.Status, error) { + log.Debug("FlushSegments ...", zap.Int("num", len(req.SegmentIDs))) ids := make([]UniqueID, 0) ids = append(ids, req.SegmentIDs...) diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index 72661b3c0e4d3365a2500e4220151a4cf949cd3f..0eb0ea343749e8560ccf272d1f4d304af5b8e24f 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -9,11 +9,13 @@ import ( "os" "strconv" "testing" + "time" "go.etcd.io/etcd/clientv3" "go.uber.org/zap" etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" + memkv "github.com/zilliztech/milvus-distributed/internal/kv/mem" "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/types" @@ -53,11 +55,10 @@ func TestMain(m *testing.M) { os.Exit(exitCode) } -func newMetaTable() *metaTable { - etcdClient, _ := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}}) - - etcdKV := etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath) - mt, _ := NewMetaTable(etcdKV) +func newBinlogMeta() *binlogMeta { + kvMock := memkv.NewMemoryKV() + idAllocMock := NewAllocatorFactory(1) + mt, _ := NewBinlogMeta(kvMock, idAllocMock) return mt } @@ -102,10 +103,6 @@ type DataFactory struct { rawData []byte } -type AllocatorFactory struct { - ID UniqueID -} - type MasterServiceFactory struct { types.MasterService ID UniqueID @@ -386,23 +383,20 @@ func (df *DataFactory) GetMsgStreamInsertMsgs(n int) (inMsgs []*msgstream.Insert return } +type AllocatorFactory struct { + r *rand.Rand +} + func NewAllocatorFactory(id ...UniqueID) *AllocatorFactory { - f := &AllocatorFactory{} - if len(id) == 1 { - f.ID = id[0] + f := &AllocatorFactory{ + r: rand.New(rand.NewSource(time.Now().UnixNano())), } - return f -} -func (alloc AllocatorFactory) setID(id UniqueID) { - alloc.ID = id + return f } func (alloc AllocatorFactory) allocID() (UniqueID, error) { - if alloc.ID == 0 { - return UniqueID(0), nil // GOOSE TODO: random ID generating - } - return alloc.ID, nil + return alloc.r.Int63n(1000000), nil } func (m *MasterServiceFactory) setID(id UniqueID) { diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index 44703ec309f614ca7d5549db79b8ef6a2fff3938..008edef8ea5ed8fadfe276230db138ad6c5f5c7c 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -58,26 +58,24 @@ func (dsService *dataSyncService) close() { func (dsService 
*dataSyncService) initNodes() { // TODO: add delete pipeline support - // New metaTable - var mt *metaTable + var kvClient *clientv3.Client + var err error connectEtcdFn := func() error { - etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}}) - if err != nil { - return err - } - - etcdKV := etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath) - mt, err = NewMetaTable(etcdKV) + kvClient, err = clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}}) if err != nil { return err } return nil } - err := retry.Retry(100000, time.Millisecond*200, connectEtcdFn) + err = retry.Retry(100000, time.Millisecond*200, connectEtcdFn) if err != nil { panic(err) } + etcdKV := etcdkv.NewEtcdKV(kvClient, Params.MetaRootPath) + // New binlogMeta + mt, _ := NewBinlogMeta(etcdKV, dsService.idAllocator) + dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx) m := map[string]interface{}{ @@ -93,8 +91,8 @@ func (dsService *dataSyncService) initNodes() { var ddStreamNode Node = newDDInputNode(dsService.ctx, dsService.msFactory) var filterDmNode Node = newFilteredDmNode() - var ddNode Node = newDDNode(dsService.ctx, mt, dsService.flushChan, dsService.replica, dsService.idAllocator) - var insertBufferNode Node = newInsertBufferNode(dsService.ctx, mt, dsService.replica, dsService.idAllocator, dsService.msFactory) + var ddNode Node = newDDNode(dsService.ctx, mt, dsService.flushChan, dsService.replica) + var insertBufferNode Node = newInsertBufferNode(dsService.ctx, mt, dsService.replica, dsService.msFactory) var gcNode Node = newGCNode(dsService.replica) dsService.fg.AddNode(dmStreamNode) diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index 1ed0afb80c4fe8ad9335eb25e9867c7f512e18be..a4b90888396b5c3b389994e15efda698486af4e6 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -6,6 +6,7 @@ import ( "path" "sort" "strconv" + "sync" "github.com/golang/protobuf/proto" "go.uber.org/zap" @@ -24,12 +25,12 @@ type ddNode struct { ddMsg *ddMsg ddRecords *ddRecords ddBuffer *ddBuffer + flushMap *sync.Map inFlushCh chan *flushMsg - idAllocator allocatorInterface - kv kv.Base - replica Replica - flushMeta *metaTable + kv kv.Base + replica Replica + binlogMeta *binlogMeta } type ddData struct { @@ -121,28 +122,47 @@ func (ddNode *ddNode) Operate(ctx context.Context, in []Msg) ([]Msg, context.Con } } + // generate binlog + if ddNode.ddBuffer.full() { + for k, v := range ddNode.ddBuffer.ddData { + ddNode.flushMap.Store(k, v) + } + ddNode.ddBuffer.ddData = make(map[UniqueID]*ddData) + log.Debug(". dd buffer full, auto flushing ...") + go flushTxn(ddNode.flushMap, ddNode.kv, ddNode.binlogMeta) + } + select { case fmsg := <-ddNode.inFlushCh: - log.Debug(". receive flush message, flushing ...") - localSegs := make([]UniqueID, 0) + log.Debug(". receive flush message ...") + localSegs := make([]UniqueID, 0, len(fmsg.segmentIDs)) for _, segID := range fmsg.segmentIDs { if ddNode.replica.hasSegment(segID) { localSegs = append(localSegs, segID) } } - if len(localSegs) > 0 { - ddNode.flush() - fmsg.segmentIDs = localSegs - ddNode.ddMsg.flushMessages = append(ddNode.ddMsg.flushMessages, fmsg) + + if len(localSegs) <= 0 { + log.Debug(".. Segment not exist in this datanode, skip flushing ...") + break } - default: - } + log.Debug(".. 
Segments exist, notifying insertbuffer ...") + fmsg.segmentIDs = localSegs + ddNode.ddMsg.flushMessages = append(ddNode.ddMsg.flushMessages, fmsg) - // generate binlog - if ddNode.ddBuffer.full() { - log.Debug(". dd buffer full, auto flushing ...") - ddNode.flush() + if ddNode.ddBuffer.size() > 0 { + log.Debug(".. ddl buffer not empty, flushing ...") + for k, v := range ddNode.ddBuffer.ddData { + ddNode.flushMap.Store(k, v) + } + ddNode.ddBuffer.ddData = make(map[UniqueID]*ddData) + + go flushTxn(ddNode.flushMap, ddNode.kv, ddNode.binlogMeta) + + } + + default: } var res Msg = ddNode.ddMsg @@ -150,7 +170,7 @@ func (ddNode *ddNode) Operate(ctx context.Context, in []Msg) ([]Msg, context.Con } /* -flush() will do the following: +flushTxn() will do the following: generate binlogs for all buffer data in ddNode, store the generated binlogs to minIO/S3, store the keys(paths to minIO/s3) of the binlogs to etcd. @@ -160,60 +180,68 @@ The keys of the binlogs are generated as below: ${tenant}/data_definition_log/${collection_id}/ddl/${log_idx} */ -func (ddNode *ddNode) flush() { +func flushTxn(ddlData *sync.Map, + kv kv.Base, + meta *binlogMeta) { // generate binlog ddCodec := &storage.DataDefinitionCodec{} - for collectionID, data := range ddNode.ddBuffer.ddData { - // buffer data to binlog + ddlData.Range(func(cID, d interface{}) bool { + + data := d.(*ddData) + collID := cID.(int64) + log.Debug(".. ddl flushing ...", zap.Int64("collectionID", collID), zap.Int("length", len(data.ddRequestString))) binLogs, err := ddCodec.Serialize(data.timestamps, data.ddRequestString, data.eventTypes) - if err != nil { + if err != nil || len(binLogs) != 2 { log.Error("Codec Serialize wrong", zap.Error(err)) - continue - } - if len(binLogs) != 2 { - log.Error("illegal binLogs") - continue + return false } - // binLogs -> minIO/S3 if len(data.ddRequestString) != len(data.timestamps) || len(data.timestamps) != len(data.eventTypes) { log.Error("illegal ddBuffer, failed to save binlog") - continue - } else { - log.Debug(".. 
dd buffer flushing ...") - keyCommon := path.Join(Params.DdBinlogRootPath, strconv.FormatInt(collectionID, 10)) - - // save ts binlog - timestampLogIdx, err := ddNode.idAllocator.allocID() - if err != nil { - log.Error("Id allocate wrong", zap.Error(err)) - } - timestampKey := path.Join(keyCommon, binLogs[0].GetKey(), strconv.FormatInt(timestampLogIdx, 10)) - err = ddNode.kv.Save(timestampKey, string(binLogs[0].GetValue())) - if err != nil { - log.Error("Save to minIO/S3 Wrong", zap.Error(err)) - } - log.Debug("save ts binlog", zap.String("key", timestampKey)) + return false + } - // save dd binlog - ddLogIdx, err := ddNode.idAllocator.allocID() - if err != nil { - log.Error("Id allocate wrong", zap.Error(err)) - } - ddKey := path.Join(keyCommon, binLogs[1].GetKey(), strconv.FormatInt(ddLogIdx, 10)) - err = ddNode.kv.Save(ddKey, string(binLogs[1].GetValue())) - if err != nil { - log.Error("Save to minIO/S3 Wrong", zap.Error(err)) - } - log.Debug("save dd binlog", zap.String("key", ddKey)) + kvs := make(map[string]string, 2) + tsIdx, err := meta.genKey(true) + if err != nil { + log.Error("Id allocate wrong", zap.Error(err)) + return false + } + tsKey := path.Join(Params.DdlBinlogRootPath, strconv.FormatInt(collID, 10), binLogs[0].GetKey(), tsIdx) + kvs[tsKey] = string(binLogs[0].GetValue()) - ddNode.flushMeta.AppendDDLBinlogPaths(collectionID, []string{timestampKey, ddKey}) + ddlIdx, err := meta.genKey(true) + if err != nil { + log.Error("Id allocate wrong", zap.Error(err)) + return false } + ddlKey := path.Join(Params.DdlBinlogRootPath, strconv.FormatInt(collID, 10), binLogs[1].GetKey(), ddlIdx) + kvs[ddlKey] = string(binLogs[1].GetValue()) - } - // clear buffer - ddNode.ddBuffer.ddData = make(map[UniqueID]*ddData) + // save ddl/ts binlog to minIO/s3 + log.Debug(".. Saving ddl binlog to minIO/s3 ...") + err = kv.MultiSave(kvs) + if err != nil { + log.Error("Save to minIO/S3 Wrong", zap.Error(err)) + _ = kv.MultiRemove([]string{tsKey, ddlKey}) + return false + } + + log.Debug(".. Saving ddl binlog meta ...") + err = meta.SaveDDLBinlogMetaTxn(collID, tsKey, ddlKey) + if err != nil { + log.Error("Save binlog meta to etcd Wrong", zap.Error(err)) + _ = kv.MultiRemove([]string{tsKey, ddlKey}) + return false + } + + log.Debug(".. Clearing ddl flush buffer ...") + ddlData.Delete(collID) + return true + + }) + log.Debug(".. 
DDL flushing completed ...") } func (ddNode *ddNode) createCollection(msg *msgstream.CreateCollectionMsg) { @@ -372,8 +400,8 @@ func (ddNode *ddNode) dropPartition(msg *msgstream.DropPartitionMsg) { append(ddNode.ddBuffer.ddData[collectionID].eventTypes, storage.DropPartitionEventType) } -func newDDNode(ctx context.Context, flushMeta *metaTable, - inFlushCh chan *flushMsg, replica Replica, alloc allocatorInterface) *ddNode { +func newDDNode(ctx context.Context, binlogMeta *binlogMeta, + inFlushCh chan *flushMsg, replica Replica) *ddNode { maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism @@ -409,9 +437,10 @@ func newDDNode(ctx context.Context, flushMeta *metaTable, }, inFlushCh: inFlushCh, - idAllocator: alloc, - kv: minioKV, - replica: replica, - flushMeta: flushMeta, + // idAllocator: alloc, + kv: minioKV, + replica: replica, + binlogMeta: binlogMeta, + flushMap: &sync.Map{}, } } diff --git a/internal/datanode/flow_graph_dd_node_test.go b/internal/datanode/flow_graph_dd_node_test.go index 9c15a964a6a13487e6aa00c4c8c0ea90612cac95..8fa6ffa56bb9c3f559baa16741640098889cfd03 100644 --- a/internal/datanode/flow_graph_dd_node_test.go +++ b/internal/datanode/flow_graph_dd_node_test.go @@ -37,8 +37,7 @@ func TestFlowGraphDDNode_Operate(t *testing.T) { // Params.FlushDdBufferSize = 4 replica := newReplica() - allocatorMock := NewAllocatorFactory() - ddNode := newDDNode(ctx, newMetaTable(), inFlushCh, replica, allocatorMock) + ddNode := newDDNode(ctx, newBinlogMeta(), inFlushCh, replica) collID := UniqueID(0) collName := "col-test-0" diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index e9674343ec6a48a88a04ea58fe0272ad8549c508..c4e875a567def660172dc03fcd41ea95891a8f33 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -4,9 +4,9 @@ import ( "bytes" "context" "encoding/binary" - "fmt" "path" "strconv" + "sync" "unsafe" "go.uber.org/zap" @@ -36,12 +36,10 @@ type ( BaseNode insertBuffer *insertBuffer replica Replica - flushMeta *metaTable + flushMeta *binlogMeta + flushMap sync.Map - minIOKV kv.Base - minioPrefix string - - idAllocator allocatorInterface + minIOKV kv.Base timeTickStream msgstream.MsgStream segmentStatisticsStream msgstream.MsgStream @@ -139,13 +137,6 @@ func (ibNode *insertBufferNode) Operate(ctx context.Context, in []Msg) ([]Msg, c } } - if !ibNode.flushMeta.hasSegmentFlush(currentSegID) { - err := ibNode.flushMeta.addSegmentFlush(currentSegID) - if err != nil { - log.Error("add segment flush meta wrong", zap.Error(err)) - } - } - segNum := uniqueSeg[currentSegID] uniqueSeg[currentSegID] = segNum + int64(len(msg.RowIDs)) } @@ -452,15 +443,35 @@ func (ibNode *insertBufferNode) Operate(ctx context.Context, in []Msg) ([]Msg, c ibNode.replica.setEndPosition(currentSegID, endPosition) } - // 1.4 if full - // 1.4.1 generate binlogs + // 1.4 if full, auto flush if ibNode.insertBuffer.full(currentSegID) { - log.Debug(". Insert Buffer full, auto flushing ", zap.Int32("num of rows", ibNode.insertBuffer.size(currentSegID))) - - err = ibNode.flushSegment(currentSegID, msg.GetPartitionID(), collection.GetID()) + log.Debug(". 
Insert Buffer full, auto flushing ", + zap.Int32("num of rows", ibNode.insertBuffer.size(currentSegID))) + collSch, err := ibNode.getCollectionSchemaByID(collection.GetID()) if err != nil { - log.Error("flush segment fail", zap.Int64("segmentID", currentSegID), zap.Error(err)) + log.Error("Auto flush failed .. cannot get collection schema ..", zap.Error(err)) + continue } + collMeta := &etcdpb.CollectionMeta{ + Schema: collSch, + ID: collection.GetID(), + } + + ibNode.flushMap.Store(currentSegID, ibNode.insertBuffer.insertData[currentSegID]) + delete(ibNode.insertBuffer.insertData, currentSegID) + + finishCh := make(chan bool) + go flushSegmentTxn(collMeta, currentSegID, msg.GetPartitionID(), collection.GetID(), + &ibNode.flushMap, ibNode.flushMeta, ibNode.minIOKV, + finishCh) + + go func(finishCh <-chan bool) { + if finished := <-finishCh; !finished { + log.Debug(".. Auto Flush failed ..") + return + } + log.Debug(".. Auto Flush completed ..") + }(finishCh) } } @@ -482,25 +493,39 @@ func (ibNode *insertBufferNode) Operate(ctx context.Context, in []Msg) ([]Msg, c for _, msg := range iMsg.flushMessages { for _, currentSegID := range msg.segmentIDs { log.Debug(". Receiving flush message", zap.Int64("segmentID", currentSegID)) - if ibNode.insertBuffer.size(currentSegID) > 0 { - log.Debug(".. Buffer not empty, flushing ...") - seg, err := ibNode.replica.getSegmentByID(currentSegID) - if err != nil { - log.Error("flush segment fail", zap.Error(err)) - continue - } - err = ibNode.flushSegment(currentSegID, seg.partitionID, seg.collectionID) - if err != nil { - log.Error("flush segment fail", zap.Int64("segmentID", currentSegID), zap.Error(err)) - continue - } + finishCh := make(chan bool) + go ibNode.completeFlush(currentSegID, finishCh) + + if ibNode.insertBuffer.size(currentSegID) <= 0 { + log.Debug(".. Buffer empty ...") + finishCh <- true + continue } - err := ibNode.completeFlush(currentSegID) + + log.Debug(".. Buffer not empty, flushing ..") + ibNode.flushMap.Store(currentSegID, ibNode.insertBuffer.insertData[currentSegID]) + delete(ibNode.insertBuffer.insertData, currentSegID) + + seg, err := ibNode.replica.getSegmentByID(currentSegID) + if err != nil { + log.Error("Flush failed .. cannot get segment ..", zap.Error(err)) + continue + } + + collSch, err := ibNode.getCollectionSchemaByID(seg.collectionID) if err != nil { - log.Error("complete flush wrong", zap.Error(err)) + log.Error("Flush failed .. cannot get collection schema ..", zap.Error(err)) + continue } - log.Debug("Flush completed") + + collMeta := &etcdpb.CollectionMeta{ + Schema: collSch, + ID: seg.collectionID, + } + + go flushSegmentTxn(collMeta, currentSegID, seg.partitionID, seg.collectionID, + &ibNode.flushMap, ibNode.flushMeta, ibNode.minIOKV, finishCh) } } @@ -516,63 +541,83 @@ func (ibNode *insertBufferNode) Operate(ctx context.Context, in []Msg) ([]Msg, c return []Msg{res}, ctx } -func (ibNode *insertBufferNode) flushSegment(segID UniqueID, partitionID UniqueID, collID UniqueID) error { - - collSch, err := ibNode.getCollectionSchemaByID(collID) - if err != nil { - return fmt.Errorf("Get collection by ID wrong, %v", err) - } +func flushSegmentTxn(collMeta *etcdpb.CollectionMeta, segID UniqueID, partitionID UniqueID, collID UniqueID, + insertData *sync.Map, meta *binlogMeta, kv kv.Base, finishCh chan<- bool) { - collMeta := &etcdpb.CollectionMeta{ - Schema: collSch, - ID: collID, - } + defer func() { + log.Debug(".. 
Clearing flush Buffer ..") + insertData.Delete(segID) + }() inCodec := storage.NewInsertCodec(collMeta) // buffer data to binlogs - binLogs, err := inCodec.Serialize(partitionID, - segID, ibNode.insertBuffer.insertData[segID]) + data, ok := insertData.Load(segID) + if !ok { + log.Error("Flush failed ... cannot load insertData ..") + finishCh <- false + return + } + binLogs, err := inCodec.Serialize(partitionID, segID, data.(*InsertData)) if err != nil { - return fmt.Errorf("generate binlog wrong: %v", err) + log.Error("Flush failed ... cannot generate binlog ..", zap.Error(err)) + finishCh <- false + return } - // clear buffer - delete(ibNode.insertBuffer.insertData, segID) - log.Debug(".. Clearing buffer") - - // 1.5.2 binLogs -> minIO/S3 - collIDStr := strconv.FormatInt(collID, 10) - partitionIDStr := strconv.FormatInt(partitionID, 10) - segIDStr := strconv.FormatInt(segID, 10) - keyPrefix := path.Join(ibNode.minioPrefix, collIDStr, partitionIDStr, segIDStr) - - log.Debug(".. Saving binlogs to MinIO ...", zap.Int("number", len(binLogs))) - for index, blob := range binLogs { - uid, err := ibNode.idAllocator.allocID() + log.Debug(".. Saving binlogs to MinIO ..", zap.Int("number", len(binLogs))) + field2Path := make(map[UniqueID]string, len(binLogs)) + kvs := make(map[string]string, len(binLogs)) + paths := make([]string, 0, len(binLogs)) + for _, blob := range binLogs { + fieldID, err := strconv.ParseInt(blob.GetKey(), 10, 64) if err != nil { - return fmt.Errorf("Allocate Id failed, %v", err) + log.Error("Flush failed ... cannot parse string to fieldID ..", zap.Error(err)) + finishCh <- false + return } - key := path.Join(keyPrefix, blob.Key, strconv.FormatInt(uid, 10)) - err = ibNode.minIOKV.Save(key, string(blob.Value[:])) + k, err := meta.genKey(true, collID, partitionID, segID, fieldID) if err != nil { - return fmt.Errorf("Save to MinIO failed, %v", err) + log.Error("Flush failed ... cannot alloc ID ..", zap.Error(err)) + finishCh <- false + return } - fieldID, err := strconv.ParseInt(blob.Key, 10, 32) - if err != nil { - return fmt.Errorf("string to fieldID wrong, %v", err) - } + key := path.Join(Params.InsertBinlogRootPath, k) + paths = append(paths, key) + kvs[key] = string(blob.Value[:]) + field2Path[fieldID] = key + } + + err = kv.MultiSave(kvs) + if err != nil { + log.Error("Flush failed ... cannot save to MinIO ..", zap.Error(err)) + _ = kv.MultiRemove(paths) + finishCh <- false + return + } - log.Debug("... Appending binlog paths ...", zap.Int("number", index)) - ibNode.flushMeta.AppendSegBinlogPaths(segID, fieldID, []string{key}) + log.Debug(".. Saving binlog paths to etcd ..", zap.Int("number", len(binLogs))) + err = meta.SaveSegmentBinlogMetaTxn(segID, field2Path) + if err != nil { + log.Error("Flush failed ... cannot save binlog paths ..", zap.Error(err)) + _ = kv.MultiRemove(paths) + finishCh <- false + return } - return nil + + finishCh <- true + } -func (ibNode *insertBufferNode) completeFlush(segID UniqueID) error { +func (ibNode *insertBufferNode) completeFlush(segID UniqueID, finishCh <-chan bool) { + if finished := <-finishCh; !finished { + return + } + + log.Debug(".. 
Segment flush completed ..") ibNode.replica.setIsFlushed(segID) ibNode.updateSegStatistics([]UniqueID{segID}) @@ -594,7 +639,10 @@ func (ibNode *insertBufferNode) completeFlush(segID UniqueID) error { } msgPack.Msgs = append(msgPack.Msgs, msg) - return ibNode.completeFlushStream.Produce(context.TODO(), &msgPack) + err := ibNode.completeFlushStream.Produce(context.TODO(), &msgPack) + if err != nil { + log.Error(".. Produce complete flush msg failed ..", zap.Error(err)) + } } func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp) error { @@ -661,8 +709,8 @@ func (ibNode *insertBufferNode) getCollectionSchemaByID(collectionID UniqueID) ( return ret.schema, nil } -func newInsertBufferNode(ctx context.Context, flushMeta *metaTable, - replica Replica, alloc allocatorInterface, factory msgstream.Factory) *insertBufferNode { +func newInsertBufferNode(ctx context.Context, flushMeta *binlogMeta, + replica Replica, factory msgstream.Factory) *insertBufferNode { maxQueueLength := Params.FlowGraphMaxQueueLength maxParallelism := Params.FlowGraphMaxParallelism @@ -690,7 +738,6 @@ func newInsertBufferNode(ctx context.Context, flushMeta *metaTable, if err != nil { panic(err) } - minioPrefix := Params.InsertBinlogRootPath //input stream, data node time tick wTt, _ := factory.NewMsgStream(ctx) @@ -717,12 +764,11 @@ func newInsertBufferNode(ctx context.Context, flushMeta *metaTable, BaseNode: baseNode, insertBuffer: iBuffer, minIOKV: minIOKV, - minioPrefix: minioPrefix, - idAllocator: alloc, timeTickStream: wTtMsgStream, segmentStatisticsStream: segStatisticsMsgStream, completeFlushStream: completeFlushStream, replica: replica, flushMeta: flushMeta, + flushMap: sync.Map{}, } } diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go index f30e1421c2018ec2889a3319e88158c6819b9840..c71e1d319710b74180652fb51dfe898d18482951 100644 --- a/internal/datanode/flow_graph_insert_buffer_node_test.go +++ b/internal/datanode/flow_graph_insert_buffer_node_test.go @@ -41,8 +41,6 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) { err = replica.addCollection(collMeta.ID, collMeta.Schema) require.NoError(t, err) - idFactory := AllocatorFactory{} - msFactory := pulsarms.NewFactory() m := map[string]interface{}{ "receiveBufSize": 1024, @@ -51,7 +49,7 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) { err = msFactory.SetParams(m) assert.Nil(t, err) - iBNode := newInsertBufferNode(ctx, newMetaTable(), replica, idFactory, msFactory) + iBNode := newInsertBufferNode(ctx, newBinlogMeta(), replica, msFactory) inMsg := genInsertMsg() var iMsg flowgraph.Msg = &inMsg iBNode.Operate(ctx, []flowgraph.Msg{iMsg}) diff --git a/internal/datanode/meta_table.go b/internal/datanode/meta_table.go deleted file mode 100644 index 28257cebb6f888993648e61c630fff4d7ef24b49..0000000000000000000000000000000000000000 --- a/internal/datanode/meta_table.go +++ /dev/null @@ -1,219 +0,0 @@ -package datanode - -import ( - "fmt" - "path" - "strconv" - "sync" - - "github.com/golang/protobuf/proto" - "github.com/zilliztech/milvus-distributed/internal/kv" - "github.com/zilliztech/milvus-distributed/internal/proto/datapb" -) - -type metaTable struct { - client kv.Base // - segID2FlushMeta map[UniqueID]*datapb.SegmentFlushMeta - collID2DdlMeta map[UniqueID]*datapb.DDLFlushMeta - - lock sync.RWMutex -} - -func NewMetaTable(kv kv.TxnBase) (*metaTable, error) { - mt := &metaTable{ - client: kv, - lock: sync.RWMutex{}, - } - err := mt.reloadSegMetaFromKV() - if err != nil 
{ - return nil, err - } - - err = mt.reloadDdlMetaFromKV() - if err != nil { - return nil, err - } - return mt, nil -} - -func (mt *metaTable) AppendSegBinlogPaths(segmentID UniqueID, fieldID int64, dataPaths []string) error { - _, ok := mt.segID2FlushMeta[segmentID] - if !ok { - err := mt.addSegmentFlush(segmentID) - if err != nil { - return err - } - } - - meta := mt.segID2FlushMeta[segmentID] - - found := false - for _, field := range meta.Fields { - if field.FieldID == fieldID { - field.BinlogPaths = append(field.BinlogPaths, dataPaths...) - found = true - break - } - } - - if !found { - newField := &datapb.FieldFlushMeta{ - FieldID: fieldID, - BinlogPaths: dataPaths, - } - meta.Fields = append(meta.Fields, newField) - } - - return mt.saveSegFlushMeta(meta) -} - -func (mt *metaTable) CompleteFlush(segmentID UniqueID) error { - mt.lock.Lock() - defer mt.lock.Unlock() - meta, ok := mt.segID2FlushMeta[segmentID] - if !ok { - return fmt.Errorf("segment not exists with ID = %v", segmentID) - } - meta.IsFlushed = true - - return mt.saveSegFlushMeta(meta) -} - -func (mt *metaTable) reloadSegMetaFromKV() error { - mt.segID2FlushMeta = make(map[UniqueID]*datapb.SegmentFlushMeta) - - _, values, err := mt.client.LoadWithPrefix(Params.SegFlushMetaSubPath) - if err != nil { - return err - } - - for _, value := range values { - flushMeta := &datapb.SegmentFlushMeta{} - err = proto.UnmarshalText(value, flushMeta) - if err != nil { - return err - } - mt.segID2FlushMeta[flushMeta.SegmentID] = flushMeta - } - - return nil -} - -// metaTable.lock.Lock() before call this function -func (mt *metaTable) saveSegFlushMeta(meta *datapb.SegmentFlushMeta) error { - value := proto.MarshalTextString(meta) - - mt.segID2FlushMeta[meta.SegmentID] = meta - prefix := path.Join(Params.SegFlushMetaSubPath, strconv.FormatInt(meta.SegmentID, 10)) - - return mt.client.Save(prefix, value) -} - -func (mt *metaTable) addSegmentFlush(segmentID UniqueID) error { - mt.lock.Lock() - defer mt.lock.Unlock() - - meta := &datapb.SegmentFlushMeta{ - IsFlushed: false, - SegmentID: segmentID, - } - return mt.saveSegFlushMeta(meta) -} - -func (mt *metaTable) hasSegmentFlush(segmentID UniqueID) bool { - mt.lock.RLock() - defer mt.lock.RUnlock() - _, ok := mt.segID2FlushMeta[segmentID] - return ok -} - -func (mt *metaTable) checkFlushComplete(segmentID UniqueID) (bool, error) { - mt.lock.RLock() - defer mt.lock.RUnlock() - meta, ok := mt.segID2FlushMeta[segmentID] - if !ok { - return false, fmt.Errorf("segment not exists with ID = %v", segmentID) - } - return meta.IsFlushed, nil -} - -func (mt *metaTable) getSegBinlogPaths(segmentID UniqueID) (map[int64][]string, error) { - mt.lock.RLock() - defer mt.lock.RUnlock() - meta, ok := mt.segID2FlushMeta[segmentID] - if !ok { - return nil, fmt.Errorf("segment not exists with ID = %v", segmentID) - } - ret := make(map[int64][]string) - for _, field := range meta.Fields { - ret[field.FieldID] = field.BinlogPaths - } - return ret, nil -} - -// --- DDL --- -func (mt *metaTable) AppendDDLBinlogPaths(collID UniqueID, paths []string) error { - mt.lock.Lock() - defer mt.lock.Unlock() - - _, ok := mt.collID2DdlMeta[collID] - if !ok { - mt.collID2DdlMeta[collID] = &datapb.DDLFlushMeta{ - CollectionID: collID, - BinlogPaths: make([]string, 0), - } - } - - meta := mt.collID2DdlMeta[collID] - meta.BinlogPaths = append(meta.BinlogPaths, paths...) 
- - return mt.saveDDLFlushMeta(meta) -} - -func (mt *metaTable) hasDDLFlushMeta(collID UniqueID) bool { - mt.lock.RLock() - defer mt.lock.RUnlock() - - _, ok := mt.collID2DdlMeta[collID] - return ok -} - -// metaTable.lock.Lock() before call this function -func (mt *metaTable) saveDDLFlushMeta(meta *datapb.DDLFlushMeta) error { - value := proto.MarshalTextString(meta) - - mt.collID2DdlMeta[meta.CollectionID] = meta - prefix := path.Join(Params.DDLFlushMetaSubPath, strconv.FormatInt(meta.CollectionID, 10)) - - return mt.client.Save(prefix, value) -} - -func (mt *metaTable) reloadDdlMetaFromKV() error { - mt.collID2DdlMeta = make(map[UniqueID]*datapb.DDLFlushMeta) - _, values, err := mt.client.LoadWithPrefix(Params.DDLFlushMetaSubPath) - if err != nil { - return err - } - - for _, value := range values { - ddlMeta := &datapb.DDLFlushMeta{} - err = proto.UnmarshalText(value, ddlMeta) - if err != nil { - return err - } - mt.collID2DdlMeta[ddlMeta.CollectionID] = ddlMeta - } - return nil -} - -func (mt *metaTable) getDDLBinlogPaths(collID UniqueID) (map[UniqueID][]string, error) { - mt.lock.RLock() - defer mt.lock.RUnlock() - meta, ok := mt.collID2DdlMeta[collID] - if !ok { - return nil, fmt.Errorf("collection not exists with ID = %v", collID) - } - ret := make(map[UniqueID][]string) - ret[meta.CollectionID] = meta.BinlogPaths - return ret, nil -} diff --git a/internal/datanode/meta_table_test.go b/internal/datanode/meta_table_test.go deleted file mode 100644 index 247cbff51f82542f7ca7e5597787686abfb856e7..0000000000000000000000000000000000000000 --- a/internal/datanode/meta_table_test.go +++ /dev/null @@ -1,110 +0,0 @@ -package datanode - -import ( - "testing" - - "github.com/stretchr/testify/assert" - memkv "github.com/zilliztech/milvus-distributed/internal/kv/mem" -) - -func TestMetaTable_SegmentFlush(t *testing.T) { - - kvMock := memkv.NewMemoryKV() - meta, err := NewMetaTable(kvMock) - assert.NoError(t, err) - defer meta.client.Close() - - t.Run("TestMetaTable_addSegmentFlush", func(t *testing.T) { - err := meta.addSegmentFlush(101) - assert.NoError(t, err) - - err = meta.addSegmentFlush(102) - assert.NoError(t, err) - - err = meta.addSegmentFlush(103) - assert.NoError(t, err) - - err = meta.reloadSegMetaFromKV() - assert.NoError(t, err) - }) - - t.Run("TestMetaTable_AppendSegBinlogPaths", func(t *testing.T) { - segmentID := UniqueID(201) - err := meta.addSegmentFlush(segmentID) - assert.Nil(t, err) - - exp := map[int64][]string{ - 1: {"a", "b", "c"}, - 2: {"b", "a", "c"}, - } - for fieldID, dataPaths := range exp { - for _, dp := range dataPaths { - err = meta.AppendSegBinlogPaths(segmentID, fieldID, []string{dp}) - assert.Nil(t, err) - err = meta.AppendSegBinlogPaths(segmentID, fieldID, []string{dp}) - assert.Nil(t, err) - } - } - - ret, err := meta.getSegBinlogPaths(segmentID) - assert.Nil(t, err) - assert.Equal(t, - map[int64][]string{ - 1: {"a", "a", "b", "b", "c", "c"}, - 2: {"b", "b", "a", "a", "c", "c"}}, - ret) - }) - - t.Run("TestMetaTable_CompleteFlush", func(t *testing.T) { - - var segmentID UniqueID = 401 - - err := meta.addSegmentFlush(segmentID) - assert.NoError(t, err) - - ret, err := meta.checkFlushComplete(segmentID) - assert.NoError(t, err) - assert.Equal(t, false, ret) - - meta.CompleteFlush(segmentID) - - ret, err = meta.checkFlushComplete(segmentID) - assert.NoError(t, err) - assert.Equal(t, true, ret) - }) - -} - -func TestMetaTable_DDLFlush(t *testing.T) { - kvMock := memkv.NewMemoryKV() - meta, err := NewMetaTable(kvMock) - assert.NoError(t, err) - defer 
meta.client.Close() - - t.Run("TestMetaTable_AppendDDLBinlogPaths", func(t *testing.T) { - - assert.False(t, meta.hasDDLFlushMeta(301)) - assert.False(t, meta.hasDDLFlushMeta(302)) - - collID2Paths := map[UniqueID][]string{ - 301: {"a", "b", "c"}, - 302: {"c", "b", "a"}, - } - - for collID, dataPaths := range collID2Paths { - for _, dp := range dataPaths { - err = meta.AppendDDLBinlogPaths(collID, []string{dp}) - assert.Nil(t, err) - } - } - - for k, v := range collID2Paths { - ret, err := meta.getDDLBinlogPaths(k) - assert.Nil(t, err) - assert.Equal(t, map[UniqueID][]string{k: v}, ret) - } - - assert.True(t, meta.hasDDLFlushMeta(301)) - assert.True(t, meta.hasDDLFlushMeta(302)) - }) -} diff --git a/internal/datanode/param_table.go b/internal/datanode/param_table.go index 298fcdcd5a96b167805a5f4f46c0949ae3da1df6..455bac68a97657e4de97143bb67314c9eed7a549 100644 --- a/internal/datanode/param_table.go +++ b/internal/datanode/param_table.go @@ -25,7 +25,7 @@ type ParamTable struct { FlushInsertBufferSize int32 FlushDdBufferSize int32 InsertBinlogRootPath string - DdBinlogRootPath string + DdlBinlogRootPath string Log log.Config // === DataNode External Components Configs === @@ -82,7 +82,7 @@ func (p *ParamTable) Init() { p.initFlushInsertBufferSize() p.initFlushDdBufferSize() p.initInsertBinlogRootPath() - p.initDdBinlogRootPath() + p.initDdlBinlogRootPath() p.initLogCfg() // === DataNode External Components Configs === @@ -159,13 +159,13 @@ func (p *ParamTable) initInsertBinlogRootPath() { p.InsertBinlogRootPath = path.Join(rootPath, "insert_log") } -func (p *ParamTable) initDdBinlogRootPath() { +func (p *ParamTable) initDdlBinlogRootPath() { // GOOSE TODO: rootPath change to TenentID rootPath, err := p.Load("etcd.rootPath") if err != nil { panic(err) } - p.DdBinlogRootPath = path.Join(rootPath, "data_definition_log") + p.DdlBinlogRootPath = path.Join(rootPath, "data_definition_log") } // ---- Pulsar ---- diff --git a/internal/datanode/param_table_test.go b/internal/datanode/param_table_test.go index ce6146ca9e5724d9e9995634342865720fb80d00..9ab2e5c4887b6bef43508d3f7d38c3efbb1fa3ae 100644 --- a/internal/datanode/param_table_test.go +++ b/internal/datanode/param_table_test.go @@ -39,8 +39,8 @@ func TestParamTable_DataNode(t *testing.T) { log.Println("InsertBinlogRootPath:", path) }) - t.Run("Test DdBinlogRootPath", func(t *testing.T) { - path := Params.DdBinlogRootPath + t.Run("Test DdlBinlogRootPath", func(t *testing.T) { + path := Params.DdlBinlogRootPath log.Println("DdBinlogRootPath:", path) }) diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index d5fae749a1fdc9d124ec9549fc012b2cf6582222..085532317f22d214f060fbe7dbc8aa3d338feb19 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -690,25 +690,29 @@ func (s *Server) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsert }, } p := path.Join(Params.SegmentFlushMetaPath, strconv.FormatInt(req.SegmentID, 10)) - value, err := s.client.Load(p) + _, values, err := s.client.LoadWithPrefix(p) if err != nil { resp.Status.Reason = err.Error() return resp, nil } - flushMeta := &datapb.SegmentFlushMeta{} - err = proto.UnmarshalText(value, flushMeta) - if err != nil { - resp.Status.Reason = err.Error() - return resp, nil + m := make(map[int64][]string) + tMeta := &datapb.SegmentFieldBinlogMeta{} + for _, v := range values { + if err := proto.UnmarshalText(v, tMeta); err != nil { + resp.Status.Reason = err.Error() + return resp, nil + } + m[tMeta.FieldID] = append(m[tMeta.FieldID], 
tMeta.BinlogPath)
 	}
-	fields := make([]UniqueID, len(flushMeta.Fields))
-	paths := make([]*internalpb.StringList, len(flushMeta.Fields))
-	for i, field := range flushMeta.Fields {
-		fields[i] = field.FieldID
-		paths[i] = &internalpb.StringList{Values: field.BinlogPaths}
+
+	fids := make([]UniqueID, 0, len(m))
+	paths := make([]*internalpb.StringList, 0, len(m))
+	for k, v := range m {
+		fids = append(fids, k)
+		paths = append(paths, &internalpb.StringList{Values: v})
 	}
 	resp.Status.ErrorCode = commonpb.ErrorCode_Success
-	resp.FieldIDs = fields
+	resp.FieldIDs = fids
 	resp.Paths = paths
 	return resp, nil
 }
diff --git a/internal/log/log.go b/internal/log/log.go
index 3cf0fd49fce3afc8e70df3d1cf25201f2886895a..476958a652ae974a931e53805b6bccfba51dabe9 100644
--- a/internal/log/log.go
+++ b/internal/log/log.go
@@ -94,7 +94,7 @@ func initFileLog(cfg *FileLogConfig) (*lumberjack.Logger, error) {
 }
 
 func newStdLogger() (*zap.Logger, *ZapProperties) {
-	conf := &Config{Level: "info", File: FileLogConfig{}}
+	conf := &Config{Level: "debug", File: FileLogConfig{}}
 	lg, r, _ := InitLogger(conf)
 	return lg, r
 }
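
For reference, a minimal sketch (not part of the patch) of how the new binlogMeta is meant to be driven, mirroring the calls exercised in binlog_meta_test.go: an in-memory KV stands in for etcd, and simpleAllocator plus the literal segment/field IDs and paths are made up purely for illustration.

```go
package datanode

import (
	"fmt"

	memkv "github.com/zilliztech/milvus-distributed/internal/kv/mem"
)

// simpleAllocator is a hypothetical allocatorInterface implementation used
// only for this sketch; the real datanode wires in its own ID allocator.
type simpleAllocator struct{ next UniqueID }

func (a *simpleAllocator) allocID() (UniqueID, error) {
	a.next++
	return a.next, nil
}

func exampleBinlogMetaUsage() error {
	// Back the meta with the same in-memory KV the tests use.
	meta, err := NewBinlogMeta(memkv.NewMemoryKV(), &simpleAllocator{})
	if err != nil {
		return err
	}

	// Record binlog paths for two fields of segment 42 in one transaction.
	// Keys are laid out as ${prefix}/${segmentID}/${fieldID}/${idx}.
	if err := meta.SaveSegmentBinlogMetaTxn(42, map[UniqueID]string{
		100: "insert_log/1/2/42/100/1", // hypothetical MinIO/S3 keys
		101: "insert_log/1/2/42/101/2",
	}); err != nil {
		return err
	}

	// Read back everything stored for the segment.
	metas, err := meta.getSegmentBinlogMeta(42)
	if err != nil {
		return err
	}
	for _, m := range metas {
		fmt.Println(m.FieldID, m.GetBinlogPath())
	}
	return nil
}
```

Because genKey appends a freshly allocated index when alloc is true, calling SaveSegmentBinlogMetaTxn twice for the same segment adds new entries instead of overwriting the old ones, which is why the test expects six metas after two saves.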