input_node.go 1.5 KB
Newer Older
1 2 3
package flowgraph

import (
4
	"context"
S
sunby 已提交
5

6
	"github.com/opentracing/opentracing-go"
Z
zhenshan.cao 已提交
7

C
cai.zhang 已提交
8
	"github.com/zilliztech/milvus-distributed/internal/msgstream"
9
	"github.com/zilliztech/milvus-distributed/internal/util/trace"
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
)

// InputNode is a flow graph node that embeds BaseNode and carries a pointer
// to a msgstream.MsgStream together with a node name.
type InputNode struct {
	BaseNode
	// inStream is the stream that Operate calls Consume on.
	inStream *msgstream.MsgStream
	// name is returned by Name and used as the "NodeName" span tag in Operate.
	name     string
}

// IsInputNode always reports true for an InputNode.
func (inNode *InputNode) IsInputNode() bool {
	return true
}

// Name returns the node name stored in the InputNode.
func (inNode *InputNode) Name() string {
	return inNode.name
}

// InStream returns the message stream pointer held by the InputNode.
// The pointer is returned as stored; no copy of the stream is made.
func (inNode *InputNode) InStream() *msgstream.MsgStream {
	return inNode.inStream
}

// empty input and return one *Msg
31
func (inNode *InputNode) Operate(ctx context.Context, msgs []Msg) ([]Msg, context.Context) {
C
cai.zhang 已提交
32
	//fmt.Println("Do InputNode operation")
G
godchen 已提交
33

34 35 36 37
	msgPack, ctx := (*inNode.inStream).Consume()

	sp, ctx := trace.StartSpanFromContext(ctx, opentracing.Tag{Key: "NodeName", Value: inNode.Name()})
	defer sp.Finish()
N
neza2017 已提交
38

C
cai.zhang 已提交
39 40
	// TODO: add status
	if msgPack == nil {
41
		return nil, ctx
C
cai.zhang 已提交
42 43
	}

44
	var msgStreamMsg Msg = &MsgStreamMsg{
45 46 47 48
		tsMessages:     msgPack.Msgs,
		timestampMin:   msgPack.BeginTs,
		timestampMax:   msgPack.EndTs,
		startPositions: msgPack.StartPositions,
X
XuanYang-cn 已提交
49
		endPositions:   msgPack.EndPositions,
50 51
	}

52
	return []Msg{msgStreamMsg}, ctx
53 54
}

N
neza2017 已提交
55
func NewInputNode(inStream *msgstream.MsgStream, nodeName string, maxQueueLength int32, maxParallelism int32) *InputNode {
56
	baseNode := BaseNode{}
F
FluorineDog 已提交
57 58
	baseNode.SetMaxQueueLength(maxQueueLength)
	baseNode.SetMaxParallelism(maxParallelism)
59 60 61 62 63 64 65

	return &InputNode{
		BaseNode: baseNode,
		inStream: inStream,
		name:     nodeName,
	}
}