data_sync_service.go 3.1 KB
Newer Older
X
XuanYang-cn 已提交
1 2 3 4 5 6
package datanode

import (
	"context"
	"log"

X
XuanYang-cn 已提交
7
	etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
X
XuanYang-cn 已提交
8
	"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
X
XuanYang-cn 已提交
9
	"go.etcd.io/etcd/clientv3"
X
XuanYang-cn 已提交
10 11 12
)

type dataSyncService struct {
X
XuanYang-cn 已提交
13 14 15 16 17
	ctx         context.Context
	fg          *flowgraph.TimeTickedFlowGraph
	flushChan   chan *flushMsg
	replica     collectionReplica
	idAllocator allocator
X
XuanYang-cn 已提交
18 19
}

X
XuanYang-cn 已提交
20
func newDataSyncService(ctx context.Context, flushChan chan *flushMsg,
X
XuanYang-cn 已提交
21
	replica collectionReplica, alloc allocator) *dataSyncService {
B
bigsheeper 已提交
22 23

	return &dataSyncService{
X
XuanYang-cn 已提交
24 25 26 27 28
		ctx:         ctx,
		fg:          nil,
		flushChan:   flushChan,
		replica:     replica,
		idAllocator: alloc,
X
XuanYang-cn 已提交
29
	}
30 31 32
}

func (dsService *dataSyncService) start() {
B
bigsheeper 已提交
33
	dsService.initNodes()
X
XuanYang-cn 已提交
34 35 36 37 38 39 40 41 42 43 44
	dsService.fg.Start()
}

// close shuts the flow graph down. It is a no-op when the graph has not
// been created yet (fg is still nil).
func (dsService *dataSyncService) close() {
	if dsService.fg == nil {
		return
	}
	dsService.fg.Close()
}

func (dsService *dataSyncService) initNodes() {
	// TODO: add delete pipeline support
X
XuanYang-cn 已提交
45 46 47 48 49 50 51 52 53 54 55
	// New metaTable
	etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}})
	if err != nil {
		panic(err)
	}

	etcdKV := etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)
	mt, err := NewMetaTable(etcdKV)
	if err != nil {
		panic(err)
	}
X
XuanYang-cn 已提交
56 57 58 59 60 61 62

	dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx)

	var dmStreamNode Node = newDmInputNode(dsService.ctx)
	var ddStreamNode Node = newDDInputNode(dsService.ctx)

	var filterDmNode Node = newFilteredDmNode()
B
bigsheeper 已提交
63

X
XuanYang-cn 已提交
64 65
	var ddNode Node = newDDNode(dsService.ctx, mt, dsService.flushChan, dsService.replica, dsService.idAllocator)
	var insertBufferNode Node = newInsertBufferNode(dsService.ctx, mt, dsService.replica, dsService.idAllocator)
X
XuanYang-cn 已提交
66 67 68 69 70 71 72 73 74 75 76 77
	var gcNode Node = newGCNode(dsService.replica)

	dsService.fg.AddNode(&dmStreamNode)
	dsService.fg.AddNode(&ddStreamNode)

	dsService.fg.AddNode(&filterDmNode)
	dsService.fg.AddNode(&ddNode)

	dsService.fg.AddNode(&insertBufferNode)
	dsService.fg.AddNode(&gcNode)

	// dmStreamNode
X
XuanYang-cn 已提交
78
	err = dsService.fg.SetEdges(dmStreamNode.Name(),
X
XuanYang-cn 已提交
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
		[]string{},
		[]string{filterDmNode.Name()},
	)
	if err != nil {
		log.Fatal("set edges failed in node:", dmStreamNode.Name())
	}

	// ddStreamNode
	err = dsService.fg.SetEdges(ddStreamNode.Name(),
		[]string{},
		[]string{ddNode.Name()},
	)
	if err != nil {
		log.Fatal("set edges failed in node:", ddStreamNode.Name())
	}

	// filterDmNode
	err = dsService.fg.SetEdges(filterDmNode.Name(),
		[]string{dmStreamNode.Name(), ddNode.Name()},
		[]string{insertBufferNode.Name()},
	)
	if err != nil {
		log.Fatal("set edges failed in node:", filterDmNode.Name())
	}

	// ddNode
	err = dsService.fg.SetEdges(ddNode.Name(),
		[]string{ddStreamNode.Name()},
		[]string{filterDmNode.Name()},
	)
	if err != nil {
		log.Fatal("set edges failed in node:", ddNode.Name())
	}

	// insertBufferNode
	err = dsService.fg.SetEdges(insertBufferNode.Name(),
		[]string{filterDmNode.Name()},
		[]string{gcNode.Name()},
	)
	if err != nil {
		log.Fatal("set edges failed in node:", insertBufferNode.Name())
	}

	// gcNode
	err = dsService.fg.SetEdges(gcNode.Name(),
		[]string{insertBufferNode.Name()},
		[]string{})
	if err != nil {
		log.Fatal("set edges failed in node:", gcNode.Name())
	}
}