data_sync_service.go 4.4 KB
Newer Older
X
XuanYang-cn 已提交
1 2 3 4
package datanode

import (
	"context"
5
	"time"
X
XuanYang-cn 已提交
6

X
XuanYang-cn 已提交
7
	etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
X
XuanYang-cn 已提交
8
	"github.com/zilliztech/milvus-distributed/internal/log"
G
groot 已提交
9
	"github.com/zilliztech/milvus-distributed/internal/msgstream"
X
XuanYang-cn 已提交
10
	"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
11
	"github.com/zilliztech/milvus-distributed/internal/util/retry"
X
XuanYang-cn 已提交
12
	"go.etcd.io/etcd/clientv3"
X
XuanYang-cn 已提交
13 14

	"go.uber.org/zap"
X
XuanYang-cn 已提交
15 16 17
)

type dataSyncService struct {
X
XuanYang-cn 已提交
18 19 20
	ctx         context.Context
	fg          *flowgraph.TimeTickedFlowGraph
	flushChan   chan *flushMsg
21
	replica     Replica
22
	idAllocator allocatorInterface
G
groot 已提交
23
	msFactory   msgstream.Factory
X
XuanYang-cn 已提交
24 25
}

X
XuanYang-cn 已提交
26
func newDataSyncService(ctx context.Context, flushChan chan *flushMsg,
27
	replica Replica, alloc allocatorInterface, factory msgstream.Factory) *dataSyncService {
28
	service := &dataSyncService{
X
XuanYang-cn 已提交
29 30 31 32 33
		ctx:         ctx,
		fg:          nil,
		flushChan:   flushChan,
		replica:     replica,
		idAllocator: alloc,
G
groot 已提交
34
		msFactory:   factory,
X
XuanYang-cn 已提交
35
	}
36
	return service
37 38
}

39 40
func (dsService *dataSyncService) init() {
	if len(Params.InsertChannelNames) == 0 {
X
XuanYang-cn 已提交
41
		log.Error("InsertChannels not readly, init datasync service failed")
42 43 44
		return
	}

B
bigsheeper 已提交
45
	dsService.initNodes()
46 47 48
}

func (dsService *dataSyncService) start() {
X
XuanYang-cn 已提交
49
	log.Debug("Data Sync Service Start Successfully")
X
XuanYang-cn 已提交
50 51 52 53 54 55 56 57 58 59 60
	dsService.fg.Start()
}

// close calls Close on the flow graph; it does nothing when no flow graph has
// been assigned.
func (dsService *dataSyncService) close() {
	graph := dsService.fg
	if graph == nil {
		return
	}
	graph.Close()
}

func (dsService *dataSyncService) initNodes() {
	// TODO: add delete pipeline support
61 62
	var kvClient *clientv3.Client
	var err error
63
	connectEtcdFn := func() error {
64
		kvClient, err = clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}})
65 66 67 68 69
		if err != nil {
			return err
		}
		return nil
	}
70
	err = retry.Retry(100000, time.Millisecond*200, connectEtcdFn)
X
XuanYang-cn 已提交
71 72 73
	if err != nil {
		panic(err)
	}
X
XuanYang-cn 已提交
74

75 76 77 78
	etcdKV := etcdkv.NewEtcdKV(kvClient, Params.MetaRootPath)
	// New binlogMeta
	mt, _ := NewBinlogMeta(etcdKV, dsService.idAllocator)

X
XuanYang-cn 已提交
79 80
	dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx)

G
groot 已提交
81 82 83 84 85 86 87 88 89 90 91
	m := map[string]interface{}{
		"PulsarAddress":  Params.PulsarAddress,
		"ReceiveBufSize": 1024,
		"PulsarBufSize":  1024}
	err = dsService.msFactory.SetParams(m)
	if err != nil {
		panic(err)
	}

	var dmStreamNode Node = newDmInputNode(dsService.ctx, dsService.msFactory)
	var ddStreamNode Node = newDDInputNode(dsService.ctx, dsService.msFactory)
X
XuanYang-cn 已提交
92 93

	var filterDmNode Node = newFilteredDmNode()
94 95
	var ddNode Node = newDDNode(dsService.ctx, mt, dsService.flushChan, dsService.replica)
	var insertBufferNode Node = newInsertBufferNode(dsService.ctx, mt, dsService.replica, dsService.msFactory)
X
XuanYang-cn 已提交
96 97
	var gcNode Node = newGCNode(dsService.replica)

98 99
	dsService.fg.AddNode(dmStreamNode)
	dsService.fg.AddNode(ddStreamNode)
X
XuanYang-cn 已提交
100

101 102
	dsService.fg.AddNode(filterDmNode)
	dsService.fg.AddNode(ddNode)
X
XuanYang-cn 已提交
103

104 105
	dsService.fg.AddNode(insertBufferNode)
	dsService.fg.AddNode(gcNode)
X
XuanYang-cn 已提交
106 107

	// dmStreamNode
X
XuanYang-cn 已提交
108
	err = dsService.fg.SetEdges(dmStreamNode.Name(),
X
XuanYang-cn 已提交
109 110 111 112
		[]string{},
		[]string{filterDmNode.Name()},
	)
	if err != nil {
X
XuanYang-cn 已提交
113 114
		log.Error("set edges failed in node", zap.String("name", dmStreamNode.Name()), zap.Error(err))
		panic("set edges faild in the node")
X
XuanYang-cn 已提交
115 116 117 118 119 120 121 122
	}

	// ddStreamNode
	err = dsService.fg.SetEdges(ddStreamNode.Name(),
		[]string{},
		[]string{ddNode.Name()},
	)
	if err != nil {
X
XuanYang-cn 已提交
123 124
		log.Error("set edges failed in node", zap.String("name", ddStreamNode.Name()), zap.Error(err))
		panic("set edges faild in the node")
X
XuanYang-cn 已提交
125 126 127 128 129 130 131 132
	}

	// filterDmNode
	err = dsService.fg.SetEdges(filterDmNode.Name(),
		[]string{dmStreamNode.Name(), ddNode.Name()},
		[]string{insertBufferNode.Name()},
	)
	if err != nil {
X
XuanYang-cn 已提交
133 134
		log.Error("set edges failed in node", zap.String("name", filterDmNode.Name()), zap.Error(err))
		panic("set edges faild in the node")
X
XuanYang-cn 已提交
135 136 137 138 139 140 141 142
	}

	// ddNode
	err = dsService.fg.SetEdges(ddNode.Name(),
		[]string{ddStreamNode.Name()},
		[]string{filterDmNode.Name()},
	)
	if err != nil {
X
XuanYang-cn 已提交
143 144
		log.Error("set edges failed in node", zap.String("name", ddNode.Name()), zap.Error(err))
		panic("set edges faild in the node")
X
XuanYang-cn 已提交
145 146 147 148 149 150 151 152
	}

	// insertBufferNode
	err = dsService.fg.SetEdges(insertBufferNode.Name(),
		[]string{filterDmNode.Name()},
		[]string{gcNode.Name()},
	)
	if err != nil {
X
XuanYang-cn 已提交
153 154
		log.Error("set edges failed in node", zap.String("name", insertBufferNode.Name()), zap.Error(err))
		panic("set edges faild in the node")
X
XuanYang-cn 已提交
155 156 157 158 159 160 161
	}

	// gcNode
	err = dsService.fg.SetEdges(gcNode.Name(),
		[]string{insertBufferNode.Name()},
		[]string{})
	if err != nil {
X
XuanYang-cn 已提交
162 163
		log.Error("set edges failed in node", zap.String("name", gcNode.Name()), zap.Error(err))
		panic("set edges faild in the node")
X
XuanYang-cn 已提交
164 165
	}
}