data_sync_service.go 2.8 KB
Newer Older
N
neza2017 已提交
1
package querynode
2 3 4 5 6 7 8 9 10

import (
	"context"
	"log"

	"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
)

type dataSyncService struct {
D
dragondriver 已提交
11 12
	ctx context.Context
	fg  *flowgraph.TimeTickedFlowGraph
13

X
XuanYang-cn 已提交
14
	replica collectionReplica
15 16
}

X
XuanYang-cn 已提交
17
func newDataSyncService(ctx context.Context, replica collectionReplica) *dataSyncService {
18 19

	return &dataSyncService{
D
dragondriver 已提交
20 21
		ctx: ctx,
		fg:  nil,
22

D
dragondriver 已提交
23
		replica: replica,
24 25 26 27 28 29 30 31
	}
}

func (dsService *dataSyncService) start() {
	dsService.initNodes()
	dsService.fg.Start()
}

C
cai.zhang 已提交
32
func (dsService *dataSyncService) close() {
B
bigsheeper 已提交
33 34 35
	if dsService.fg != nil {
		dsService.fg.Close()
	}
C
cai.zhang 已提交
36 37
}

38 39 40 41 42
func (dsService *dataSyncService) initNodes() {
	// TODO: add delete pipeline support

	dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx)

D
dragondriver 已提交
43
	var dmStreamNode Node = newDmInputNode(dsService.ctx)
44 45
	var ddStreamNode Node = newDDInputNode(dsService.ctx)

46
	var filterDmNode Node = newFilteredDmNode()
47 48
	var ddNode Node = newDDNode(dsService.replica)

D
dragondriver 已提交
49 50
	var insertNode Node = newInsertNode(dsService.replica)
	var serviceTimeNode Node = newServiceTimeNode(dsService.replica)
51
	var gcNode Node = newGCNode(dsService.replica)
52 53

	dsService.fg.AddNode(&dmStreamNode)
54 55
	dsService.fg.AddNode(&ddStreamNode)

56
	dsService.fg.AddNode(&filterDmNode)
57 58
	dsService.fg.AddNode(&ddNode)

59 60
	dsService.fg.AddNode(&insertNode)
	dsService.fg.AddNode(&serviceTimeNode)
61
	dsService.fg.AddNode(&gcNode)
62

63
	// dmStreamNode
64 65 66 67 68 69 70 71
	var err = dsService.fg.SetEdges(dmStreamNode.Name(),
		[]string{},
		[]string{filterDmNode.Name()},
	)
	if err != nil {
		log.Fatal("set edges failed in node:", dmStreamNode.Name())
	}

72 73 74 75 76 77 78 79 80 81
	// ddStreamNode
	err = dsService.fg.SetEdges(ddStreamNode.Name(),
		[]string{},
		[]string{ddNode.Name()},
	)
	if err != nil {
		log.Fatal("set edges failed in node:", ddStreamNode.Name())
	}

	// filterDmNode
82
	err = dsService.fg.SetEdges(filterDmNode.Name(),
83
		[]string{dmStreamNode.Name(), ddNode.Name()},
84 85 86 87 88 89
		[]string{insertNode.Name()},
	)
	if err != nil {
		log.Fatal("set edges failed in node:", filterDmNode.Name())
	}

90 91 92 93 94 95 96 97 98 99
	// ddNode
	err = dsService.fg.SetEdges(ddNode.Name(),
		[]string{ddStreamNode.Name()},
		[]string{filterDmNode.Name()},
	)
	if err != nil {
		log.Fatal("set edges failed in node:", ddNode.Name())
	}

	// insertNode
100 101 102 103 104 105 106 107
	err = dsService.fg.SetEdges(insertNode.Name(),
		[]string{filterDmNode.Name()},
		[]string{serviceTimeNode.Name()},
	)
	if err != nil {
		log.Fatal("set edges failed in node:", insertNode.Name())
	}

108
	// serviceTimeNode
109 110
	err = dsService.fg.SetEdges(serviceTimeNode.Name(),
		[]string{insertNode.Name()},
111
		[]string{gcNode.Name()},
112 113 114 115
	)
	if err != nil {
		log.Fatal("set edges failed in node:", serviceTimeNode.Name())
	}
116 117 118 119 120 121 122 123

	// gcNode
	err = dsService.fg.SetEdges(gcNode.Name(),
		[]string{serviceTimeNode.Name()},
		[]string{})
	if err != nil {
		log.Fatal("set edges failed in node:", gcNode.Name())
	}
124
}