From c3ac1375d86af1387e009557b2077ed57ae1686a Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Tue, 18 May 2021 16:33:05 +0800 Subject: [PATCH] Add buffer function for auto flush (#5271) Auto-flush of a segment is not considered flush-completed. So we need to buffer binlog paths generated by auto-flush. See also: #5268 #5220 Signed-off-by: yangxuan --- go.mod | 2 +- internal/datanode/binlog_meta.go | 1 + internal/datanode/collection_replica.go | 44 +++++++++++++++++++- internal/datanode/collection_replica_test.go | 18 ++++++++ internal/datanode/param_table.go | 10 +++-- 5 files changed, 69 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index b1db075aa..94fe702b1 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/apache/pulsar-client-go v0.4.0 github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6 // indirect github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b // indirect - github.com/coreos/etcd v3.3.13+incompatible // indirect + github.com/coreos/etcd v3.3.13+incompatible github.com/datadog/zstd v1.4.6-0.20200617134701-89f69fb7df32 // indirect github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c // indirect github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect diff --git a/internal/datanode/binlog_meta.go b/internal/datanode/binlog_meta.go index 4eb56ed9b..838c0f5fb 100644 --- a/internal/datanode/binlog_meta.go +++ b/internal/datanode/binlog_meta.go @@ -8,6 +8,7 @@ // Unless required by applicable law or agreed to in writing, software distributed under the License // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express // or implied. See the License for the specific language governing permissions and limitations under the License. 
+// GOOSE TODO remove this package datanode diff --git a/internal/datanode/collection_replica.go b/internal/datanode/collection_replica.go index bb09e3994..760e67eec 100644 --- a/internal/datanode/collection_replica.go +++ b/internal/datanode/collection_replica.go @@ -42,6 +42,8 @@ type Replica interface { updateStatistics(segmentID UniqueID, numRows int64) error getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb.SegmentStatisticsUpdates, error) getSegmentByID(segmentID UniqueID) (*Segment, error) + bufferAutoFlushBinlogPaths(segmentID UniqueID, field2Path map[UniqueID]string) error + getBufferPaths(segID UniqueID) (map[UniqueID][]string, error) } // Segment is the data structure of segments in data node replica. @@ -59,6 +61,7 @@ type Segment struct { startPosition *internalpb.MsgPosition endPosition *internalpb.MsgPosition // not using channelName string + field2Paths map[UniqueID][]string // fieldID to binlog paths, only auto-flushed paths will be buffered. } // CollectionSegmentReplica is the data replication of persistent data in datanode. 
@@ -69,6 +72,8 @@ type CollectionSegmentReplica struct { collections map[UniqueID]*Collection } +var _ Replica = &CollectionSegmentReplica{} + func newReplica() Replica { segments := make(map[UniqueID]*Segment) collections := make(map[UniqueID]*Collection) @@ -80,7 +85,43 @@ func newReplica() Replica { return replica } -// --- segment --- +// bufferAutoFlushBinlogPaths buffers binlog paths generated by auto-flush: it appends the path of every field in field2Path to the paths kept on segment segID and returns an error if the segment is unknown. It takes the write lock because it mutates the segment's field2Paths map. +func (replica *CollectionSegmentReplica) bufferAutoFlushBinlogPaths(segID UniqueID, field2Path map[UniqueID]string) error { + replica.mu.Lock() + defer replica.mu.Unlock() + + seg, ok := replica.segments[segID] + if !ok { + return fmt.Errorf("Cannot find segment, id = %v", segID) + } + + for fieldID, path := range field2Path { + buffpaths, ok := seg.field2Paths[fieldID] + + if !ok { + buffpaths = make([]string, 0) + } + + buffpaths = append(buffpaths, path) + seg.field2Paths[fieldID] = buffpaths + } + log.Info("Buffer auto flush binlog paths", zap.Int64("segment ID", segID)) + + return nil +} + +// getBufferPaths returns the buffered fieldID-to-binlog-paths map of segment segID, or an error if the segment is unknown. The returned map is the segment's own map, not a copy. +func (replica *CollectionSegmentReplica) getBufferPaths(segID UniqueID) (map[UniqueID][]string, error) { + replica.mu.RLock() + defer replica.mu.RUnlock() + + if seg, ok := replica.segments[segID]; ok { + return seg.field2Paths, nil + } + + return nil, fmt.Errorf("Cannot find segment, id = %v", segID) +} + func (replica *CollectionSegmentReplica) getSegmentByID(segmentID UniqueID) (*Segment, error) { replica.mu.RLock() defer replica.mu.RUnlock() @@ -118,6 +159,7 @@ func (replica *CollectionSegmentReplica) addSegment( startPosition: position, endPosition: new(internalpb.MsgPosition), channelName: channelName, + field2Paths: make(map[UniqueID][]string), } seg.isNew.Store(true) diff --git a/internal/datanode/collection_replica_test.go b/internal/datanode/collection_replica_test.go index 4953750f1..dc7474047 100644 --- a/internal/datanode/collection_replica_test.go +++ b/internal/datanode/collection_replica_test.go @@ -138,6 +138,24 @@ func TestReplica_Segment(t 
*testing.T) { assert.NotNil(t, update.StartPosition) assert.Nil(t, update.EndPosition) + f2p := map[UniqueID]string{ + 1: "a", + 2: "b", + } + + err = replica.bufferAutoFlushBinlogPaths(UniqueID(0), f2p) + assert.NoError(t, err) + r, err := replica.getBufferPaths(0) + assert.NoError(t, err) + assert.ElementsMatch(t, []string{"a"}, r[1]) + assert.ElementsMatch(t, []string{"b"}, r[2]) + err = replica.bufferAutoFlushBinlogPaths(UniqueID(0), f2p) + assert.NoError(t, err) + r, err = replica.getBufferPaths(0) + assert.NoError(t, err) + assert.ElementsMatch(t, []string{"a", "a"}, r[1]) + assert.ElementsMatch(t, []string{"b", "b"}, r[2]) + err = replica.setIsFlushed(0) assert.NoError(t, err) err = replica.setStartPosition(0, &internalpb.MsgPosition{}) diff --git a/internal/datanode/param_table.go b/internal/datanode/param_table.go index 35aae0413..8b218b780 100644 --- a/internal/datanode/param_table.go +++ b/internal/datanode/param_table.go @@ -61,8 +61,8 @@ type ParamTable struct { // --- ETCD --- EtcdAddress string MetaRootPath string - SegFlushMetaSubPath string - DDLFlushMetaSubPath string + SegFlushMetaSubPath string // GOOSE TODO remove + DDLFlushMetaSubPath string // GOOSE TODO remove // --- MinIO --- MinioAddress string @@ -109,8 +109,8 @@ func (p *ParamTable) Init() { // --- ETCD --- p.initEtcdAddress() p.initMetaRootPath() - p.initSegFlushMetaSubPath() - p.initDDLFlushMetaSubPath() + p.initSegFlushMetaSubPath() // GOOSE TODO remove + p.initDDLFlushMetaSubPath() // GOOSE TODO remove // --- MinIO --- p.initMinioAddress() @@ -220,6 +220,7 @@ func (p *ParamTable) initMetaRootPath() { p.MetaRootPath = path.Join(rootPath, subPath) } +// GOOSE TODO remove func (p *ParamTable) initSegFlushMetaSubPath() { subPath, err := p.Load("etcd.segFlushMetaSubPath") if err != nil { @@ -228,6 +229,7 @@ func (p *ParamTable) initSegFlushMetaSubPath() { p.SegFlushMetaSubPath = subPath } +// GOOSE TODO remove func (p *ParamTable) initDDLFlushMetaSubPath() { subPath, err := 
p.Load("etcd.ddlFlushMetaSubPath") if err != nil { -- GitLab