data_sync_service.go 5.1 KB
Newer Older
X
XuanYang-cn 已提交
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

X
XuanYang-cn 已提交
12 13 14 15
package datanode

import (
	"context"
16
	"time"
X
XuanYang-cn 已提交
17

X
Xiangyu Wang 已提交
18 19 20 21 22
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
	"github.com/milvus-io/milvus/internal/util/flowgraph"
	"github.com/milvus-io/milvus/internal/util/retry"
X
XuanYang-cn 已提交
23
	"go.etcd.io/etcd/clientv3"
X
XuanYang-cn 已提交
24 25

	"go.uber.org/zap"
X
XuanYang-cn 已提交
26 27 28
)

type dataSyncService struct {
X
XuanYang-cn 已提交
29 30
	ctx         context.Context
	fg          *flowgraph.TimeTickedFlowGraph
X
XuanYang-cn 已提交
31
	flushChan   <-chan *flushMsg
32
	replica     Replica
33
	idAllocator allocatorInterface
G
groot 已提交
34
	msFactory   msgstream.Factory
X
XuanYang-cn 已提交
35 36
}

X
XuanYang-cn 已提交
37
func newDataSyncService(ctx context.Context, flushChan <-chan *flushMsg,
38
	replica Replica, alloc allocatorInterface, factory msgstream.Factory) *dataSyncService {
39
	service := &dataSyncService{
X
XuanYang-cn 已提交
40 41 42 43 44
		ctx:         ctx,
		fg:          nil,
		flushChan:   flushChan,
		replica:     replica,
		idAllocator: alloc,
G
groot 已提交
45
		msFactory:   factory,
X
XuanYang-cn 已提交
46
	}
47
	return service
48 49
}

50 51
func (dsService *dataSyncService) init() {
	if len(Params.InsertChannelNames) == 0 {
X
XuanYang-cn 已提交
52
		log.Error("InsertChannels not readly, init datasync service failed")
53 54 55
		return
	}

B
bigsheeper 已提交
56
	dsService.initNodes()
57 58 59
}

func (dsService *dataSyncService) start() {
X
XuanYang-cn 已提交
60
	log.Debug("Data Sync Service Start Successfully")
61 62 63 64 65
	if dsService.fg != nil {
		dsService.fg.Start()
	} else {
		log.Debug("Data Sync Service flowgraph nil")
	}
X
XuanYang-cn 已提交
66 67 68 69 70 71 72 73 74 75
}

func (dsService *dataSyncService) close() {
	if dsService.fg != nil {
		dsService.fg.Close()
	}
}

func (dsService *dataSyncService) initNodes() {
	// TODO: add delete pipeline support
76 77
	var kvClient *clientv3.Client
	var err error
78
	connectEtcdFn := func() error {
79
		kvClient, err = clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}})
80 81 82 83 84
		if err != nil {
			return err
		}
		return nil
	}
85
	err = retry.Retry(100000, time.Millisecond*200, connectEtcdFn)
X
XuanYang-cn 已提交
86 87 88
	if err != nil {
		panic(err)
	}
X
XuanYang-cn 已提交
89

90 91 92 93
	etcdKV := etcdkv.NewEtcdKV(kvClient, Params.MetaRootPath)
	// New binlogMeta
	mt, _ := NewBinlogMeta(etcdKV, dsService.idAllocator)

X
XuanYang-cn 已提交
94 95
	dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx)

G
groot 已提交
96 97 98 99 100 101 102 103 104 105 106
	m := map[string]interface{}{
		"PulsarAddress":  Params.PulsarAddress,
		"ReceiveBufSize": 1024,
		"PulsarBufSize":  1024}
	err = dsService.msFactory.SetParams(m)
	if err != nil {
		panic(err)
	}

	var dmStreamNode Node = newDmInputNode(dsService.ctx, dsService.msFactory)
	var ddStreamNode Node = newDDInputNode(dsService.ctx, dsService.msFactory)
X
XuanYang-cn 已提交
107 108

	var filterDmNode Node = newFilteredDmNode()
X
XuanYang-cn 已提交
109
	var ddNode Node = newDDNode(dsService.ctx, mt, dsService.flushChan, dsService.replica, dsService.idAllocator)
110
	var insertBufferNode Node = newInsertBufferNode(dsService.ctx, mt, dsService.replica, dsService.msFactory, dsService.idAllocator)
X
XuanYang-cn 已提交
111 112
	var gcNode Node = newGCNode(dsService.replica)

113 114
	dsService.fg.AddNode(dmStreamNode)
	dsService.fg.AddNode(ddStreamNode)
X
XuanYang-cn 已提交
115

116 117
	dsService.fg.AddNode(filterDmNode)
	dsService.fg.AddNode(ddNode)
X
XuanYang-cn 已提交
118

119 120
	dsService.fg.AddNode(insertBufferNode)
	dsService.fg.AddNode(gcNode)
X
XuanYang-cn 已提交
121 122

	// dmStreamNode
X
XuanYang-cn 已提交
123
	err = dsService.fg.SetEdges(dmStreamNode.Name(),
X
XuanYang-cn 已提交
124 125 126 127
		[]string{},
		[]string{filterDmNode.Name()},
	)
	if err != nil {
X
XuanYang-cn 已提交
128 129
		log.Error("set edges failed in node", zap.String("name", dmStreamNode.Name()), zap.Error(err))
		panic("set edges faild in the node")
X
XuanYang-cn 已提交
130 131 132 133 134 135 136 137
	}

	// ddStreamNode
	err = dsService.fg.SetEdges(ddStreamNode.Name(),
		[]string{},
		[]string{ddNode.Name()},
	)
	if err != nil {
X
XuanYang-cn 已提交
138 139
		log.Error("set edges failed in node", zap.String("name", ddStreamNode.Name()), zap.Error(err))
		panic("set edges faild in the node")
X
XuanYang-cn 已提交
140 141 142 143 144 145 146 147
	}

	// filterDmNode
	err = dsService.fg.SetEdges(filterDmNode.Name(),
		[]string{dmStreamNode.Name(), ddNode.Name()},
		[]string{insertBufferNode.Name()},
	)
	if err != nil {
X
XuanYang-cn 已提交
148 149
		log.Error("set edges failed in node", zap.String("name", filterDmNode.Name()), zap.Error(err))
		panic("set edges faild in the node")
X
XuanYang-cn 已提交
150 151 152 153 154 155 156 157
	}

	// ddNode
	err = dsService.fg.SetEdges(ddNode.Name(),
		[]string{ddStreamNode.Name()},
		[]string{filterDmNode.Name()},
	)
	if err != nil {
X
XuanYang-cn 已提交
158 159
		log.Error("set edges failed in node", zap.String("name", ddNode.Name()), zap.Error(err))
		panic("set edges faild in the node")
X
XuanYang-cn 已提交
160 161 162 163 164 165 166 167
	}

	// insertBufferNode
	err = dsService.fg.SetEdges(insertBufferNode.Name(),
		[]string{filterDmNode.Name()},
		[]string{gcNode.Name()},
	)
	if err != nil {
X
XuanYang-cn 已提交
168 169
		log.Error("set edges failed in node", zap.String("name", insertBufferNode.Name()), zap.Error(err))
		panic("set edges faild in the node")
X
XuanYang-cn 已提交
170 171 172 173 174 175 176
	}

	// gcNode
	err = dsService.fg.SetEdges(gcNode.Name(),
		[]string{insertBufferNode.Name()},
		[]string{})
	if err != nil {
X
XuanYang-cn 已提交
177 178
		log.Error("set edges failed in node", zap.String("name", gcNode.Name()), zap.Error(err))
		panic("set edges faild in the node")
X
XuanYang-cn 已提交
179 180
	}
}