package flowgraph

import (
	"context"
	"log"

	"errors"

	"github.com/opentracing/opentracing-go"
	"github.com/zilliztech/milvus-distributed/internal/msgstream"
	"github.com/zilliztech/milvus-distributed/internal/util/trace"
)

// InputNode is a flow graph node that embeds BaseNode and additionally holds
// a message stream and a node name.
type InputNode struct {
	BaseNode
	// inStream is the message stream that Operate consumes from.
	inStream *msgstream.MsgStream
	// name is the value reported by Name.
	name     string
}

// IsInputNode always reports true for an InputNode.
func (inNode *InputNode) IsInputNode() bool {
	return true
}

// Name returns the node name stored in the InputNode.
func (inNode *InputNode) Name() string {
	return inNode.name
}

// InStream returns the message stream pointer held by the node.
func (inNode *InputNode) InStream() *msgstream.MsgStream {
	return inNode.inStream
}

// empty input and return one *Msg
33
func (inNode *InputNode) Operate(ctx context.Context, msgs []Msg) ([]Msg, context.Context) {
C
cai.zhang 已提交
34
	//fmt.Println("Do InputNode operation")
G
godchen 已提交
35

36 37 38 39
	msgPack, ctx := (*inNode.inStream).Consume()

	sp, ctx := trace.StartSpanFromContext(ctx, opentracing.Tag{Key: "NodeName", Value: inNode.Name()})
	defer sp.Finish()
N
neza2017 已提交
40

C
cai.zhang 已提交
41 42 43
	// TODO: add status
	if msgPack == nil {
		log.Println("null msg pack")
44 45
		trace.LogError(sp, errors.New("null msg pack"))
		return nil, ctx
C
cai.zhang 已提交
46 47
	}

48
	var msgStreamMsg Msg = &MsgStreamMsg{
49 50 51 52
		tsMessages:     msgPack.Msgs,
		timestampMin:   msgPack.BeginTs,
		timestampMax:   msgPack.EndTs,
		startPositions: msgPack.StartPositions,
53 54
	}

55
	return []Msg{msgStreamMsg}, ctx
56 57
}

N
neza2017 已提交
58
func NewInputNode(inStream *msgstream.MsgStream, nodeName string, maxQueueLength int32, maxParallelism int32) *InputNode {
59
	baseNode := BaseNode{}
F
FluorineDog 已提交
60 61
	baseNode.SetMaxQueueLength(maxQueueLength)
	baseNode.SetMaxParallelism(maxParallelism)
62 63 64 65 66 67 68

	return &InputNode{
		BaseNode: baseNode,
		inStream: inStream,
		name:     nodeName,
	}
}