// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

package datanode

import (
	"context"
16
	"fmt"
X
XuanYang-cn 已提交
17

X
Xiangyu Wang 已提交
18 19
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
20
	"github.com/milvus-io/milvus/internal/proto/commonpb"
S
sunby 已提交
21
	"github.com/milvus-io/milvus/internal/proto/datapb"
22
	"github.com/milvus-io/milvus/internal/types"
X
Xiangyu Wang 已提交
23
	"github.com/milvus-io/milvus/internal/util/flowgraph"
X
XuanYang-cn 已提交
24 25

	"go.uber.org/zap"
X
XuanYang-cn 已提交
26 27 28
)

type dataSyncService struct {
29 30 31 32 33 34 35 36
	ctx          context.Context
	cancelFn     context.CancelFunc
	fg           *flowgraph.TimeTickedFlowGraph
	flushChan    <-chan *flushMsg
	replica      Replica
	idAllocator  allocatorInterface
	msFactory    msgstream.Factory
	collectionID UniqueID
37
	dataCoord    types.DataCoord
38
	clearSignal  chan<- UniqueID
X
XuanYang-cn 已提交
39 40
}

S
sunby 已提交
41 42 43 44 45
func newDataSyncService(ctx context.Context,
	flushChan <-chan *flushMsg,
	replica Replica,
	alloc allocatorInterface,
	factory msgstream.Factory,
46 47
	vchan *datapb.VchannelInfo,
	clearSignal chan<- UniqueID,
48
	dataCoord types.DataCoord,
49

50
) (*dataSyncService, error) {
51 52

	ctx1, cancel := context.WithCancel(ctx)
S
sunby 已提交
53

54
	service := &dataSyncService{
55 56 57 58 59 60 61 62
		ctx:          ctx1,
		cancelFn:     cancel,
		fg:           nil,
		flushChan:    flushChan,
		replica:      replica,
		idAllocator:  alloc,
		msFactory:    factory,
		collectionID: vchan.GetCollectionID(),
63
		dataCoord:    dataCoord,
64
		clearSignal:  clearSignal,
X
XuanYang-cn 已提交
65
	}
S
sunby 已提交
66

67 68 69 70
	if err := service.initNodes(vchan); err != nil {
		return nil, err
	}
	return service, nil
71 72
}

73
func (dsService *dataSyncService) start() {
74
	if dsService.fg != nil {
75
		log.Debug("Data Sync Service starting flowgraph")
76 77 78 79
		dsService.fg.Start()
	} else {
		log.Debug("Data Sync Service flowgraph nil")
	}
X
XuanYang-cn 已提交
80 81 82 83
}

func (dsService *dataSyncService) close() {
	if dsService.fg != nil {
84
		log.Debug("Data Sync Service closing flowgraph")
X
XuanYang-cn 已提交
85 86
		dsService.fg.Close()
	}
87 88

	dsService.cancelFn()
X
XuanYang-cn 已提交
89 90
}

91
func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) error {
X
XuanYang-cn 已提交
92 93 94
	// TODO: add delete pipeline support
	dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx)

G
groot 已提交
95 96 97
	m := map[string]interface{}{
		"PulsarAddress":  Params.PulsarAddress,
		"ReceiveBufSize": 1024,
S
sunby 已提交
98 99 100
		"PulsarBufSize":  1024,
	}

101
	err := dsService.msFactory.SetParams(m)
G
groot 已提交
102
	if err != nil {
103
		return err
G
groot 已提交
104 105
	}

N
neza2017 已提交
106
	saveBinlog := func(fu *segmentFlushUnit) error {
S
sunby 已提交
107
		id2path := []*datapb.FieldBinlog{}
108 109
		checkPoints := []*datapb.CheckPoint{}
		for k, v := range fu.field2Path {
S
sunby 已提交
110
			id2path = append(id2path, &datapb.FieldBinlog{FieldID: k, Binlogs: []string{v}})
111
		}
N
neza2017 已提交
112
		for k, v := range fu.checkPoint {
113 114 115
			v := v
			checkPoints = append(checkPoints, &datapb.CheckPoint{
				SegmentID: k,
N
neza2017 已提交
116 117
				NumOfRows: v.numRows,
				Position:  &v.pos,
118 119
			})
		}
S
sunby 已提交
120 121 122 123 124
		log.Debug("SaveBinlogPath",
			zap.Int64("SegmentID", fu.segID),
			zap.Int64("CollectionID", fu.collID),
			zap.Int("Length of Field2BinlogPaths", len(id2path)),
		)
125 126 127

		req := &datapb.SaveBinlogPathsRequest{
			Base: &commonpb.MsgBase{
S
sunby 已提交
128 129 130
				MsgType:   0, //TODO msg type
				MsgID:     0, //TODO msg id
				Timestamp: 0, //TODO time stamp
131 132 133
				SourceID:  Params.NodeID,
			},
			SegmentID:         fu.segID,
S
sunby 已提交
134
			CollectionID:      fu.collID,
135 136
			Field2BinlogPaths: id2path,
			CheckPoints:       checkPoints,
137
			StartPositions:    fu.startPositions,
138 139
			Flushed:           fu.flushed,
		}
140
		rsp, err := dsService.dataCoord.SaveBinlogPaths(dsService.ctx, req)
141
		if err != nil {
S
sunby 已提交
142
			return fmt.Errorf(err.Error())
143 144 145 146 147 148
		}
		if rsp.ErrorCode != commonpb.ErrorCode_Success {
			return fmt.Errorf("data service save bin log path failed, reason = %s", rsp.Reason)
		}
		return nil
	}
149

X
XuanYang-cn 已提交
150 151 152
	var dmStreamNode Node = newDmInputNode(
		dsService.ctx,
		dsService.msFactory,
153 154
		vchanInfo.CollectionID,
		vchanInfo.GetChannelName(),
X
XuanYang-cn 已提交
155 156 157
		vchanInfo.GetSeekPosition(),
	)
	var ddNode Node = newDDNode(dsService.clearSignal, dsService.collectionID, vchanInfo)
C
congqixia 已提交
158 159
	var insertBufferNode Node
	insertBufferNode, err = newInsertBufferNode(
N
neza2017 已提交
160 161 162 163 164
		dsService.ctx,
		dsService.replica,
		dsService.msFactory,
		dsService.idAllocator,
		dsService.flushChan,
165
		saveBinlog,
S
sunby 已提交
166
		vchanInfo.GetChannelName(),
N
neza2017 已提交
167
	)
C
congqixia 已提交
168 169 170
	if err != nil {
		return err
	}
X
XuanYang-cn 已提交
171

G
godchen 已提交
172 173 174 175 176
	var deleteNode Node = newDeleteDNode(
		dsService.ctx,
		dsService.replica,
	)

S
sunby 已提交
177 178 179 180
	// recover segment checkpoints
	for _, us := range vchanInfo.GetUnflushedSegments() {
		if us.CollectionID != dsService.collectionID ||
			us.GetInsertChannel() != vchanInfo.ChannelName {
181 182 183 184 185 186
			log.Warn("Collection ID or ChannelName not compact",
				zap.Int64("Wanted ID", dsService.collectionID),
				zap.Int64("Actual ID", us.CollectionID),
				zap.String("Wanted Channel Name", vchanInfo.ChannelName),
				zap.String("Actual Channel Name", us.GetInsertChannel()),
			)
S
sunby 已提交
187 188 189
			continue
		}

190 191 192 193 194
		log.Info("Recover Segment NumOfRows form checkpoints",
			zap.String("InsertChannel", us.GetInsertChannel()),
			zap.Int64("SegmentID", us.GetID()),
			zap.Int64("NumOfRows", us.GetNumOfRows()),
		)
X
XuanYang-cn 已提交
195 196 197

		dsService.replica.addNormalSegment(us.GetID(), us.CollectionID, us.PartitionID, us.GetInsertChannel(),
			us.GetNumOfRows(), &segmentCheckPoint{us.GetNumOfRows(), *us.GetDmlPosition()})
S
sunby 已提交
198 199
	}

200 201 202
	dsService.fg.AddNode(dmStreamNode)
	dsService.fg.AddNode(ddNode)
	dsService.fg.AddNode(insertBufferNode)
G
godchen 已提交
203
	dsService.fg.AddNode(deleteNode)
X
XuanYang-cn 已提交
204 205

	// ddStreamNode
206
	err = dsService.fg.SetEdges(dmStreamNode.Name(),
X
XuanYang-cn 已提交
207 208 209 210
		[]string{},
		[]string{ddNode.Name()},
	)
	if err != nil {
211
		log.Error("set edges failed in node", zap.String("name", dmStreamNode.Name()), zap.Error(err))
212
		return err
X
XuanYang-cn 已提交
213 214 215 216
	}

	// ddNode
	err = dsService.fg.SetEdges(ddNode.Name(),
217 218
		[]string{dmStreamNode.Name()},
		[]string{insertBufferNode.Name()},
X
XuanYang-cn 已提交
219 220
	)
	if err != nil {
X
XuanYang-cn 已提交
221
		log.Error("set edges failed in node", zap.String("name", ddNode.Name()), zap.Error(err))
222
		return err
X
XuanYang-cn 已提交
223 224 225 226
	}

	// insertBufferNode
	err = dsService.fg.SetEdges(insertBufferNode.Name(),
227
		[]string{ddNode.Name()},
G
godchen 已提交
228
		[]string{deleteNode.Name()},
X
XuanYang-cn 已提交
229 230
	)
	if err != nil {
X
XuanYang-cn 已提交
231
		log.Error("set edges failed in node", zap.String("name", insertBufferNode.Name()), zap.Error(err))
232
		return err
X
XuanYang-cn 已提交
233
	}
G
godchen 已提交
234 235 236 237 238 239 240 241 242 243

	//deleteNode
	err = dsService.fg.SetEdges(deleteNode.Name(),
		[]string{insertBufferNode.Name()},
		[]string{},
	)
	if err != nil {
		log.Error("set edges failed in node", zap.String("name", deleteNode.Name()), zap.Error(err))
		return err
	}
244
	return nil
X
XuanYang-cn 已提交
245
}