data_sync_service.go 6.3 KB
Newer Older
X
XuanYang-cn 已提交
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

X
XuanYang-cn 已提交
12 13 14 15
package datanode

import (
	"context"
16
	"fmt"
X
XuanYang-cn 已提交
17

X
Xiangyu Wang 已提交
18 19
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
20
	"github.com/milvus-io/milvus/internal/proto/commonpb"
S
sunby 已提交
21
	"github.com/milvus-io/milvus/internal/proto/datapb"
22
	"github.com/milvus-io/milvus/internal/rootcoord"
23
	"github.com/milvus-io/milvus/internal/types"
X
Xiangyu Wang 已提交
24
	"github.com/milvus-io/milvus/internal/util/flowgraph"
X
XuanYang-cn 已提交
25 26

	"go.uber.org/zap"
X
XuanYang-cn 已提交
27 28 29
)

type dataSyncService struct {
30 31 32 33 34 35 36 37 38 39
	ctx          context.Context
	cancelFn     context.CancelFunc
	fg           *flowgraph.TimeTickedFlowGraph
	flushChan    <-chan *flushMsg
	replica      Replica
	idAllocator  allocatorInterface
	msFactory    msgstream.Factory
	collectionID UniqueID
	dataService  types.DataService
	clearSignal  chan<- UniqueID
X
XuanYang-cn 已提交
40 41
}

S
sunby 已提交
42 43 44 45 46
// newDataSyncService builds a dataSyncService for the virtual channel described
// by vchan and assembles its flowgraph via initNodes. The returned service is
// not running yet: call start to launch it and close to tear it down.
//
// The service derives its own cancellable context from ctx. If the flowgraph
// cannot be assembled, that context is cancelled before the error is returned,
// because the discarded service would otherwise never release it.
// A nil vchan is rejected with an error.
func newDataSyncService(ctx context.Context,
	flushChan <-chan *flushMsg,
	replica Replica,
	alloc allocatorInterface,
	factory msgstream.Factory,
	vchan *datapb.VchannelInfo,
	clearSignal chan<- UniqueID,
	dataService types.DataService,
) (*dataSyncService, error) {
	if vchan == nil {
		return nil, fmt.Errorf("newDataSyncService: nil VchannelInfo")
	}

	ctx1, cancel := context.WithCancel(ctx)

	service := &dataSyncService{
		ctx:          ctx1,
		cancelFn:     cancel,
		fg:           nil,
		flushChan:    flushChan,
		replica:      replica,
		idAllocator:  alloc,
		msFactory:    factory,
		collectionID: vchan.GetCollectionID(),
		dataService:  dataService,
		clearSignal:  clearSignal,
	}

	if err := service.initNodes(vchan); err != nil {
		// Nobody will ever call close on a service we do not return, so
		// release the derived context here instead of leaking it.
		cancel()
		return nil, err
	}
	return service, nil
}

74
func (dsService *dataSyncService) start() {
75
	if dsService.fg != nil {
76
		log.Debug("Data Sync Service starting flowgraph")
77 78 79 80
		dsService.fg.Start()
	} else {
		log.Debug("Data Sync Service flowgraph nil")
	}
X
XuanYang-cn 已提交
81 82 83 84
}

func (dsService *dataSyncService) close() {
	if dsService.fg != nil {
85
		log.Debug("Data Sync Service closing flowgraph")
X
XuanYang-cn 已提交
86 87
		dsService.fg.Close()
	}
88 89

	dsService.cancelFn()
X
XuanYang-cn 已提交
90 91
}

92
// initNodes builds the service's flowgraph for the virtual channel vchanInfo
// and wires it as
//
//	dmStreamNode -> ddNode -> insertBufferNode
//
// Along the way it configures msFactory with the Pulsar settings and
// re-registers the channel's unflushed segments in the replica, restoring the
// row counts reported in vchanInfo. Unflushed segments whose collection ID or
// insert channel do not match this service are skipped with a warning.
//
// It returns the error from msFactory.SetParams or from the first edge that
// cannot be set. The flowgraph is not started here (see start).
func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) error {
	// TODO: add delete pipeline support
	dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx)

	m := map[string]interface{}{
		"PulsarAddress":  Params.PulsarAddress,
		"ReceiveBufSize": 1024,
		"PulsarBufSize":  1024,
	}

	err := dsService.msFactory.SetParams(m)
	if err != nil {
		return err
	}

	// saveBinlog is handed to the insert buffer node. It reports the field
	// binlog paths, per-segment checkpoints and start positions of the flush
	// unit fu to the data service via SaveBinlogPaths.
	saveBinlog := func(fu *segmentFlushUnit) error {
		id2path := make([]*datapb.ID2PathList, 0, len(fu.field2Path))
		for k, v := range fu.field2Path {
			id2path = append(id2path, &datapb.ID2PathList{ID: k, Paths: []string{v}})
		}

		checkPoints := make([]*datapb.CheckPoint, 0, len(fu.checkPoint))
		for k, v := range fu.checkPoint {
			v := v // &v.pos must point at a per-iteration copy, not the shared loop variable
			checkPoints = append(checkPoints, &datapb.CheckPoint{
				SegmentID: k,
				NumOfRows: v.numRows,
				Position:  &v.pos,
			})
		}

		log.Debug("SaveBinlogPath",
			zap.Int64("SegmentID", fu.segID),
			zap.Int64("CollectionID", fu.collID),
			zap.Int("Length of Field2BinlogPaths", len(id2path)),
			zap.Any("Start Positions", fu.startPositions),
		)

		req := &datapb.SaveBinlogPathsRequest{
			Base: &commonpb.MsgBase{
				MsgType:   0, //TODO msg type
				MsgID:     0, //TODO msg id
				Timestamp: 0, //TODO time stamp
				SourceID:  Params.NodeID,
			},
			SegmentID:         fu.segID,
			CollectionID:      fu.collID,
			Field2BinlogPaths: id2path,
			CheckPoints:       checkPoints,
			StartPositions:    fu.startPositions,
			Flushed:           fu.flushed,
		}
		rsp, err := dsService.dataService.SaveBinlogPaths(dsService.ctx, req)
		if err != nil {
			// Return the error unchanged: using err.Error() as a format string
			// would mangle any '%' in the message and drop the error chain.
			return err
		}
		if rsp == nil {
			return fmt.Errorf("data service save bin log path failed: nil response")
		}
		if rsp.GetErrorCode() != commonpb.ErrorCode_Success {
			return fmt.Errorf("data service save bin log path failed, reason = %s", rsp.GetReason())
		}
		return nil
	}

	pchan := rootcoord.ToPhysicalChannel(vchanInfo.GetChannelName())
	var dmStreamNode Node = newDmInputNode(
		dsService.ctx,
		dsService.msFactory,
		pchan,
		vchanInfo.GetSeekPosition(),
	)
	var ddNode Node = newDDNode(dsService.clearSignal, dsService.collectionID, vchanInfo)
	var insertBufferNode Node = newInsertBufferNode(
		dsService.ctx,
		dsService.replica,
		dsService.msFactory,
		dsService.idAllocator,
		dsService.flushChan,
		saveBinlog,
		vchanInfo.GetChannelName(),
	)

	// Recover segment checkpoints: register every unflushed segment of this
	// collection/channel in the replica together with its recorded row count.
	for _, us := range vchanInfo.GetUnflushedSegments() {
		if us.GetCollectionID() != dsService.collectionID ||
			us.GetInsertChannel() != vchanInfo.GetChannelName() {
			log.Warn("Collection ID or ChannelName mismatch",
				zap.Int64("Wanted ID", dsService.collectionID),
				zap.Int64("Actual ID", us.GetCollectionID()),
				zap.String("Wanted Channel Name", vchanInfo.GetChannelName()),
				zap.String("Actual Channel Name", us.GetInsertChannel()),
			)
			continue
		}

		log.Info("Recover Segment NumOfRows from checkpoints",
			zap.String("InsertChannel", us.GetInsertChannel()),
			zap.Int64("SegmentID", us.GetID()),
			zap.Int64("NumOfRows", us.GetNumOfRows()),
		)
		dsService.replica.addSegment(us.GetID(), us.GetCollectionID(), us.GetPartitionID(), us.GetInsertChannel())
		dsService.replica.updateStatistics(us.GetID(), us.GetNumOfRows())
	}

	dsService.fg.AddNode(dmStreamNode)
	dsService.fg.AddNode(ddNode)
	dsService.fg.AddNode(insertBufferNode)

	// dmStreamNode
	err = dsService.fg.SetEdges(dmStreamNode.Name(),
		[]string{},
		[]string{ddNode.Name()},
	)
	if err != nil {
		log.Error("set edges failed in node", zap.String("name", dmStreamNode.Name()), zap.Error(err))
		return err
	}

	// ddNode
	err = dsService.fg.SetEdges(ddNode.Name(),
		[]string{dmStreamNode.Name()},
		[]string{insertBufferNode.Name()},
	)
	if err != nil {
		log.Error("set edges failed in node", zap.String("name", ddNode.Name()), zap.Error(err))
		return err
	}

	// insertBufferNode
	err = dsService.fg.SetEdges(insertBufferNode.Name(),
		[]string{ddNode.Name()},
		[]string{},
	)
	if err != nil {
		log.Error("set edges failed in node", zap.String("name", insertBufferNode.Name()), zap.Error(err))
		return err
	}
	return nil
}