data_sync_service.go 6.6 KB
Newer Older
X
XuanYang-cn 已提交
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

X
XuanYang-cn 已提交
12 13 14 15
package datanode

import (
	"context"
16
	"fmt"
X
XuanYang-cn 已提交
17

X
Xiangyu Wang 已提交
18 19
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
20
	"github.com/milvus-io/milvus/internal/proto/commonpb"
S
sunby 已提交
21
	"github.com/milvus-io/milvus/internal/proto/datapb"
22
	"github.com/milvus-io/milvus/internal/rootcoord"
23
	"github.com/milvus-io/milvus/internal/types"
X
Xiangyu Wang 已提交
24
	"github.com/milvus-io/milvus/internal/util/flowgraph"
X
XuanYang-cn 已提交
25 26

	"go.uber.org/zap"
X
XuanYang-cn 已提交
27 28 29
)

type dataSyncService struct {
30 31 32 33 34 35 36 37
	ctx          context.Context
	cancelFn     context.CancelFunc
	fg           *flowgraph.TimeTickedFlowGraph
	flushChan    <-chan *flushMsg
	replica      Replica
	idAllocator  allocatorInterface
	msFactory    msgstream.Factory
	collectionID UniqueID
38
	dataCoord    types.DataCoord
39
	clearSignal  chan<- UniqueID
X
XuanYang-cn 已提交
40 41
}

S
sunby 已提交
42 43 44 45 46
func newDataSyncService(ctx context.Context,
	flushChan <-chan *flushMsg,
	replica Replica,
	alloc allocatorInterface,
	factory msgstream.Factory,
47 48
	vchan *datapb.VchannelInfo,
	clearSignal chan<- UniqueID,
49
	dataCoord types.DataCoord,
50

51
) (*dataSyncService, error) {
52 53

	ctx1, cancel := context.WithCancel(ctx)
S
sunby 已提交
54

55
	service := &dataSyncService{
56 57 58 59 60 61 62 63
		ctx:          ctx1,
		cancelFn:     cancel,
		fg:           nil,
		flushChan:    flushChan,
		replica:      replica,
		idAllocator:  alloc,
		msFactory:    factory,
		collectionID: vchan.GetCollectionID(),
64
		dataCoord:    dataCoord,
65
		clearSignal:  clearSignal,
X
XuanYang-cn 已提交
66
	}
S
sunby 已提交
67

68 69 70 71
	if err := service.initNodes(vchan); err != nil {
		return nil, err
	}
	return service, nil
72 73
}

74
func (dsService *dataSyncService) start() {
75
	if dsService.fg != nil {
76
		log.Debug("Data Sync Service starting flowgraph")
77 78 79 80
		dsService.fg.Start()
	} else {
		log.Debug("Data Sync Service flowgraph nil")
	}
X
XuanYang-cn 已提交
81 82 83 84
}

func (dsService *dataSyncService) close() {
	if dsService.fg != nil {
85
		log.Debug("Data Sync Service closing flowgraph")
X
XuanYang-cn 已提交
86 87
		dsService.fg.Close()
	}
88 89

	dsService.cancelFn()
X
XuanYang-cn 已提交
90 91
}

92
func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) error {
X
XuanYang-cn 已提交
93 94 95
	// TODO: add delete pipeline support
	dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx)

G
groot 已提交
96 97 98
	m := map[string]interface{}{
		"PulsarAddress":  Params.PulsarAddress,
		"ReceiveBufSize": 1024,
S
sunby 已提交
99 100 101
		"PulsarBufSize":  1024,
	}

102
	err := dsService.msFactory.SetParams(m)
G
groot 已提交
103
	if err != nil {
104
		return err
G
groot 已提交
105 106
	}

N
neza2017 已提交
107
	saveBinlog := func(fu *segmentFlushUnit) error {
S
sunby 已提交
108
		id2path := []*datapb.FieldBinlog{}
109 110
		checkPoints := []*datapb.CheckPoint{}
		for k, v := range fu.field2Path {
S
sunby 已提交
111
			id2path = append(id2path, &datapb.FieldBinlog{FieldID: k, Binlogs: []string{v}})
112
		}
N
neza2017 已提交
113
		for k, v := range fu.checkPoint {
114 115 116
			v := v
			checkPoints = append(checkPoints, &datapb.CheckPoint{
				SegmentID: k,
N
neza2017 已提交
117 118
				NumOfRows: v.numRows,
				Position:  &v.pos,
119 120
			})
		}
S
sunby 已提交
121 122 123 124 125
		log.Debug("SaveBinlogPath",
			zap.Int64("SegmentID", fu.segID),
			zap.Int64("CollectionID", fu.collID),
			zap.Int("Length of Field2BinlogPaths", len(id2path)),
		)
126 127 128

		req := &datapb.SaveBinlogPathsRequest{
			Base: &commonpb.MsgBase{
S
sunby 已提交
129 130 131
				MsgType:   0, //TODO msg type
				MsgID:     0, //TODO msg id
				Timestamp: 0, //TODO time stamp
132 133 134
				SourceID:  Params.NodeID,
			},
			SegmentID:         fu.segID,
S
sunby 已提交
135
			CollectionID:      fu.collID,
136 137
			Field2BinlogPaths: id2path,
			CheckPoints:       checkPoints,
138
			StartPositions:    fu.startPositions,
139 140
			Flushed:           fu.flushed,
		}
141
		rsp, err := dsService.dataCoord.SaveBinlogPaths(dsService.ctx, req)
142
		if err != nil {
S
sunby 已提交
143
			return fmt.Errorf(err.Error())
144 145 146 147 148 149
		}
		if rsp.ErrorCode != commonpb.ErrorCode_Success {
			return fmt.Errorf("data service save bin log path failed, reason = %s", rsp.Reason)
		}
		return nil
	}
150

151
	pchan := rootcoord.ToPhysicalChannel(vchanInfo.GetChannelName())
X
XuanYang-cn 已提交
152 153 154
	var dmStreamNode Node = newDmInputNode(
		dsService.ctx,
		dsService.msFactory,
155
		pchan,
X
XuanYang-cn 已提交
156 157 158
		vchanInfo.GetSeekPosition(),
	)
	var ddNode Node = newDDNode(dsService.clearSignal, dsService.collectionID, vchanInfo)
C
congqixia 已提交
159 160
	var insertBufferNode Node
	insertBufferNode, err = newInsertBufferNode(
N
neza2017 已提交
161 162 163 164 165
		dsService.ctx,
		dsService.replica,
		dsService.msFactory,
		dsService.idAllocator,
		dsService.flushChan,
166
		saveBinlog,
S
sunby 已提交
167
		vchanInfo.GetChannelName(),
N
neza2017 已提交
168
	)
C
congqixia 已提交
169 170 171
	if err != nil {
		return err
	}
X
XuanYang-cn 已提交
172

G
godchen 已提交
173 174 175 176 177
	var deleteNode Node = newDeleteDNode(
		dsService.ctx,
		dsService.replica,
	)

S
sunby 已提交
178 179 180 181
	// recover segment checkpoints
	for _, us := range vchanInfo.GetUnflushedSegments() {
		if us.CollectionID != dsService.collectionID ||
			us.GetInsertChannel() != vchanInfo.ChannelName {
182 183 184 185 186 187
			log.Warn("Collection ID or ChannelName not compact",
				zap.Int64("Wanted ID", dsService.collectionID),
				zap.Int64("Actual ID", us.CollectionID),
				zap.String("Wanted Channel Name", vchanInfo.ChannelName),
				zap.String("Actual Channel Name", us.GetInsertChannel()),
			)
S
sunby 已提交
188 189 190
			continue
		}

191 192 193 194 195
		log.Info("Recover Segment NumOfRows form checkpoints",
			zap.String("InsertChannel", us.GetInsertChannel()),
			zap.Int64("SegmentID", us.GetID()),
			zap.Int64("NumOfRows", us.GetNumOfRows()),
		)
X
XuanYang-cn 已提交
196 197 198

		dsService.replica.addNormalSegment(us.GetID(), us.CollectionID, us.PartitionID, us.GetInsertChannel(),
			us.GetNumOfRows(), &segmentCheckPoint{us.GetNumOfRows(), *us.GetDmlPosition()})
S
sunby 已提交
199 200
	}

201 202 203
	dsService.fg.AddNode(dmStreamNode)
	dsService.fg.AddNode(ddNode)
	dsService.fg.AddNode(insertBufferNode)
G
godchen 已提交
204
	dsService.fg.AddNode(deleteNode)
X
XuanYang-cn 已提交
205 206

	// ddStreamNode
207
	err = dsService.fg.SetEdges(dmStreamNode.Name(),
X
XuanYang-cn 已提交
208 209 210 211
		[]string{},
		[]string{ddNode.Name()},
	)
	if err != nil {
212
		log.Error("set edges failed in node", zap.String("name", dmStreamNode.Name()), zap.Error(err))
213
		return err
X
XuanYang-cn 已提交
214 215 216 217
	}

	// ddNode
	err = dsService.fg.SetEdges(ddNode.Name(),
218 219
		[]string{dmStreamNode.Name()},
		[]string{insertBufferNode.Name()},
X
XuanYang-cn 已提交
220 221
	)
	if err != nil {
X
XuanYang-cn 已提交
222
		log.Error("set edges failed in node", zap.String("name", ddNode.Name()), zap.Error(err))
223
		return err
X
XuanYang-cn 已提交
224 225 226 227
	}

	// insertBufferNode
	err = dsService.fg.SetEdges(insertBufferNode.Name(),
228
		[]string{ddNode.Name()},
G
godchen 已提交
229
		[]string{deleteNode.Name()},
X
XuanYang-cn 已提交
230 231
	)
	if err != nil {
X
XuanYang-cn 已提交
232
		log.Error("set edges failed in node", zap.String("name", insertBufferNode.Name()), zap.Error(err))
233
		return err
X
XuanYang-cn 已提交
234
	}
G
godchen 已提交
235 236 237 238 239 240 241 242 243 244

	//deleteNode
	err = dsService.fg.SetEdges(deleteNode.Name(),
		[]string{insertBufferNode.Name()},
		[]string{},
	)
	if err != nil {
		log.Error("set edges failed in node", zap.String("name", deleteNode.Name()), zap.Error(err))
		return err
	}
245
	return nil
X
XuanYang-cn 已提交
246
}