data_sync_service.go 4.5 KB
Newer Older
X
XuanYang-cn 已提交
1 2 3 4
package datanode

import (
	"context"
5
	"time"
X
XuanYang-cn 已提交
6

X
XuanYang-cn 已提交
7
	etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
X
XuanYang-cn 已提交
8
	"github.com/zilliztech/milvus-distributed/internal/log"
G
groot 已提交
9
	"github.com/zilliztech/milvus-distributed/internal/msgstream"
X
XuanYang-cn 已提交
10
	"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
11
	"github.com/zilliztech/milvus-distributed/internal/util/retry"
X
XuanYang-cn 已提交
12
	"go.etcd.io/etcd/clientv3"
X
XuanYang-cn 已提交
13 14

	"go.uber.org/zap"
X
XuanYang-cn 已提交
15 16 17
)

type dataSyncService struct {
X
XuanYang-cn 已提交
18 19 20
	ctx         context.Context
	fg          *flowgraph.TimeTickedFlowGraph
	flushChan   chan *flushMsg
21
	replica     Replica
22
	idAllocator allocatorInterface
G
groot 已提交
23
	msFactory   msgstream.Factory
X
XuanYang-cn 已提交
24 25
}

X
XuanYang-cn 已提交
26
func newDataSyncService(ctx context.Context, flushChan chan *flushMsg,
27
	replica Replica, alloc allocatorInterface, factory msgstream.Factory) *dataSyncService {
28
	service := &dataSyncService{
X
XuanYang-cn 已提交
29 30 31 32 33
		ctx:         ctx,
		fg:          nil,
		flushChan:   flushChan,
		replica:     replica,
		idAllocator: alloc,
G
groot 已提交
34
		msFactory:   factory,
X
XuanYang-cn 已提交
35
	}
36
	return service
37 38
}

39 40
func (dsService *dataSyncService) init() {
	if len(Params.InsertChannelNames) == 0 {
X
XuanYang-cn 已提交
41
		log.Error("InsertChannels not readly, init datasync service failed")
42 43 44
		return
	}

B
bigsheeper 已提交
45
	dsService.initNodes()
46 47 48
}

func (dsService *dataSyncService) start() {
X
XuanYang-cn 已提交
49
	log.Debug("Data Sync Service Start Successfully")
X
XuanYang-cn 已提交
50 51 52 53 54 55 56 57 58 59 60
	dsService.fg.Start()
}

// close shuts the flow graph down. It is a no-op when the graph was never
// built (fg is nil).
func (dsService *dataSyncService) close() {
	if dsService.fg == nil {
		return
	}
	dsService.fg.Close()
}

func (dsService *dataSyncService) initNodes() {
	// TODO: add delete pipeline support
X
XuanYang-cn 已提交
61
	// New metaTable
62 63 64 65 66 67
	var mt *metaTable
	connectEtcdFn := func() error {
		etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}})
		if err != nil {
			return err
		}
X
XuanYang-cn 已提交
68

69 70 71 72 73 74 75
		etcdKV := etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)
		mt, err = NewMetaTable(etcdKV)
		if err != nil {
			return err
		}
		return nil
	}
Z
zhenshan.cao 已提交
76
	err := retry.Retry(100000, time.Millisecond*200, connectEtcdFn)
X
XuanYang-cn 已提交
77 78 79
	if err != nil {
		panic(err)
	}
X
XuanYang-cn 已提交
80 81 82

	dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx)

G
groot 已提交
83 84 85 86 87 88 89 90 91 92 93
	m := map[string]interface{}{
		"PulsarAddress":  Params.PulsarAddress,
		"ReceiveBufSize": 1024,
		"PulsarBufSize":  1024}
	err = dsService.msFactory.SetParams(m)
	if err != nil {
		panic(err)
	}

	var dmStreamNode Node = newDmInputNode(dsService.ctx, dsService.msFactory)
	var ddStreamNode Node = newDDInputNode(dsService.ctx, dsService.msFactory)
X
XuanYang-cn 已提交
94 95

	var filterDmNode Node = newFilteredDmNode()
X
XuanYang-cn 已提交
96
	var ddNode Node = newDDNode(dsService.ctx, mt, dsService.flushChan, dsService.replica, dsService.idAllocator)
G
groot 已提交
97
	var insertBufferNode Node = newInsertBufferNode(dsService.ctx, mt, dsService.replica, dsService.idAllocator, dsService.msFactory)
X
XuanYang-cn 已提交
98 99
	var gcNode Node = newGCNode(dsService.replica)

100 101
	dsService.fg.AddNode(dmStreamNode)
	dsService.fg.AddNode(ddStreamNode)
X
XuanYang-cn 已提交
102

103 104
	dsService.fg.AddNode(filterDmNode)
	dsService.fg.AddNode(ddNode)
X
XuanYang-cn 已提交
105

106 107
	dsService.fg.AddNode(insertBufferNode)
	dsService.fg.AddNode(gcNode)
X
XuanYang-cn 已提交
108 109

	// dmStreamNode
X
XuanYang-cn 已提交
110
	err = dsService.fg.SetEdges(dmStreamNode.Name(),
X
XuanYang-cn 已提交
111 112 113 114
		[]string{},
		[]string{filterDmNode.Name()},
	)
	if err != nil {
X
XuanYang-cn 已提交
115 116
		log.Error("set edges failed in node", zap.String("name", dmStreamNode.Name()), zap.Error(err))
		panic("set edges faild in the node")
X
XuanYang-cn 已提交
117 118 119 120 121 122 123 124
	}

	// ddStreamNode
	err = dsService.fg.SetEdges(ddStreamNode.Name(),
		[]string{},
		[]string{ddNode.Name()},
	)
	if err != nil {
X
XuanYang-cn 已提交
125 126
		log.Error("set edges failed in node", zap.String("name", ddStreamNode.Name()), zap.Error(err))
		panic("set edges faild in the node")
X
XuanYang-cn 已提交
127 128 129 130 131 132 133 134
	}

	// filterDmNode
	err = dsService.fg.SetEdges(filterDmNode.Name(),
		[]string{dmStreamNode.Name(), ddNode.Name()},
		[]string{insertBufferNode.Name()},
	)
	if err != nil {
X
XuanYang-cn 已提交
135 136
		log.Error("set edges failed in node", zap.String("name", filterDmNode.Name()), zap.Error(err))
		panic("set edges faild in the node")
X
XuanYang-cn 已提交
137 138 139 140 141 142 143 144
	}

	// ddNode
	err = dsService.fg.SetEdges(ddNode.Name(),
		[]string{ddStreamNode.Name()},
		[]string{filterDmNode.Name()},
	)
	if err != nil {
X
XuanYang-cn 已提交
145 146
		log.Error("set edges failed in node", zap.String("name", ddNode.Name()), zap.Error(err))
		panic("set edges faild in the node")
X
XuanYang-cn 已提交
147 148 149 150 151 152 153 154
	}

	// insertBufferNode
	err = dsService.fg.SetEdges(insertBufferNode.Name(),
		[]string{filterDmNode.Name()},
		[]string{gcNode.Name()},
	)
	if err != nil {
X
XuanYang-cn 已提交
155 156
		log.Error("set edges failed in node", zap.String("name", insertBufferNode.Name()), zap.Error(err))
		panic("set edges faild in the node")
X
XuanYang-cn 已提交
157 158 159 160 161 162 163
	}

	// gcNode
	err = dsService.fg.SetEdges(gcNode.Name(),
		[]string{insertBufferNode.Name()},
		[]string{})
	if err != nil {
X
XuanYang-cn 已提交
164 165
		log.Error("set edges failed in node", zap.String("name", gcNode.Name()), zap.Error(err))
		panic("set edges faild in the node")
X
XuanYang-cn 已提交
166 167
	}
}