data_sync_service.go 3.4 KB
Newer Older
X
XuanYang-cn 已提交
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

X
XuanYang-cn 已提交
12 13 14 15 16
package datanode

import (
	"context"

X
Xiangyu Wang 已提交
17 18
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
S
sunby 已提交
19
	"github.com/milvus-io/milvus/internal/proto/datapb"
X
Xiangyu Wang 已提交
20
	"github.com/milvus-io/milvus/internal/util/flowgraph"
X
XuanYang-cn 已提交
21 22

	"go.uber.org/zap"
X
XuanYang-cn 已提交
23 24 25
)

type dataSyncService struct {
S
sunby 已提交
26 27 28 29 30 31 32
	ctx          context.Context
	fg           *flowgraph.TimeTickedFlowGraph
	flushChan    <-chan *flushMsg
	replica      Replica
	idAllocator  allocatorInterface
	msFactory    msgstream.Factory
	collectionID UniqueID
X
XuanYang-cn 已提交
33 34
}

S
sunby 已提交
35 36 37 38 39 40 41
func newDataSyncService(ctx context.Context,
	flushChan <-chan *flushMsg,
	replica Replica,
	alloc allocatorInterface,
	factory msgstream.Factory,
	vchanPair *datapb.VchannelPair) *dataSyncService {

42
	service := &dataSyncService{
S
sunby 已提交
43 44 45 46 47 48 49
		ctx:          ctx,
		fg:           nil,
		flushChan:    flushChan,
		replica:      replica,
		idAllocator:  alloc,
		msFactory:    factory,
		collectionID: vchanPair.GetCollectionID(),
X
XuanYang-cn 已提交
50
	}
S
sunby 已提交
51 52

	service.initNodes(vchanPair)
53
	return service
54 55
}

56
func (dsService *dataSyncService) start() {
X
XuanYang-cn 已提交
57
	log.Debug("Data Sync Service Start Successfully")
58 59 60 61 62
	if dsService.fg != nil {
		dsService.fg.Start()
	} else {
		log.Debug("Data Sync Service flowgraph nil")
	}
X
XuanYang-cn 已提交
63 64 65 66 67 68 69 70
}

// close closes the flowgraph if one has been built; it is a no-op when the
// flowgraph is nil.
func (dsService *dataSyncService) close() {
	if dsService.fg == nil {
		return
	}
	dsService.fg.Close()
}

S
sunby 已提交
71
func (dsService *dataSyncService) initNodes(vchanPair *datapb.VchannelPair) {
X
XuanYang-cn 已提交
72 73 74
	// TODO: add delete pipeline support
	dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx)

G
groot 已提交
75 76 77
	m := map[string]interface{}{
		"PulsarAddress":  Params.PulsarAddress,
		"ReceiveBufSize": 1024,
S
sunby 已提交
78 79 80
		"PulsarBufSize":  1024,
	}

81
	err := dsService.msFactory.SetParams(m)
G
groot 已提交
82 83 84 85
	if err != nil {
		panic(err)
	}

S
sunby 已提交
86
	var dmStreamNode Node = newDmInputNode(dsService.ctx, dsService.msFactory, vchanPair.GetDmlVchannelName(), vchanPair.GetDmlPosition())
87 88
	var ddNode Node = newDDNode()
	var insertBufferNode Node = newInsertBufferNode(dsService.ctx, dsService.replica, dsService.msFactory, dsService.idAllocator, dsService.flushChan)
X
XuanYang-cn 已提交
89

90 91 92
	dsService.fg.AddNode(dmStreamNode)
	dsService.fg.AddNode(ddNode)
	dsService.fg.AddNode(insertBufferNode)
X
XuanYang-cn 已提交
93 94

	// ddStreamNode
95
	err = dsService.fg.SetEdges(dmStreamNode.Name(),
X
XuanYang-cn 已提交
96 97 98 99
		[]string{},
		[]string{ddNode.Name()},
	)
	if err != nil {
100
		log.Error("set edges failed in node", zap.String("name", dmStreamNode.Name()), zap.Error(err))
X
XuanYang-cn 已提交
101
		panic("set edges faild in the node")
X
XuanYang-cn 已提交
102 103 104 105
	}

	// ddNode
	err = dsService.fg.SetEdges(ddNode.Name(),
106 107
		[]string{dmStreamNode.Name()},
		[]string{insertBufferNode.Name()},
X
XuanYang-cn 已提交
108 109
	)
	if err != nil {
X
XuanYang-cn 已提交
110 111
		log.Error("set edges failed in node", zap.String("name", ddNode.Name()), zap.Error(err))
		panic("set edges faild in the node")
X
XuanYang-cn 已提交
112 113 114 115
	}

	// insertBufferNode
	err = dsService.fg.SetEdges(insertBufferNode.Name(),
116 117
		[]string{ddNode.Name()},
		[]string{},
X
XuanYang-cn 已提交
118 119
	)
	if err != nil {
X
XuanYang-cn 已提交
120 121
		log.Error("set edges failed in node", zap.String("name", insertBufferNode.Name()), zap.Error(err))
		panic("set edges faild in the node")
X
XuanYang-cn 已提交
122 123
	}
}