flow_graph_message.go 2.0 KB
Newer Older
N
neza2017 已提交
1
package querynode
2 3 4 5 6 7 8

import (
	"github.com/zilliztech/milvus-distributed/internal/msgstream"
	"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
)

// Msg is an alias for flowgraph.Msg, so this package can refer to the
// flow-graph message type without the flowgraph qualifier.
type Msg = flowgraph.Msg
9
// MsgStreamMsg is an alias for flowgraph.MsgStreamMsg, so this package can
// refer to it without the flowgraph qualifier.
type MsgStreamMsg = flowgraph.MsgStreamMsg
10 11

type key2SegMsg struct {
X
xige-16 已提交
12
	tsMessages []msgstream.TsMsg
13 14 15
	timeRange  TimeRange
}

16 17 18
type ddMsg struct {
	collectionRecords map[string][]metaOperateRecord
	partitionRecords  map[string][]metaOperateRecord
19
	gcRecord          *gcRecord
20
	timeRange         TimeRange
21 22
}

B
bigsheeper 已提交
23 24 25 26 27
// metaOperateRecord describes one create-or-drop operation together with the
// timestamp attached to it. ddMsg stores slices of these records in its
// collectionRecords and partitionRecords maps.
type metaOperateRecord struct {
	createOrDrop bool // create: true, drop: false
	timestamp    Timestamp
}

28
type insertMsg struct {
29
	insertMessages []*msgstream.InsertMsg
30
	gcRecord       *gcRecord
C
cai.zhang 已提交
31
	timeRange      TimeRange
32 33 34
}

type deleteMsg struct {
35
	deleteMessages []*msgstream.DeleteMsg
C
cai.zhang 已提交
36
	timeRange      TimeRange
37 38 39
}

type serviceTimeMsg struct {
40 41 42 43 44 45
	gcRecord  *gcRecord
	timeRange TimeRange
}

type gcMsg struct {
	gcRecord  *gcRecord
Z
zhenshan.cao 已提交
46
	timeRange TimeRange
47 48 49
}

type DeleteData struct {
Z
zhenshan.cao 已提交
50 51 52
	deleteIDs        map[UniqueID][]UniqueID
	deleteTimestamps map[UniqueID][]Timestamp
	deleteOffset     map[UniqueID]int64
53 54 55
}

type DeleteRecord struct {
Z
zhenshan.cao 已提交
56 57 58
	entityID  UniqueID
	timestamp Timestamp
	segmentID UniqueID
59 60 61 62 63 64 65
}

// DeletePreprocessData holds a slice of *DeleteRecord together with an int32
// counter.
// NOTE(review): what count tracks is not visible in this file — confirm
// against the code that updates it.
type DeletePreprocessData struct {
	deleteRecords []*DeleteRecord
	count         int32
}

66 67 68 69
// TODO: replace partitionWithID by partition id
type partitionWithID struct {
	partitionTag string
	collectionID UniqueID
70 71
}

72 73 74 75 76
type gcRecord struct {
	// collections and partitions to be dropped
	collections []UniqueID
	// TODO: use partition id
	partitions []partitionWithID
77 78
}

79 80
func (ksMsg *key2SegMsg) TimeTick() Timestamp {
	return ksMsg.timeRange.timestampMax
81 82
}

83 84
func (suMsg *ddMsg) TimeTick() Timestamp {
	return suMsg.timeRange.timestampMax
85 86 87 88 89 90 91 92 93 94 95 96 97 98
}

// TimeTick returns the timestampMax of the message's timeRange.
func (msg *insertMsg) TimeTick() Timestamp {
	maxTs := msg.timeRange.timestampMax
	return maxTs
}

// TimeTick returns the timestampMax of the message's timeRange.
func (msg *deleteMsg) TimeTick() Timestamp {
	maxTs := msg.timeRange.timestampMax
	return maxTs
}

// TimeTick returns the timestampMax of the message's timeRange.
func (msg *serviceTimeMsg) TimeTick() Timestamp {
	maxTs := msg.timeRange.timestampMax
	return maxTs
}

99 100
// TimeTick returns the timestampMax of the message's timeRange.
//
// The receiver is named msg rather than gcMsg so that it does not shadow the
// gcMsg type inside the method body.
func (msg *gcMsg) TimeTick() Timestamp {
	return msg.timeRange.timestampMax
}