input_node.go 1.1 KB
Newer Older
1 2 3
package flowgraph

import (
C
cai.zhang 已提交
4
	"log"
C
cai.zhang 已提交
5 6

	"github.com/zilliztech/milvus-distributed/internal/msgstream"
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
)

type InputNode struct {
	BaseNode
	inStream *msgstream.MsgStream
	name     string
}

func (inNode *InputNode) IsInputNode() bool {
	return true
}

func (inNode *InputNode) Name() string {
	return inNode.name
}

func (inNode *InputNode) InStream() *msgstream.MsgStream {
	return inNode.inStream
}

// empty input and return one *Msg
func (inNode *InputNode) Operate(in []*Msg) []*Msg {
C
cai.zhang 已提交
29 30
	//fmt.Println("Do InputNode operation")

31 32
	msgPack := (*inNode.inStream).Consume()

C
cai.zhang 已提交
33 34 35 36 37 38
	// TODO: add status
	if msgPack == nil {
		log.Println("null msg pack")
		return nil
	}

39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
	var msgStreamMsg Msg = &MsgStreamMsg{
		tsMessages:   msgPack.Msgs,
		timestampMin: msgPack.BeginTs,
		timestampMax: msgPack.EndTs,
	}

	return []*Msg{&msgStreamMsg}
}

func NewInputNode(inStream *msgstream.MsgStream, nodeName string) *InputNode {
	baseNode := BaseNode{}
	baseNode.SetMaxQueueLength(MaxQueueLength)
	baseNode.SetMaxParallelism(MaxParallelism)

	return &InputNode{
		BaseNode: baseNode,
		inStream: inStream,
		name:     nodeName,
	}
}