input_node.go 1.1 KB
Newer Older
1 2 3
package flowgraph

import (
C
cai.zhang 已提交
4
	"log"
C
cai.zhang 已提交
5 6

	"github.com/zilliztech/milvus-distributed/internal/msgstream"
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
)

type InputNode struct {
	BaseNode
	inStream *msgstream.MsgStream
	name     string
}

func (inNode *InputNode) IsInputNode() bool {
	return true
}

func (inNode *InputNode) Name() string {
	return inNode.name
}

func (inNode *InputNode) InStream() *msgstream.MsgStream {
	return inNode.inStream
}

// empty input and return one *Msg
28
func (inNode *InputNode) Operate(in []*Msg) []*Msg {
C
cai.zhang 已提交
29
	//fmt.Println("Do InputNode operation")
30

31
	msgPack := (*inNode.inStream).Consume()
G
godchen 已提交
32

C
cai.zhang 已提交
33 34 35 36 37 38
	// TODO: add status
	if msgPack == nil {
		log.Println("null msg pack")
		return nil
	}

39 40 41 42 43 44 45 46 47
	var msgStreamMsg Msg = &MsgStreamMsg{
		tsMessages:   msgPack.Msgs,
		timestampMin: msgPack.BeginTs,
		timestampMax: msgPack.EndTs,
	}

	return []*Msg{&msgStreamMsg}
}

N
neza2017 已提交
48
func NewInputNode(inStream *msgstream.MsgStream, nodeName string, maxQueueLength int32, maxParallelism int32) *InputNode {
49
	baseNode := BaseNode{}
F
FluorineDog 已提交
50 51
	baseNode.SetMaxQueueLength(maxQueueLength)
	baseNode.SetMaxParallelism(maxParallelism)
52 53 54 55 56 57 58

	return &InputNode{
		BaseNode: baseNode,
		inStream: inStream,
		name:     nodeName,
	}
}