input_node.go 2.1 KB
Newer Older
1 2 3
package flowgraph

import (
4
	"fmt"
C
cai.zhang 已提交
5
	"log"
C
cai.zhang 已提交
6

7
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
8

9
	"github.com/opentracing/opentracing-go"
C
cai.zhang 已提交
10
	"github.com/zilliztech/milvus-distributed/internal/msgstream"
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
)

// InputNode is a flowgraph node whose Operate method pulls message packs
// from a message stream instead of receiving them from upstream nodes.
// It embeds BaseNode and carries a name.
type InputNode struct {
	BaseNode
	// inStream is the stream that Operate consumes message packs from.
	inStream *msgstream.MsgStream
	// name is the value reported by Name.
	name     string
}

// IsInputNode reports that this node is an input node; it always returns true.
func (n *InputNode) IsInputNode() bool {
	return true
}

// Name returns the name the node was created with.
func (n *InputNode) Name() string {
	return n.name
}

// InStream returns the message stream pointer held by the node.
func (n *InputNode) InStream() *msgstream.MsgStream {
	return n.inStream
}

// empty input and return one *Msg
32
func (inNode *InputNode) Operate([]*Msg) []*Msg {
C
cai.zhang 已提交
33
	//fmt.Println("Do InputNode operation")
G
godchen 已提交
34

B
bigsheeper 已提交
35
	msgPack := (*inNode.inStream).Consume()
N
neza2017 已提交
36

37 38 39 40
	var childs []opentracing.Span
	tracer := opentracing.GlobalTracer()
	if tracer != nil && msgPack != nil {
		for _, msg := range msgPack.Msgs {
41
			if msg.Type() == commonpb.MsgType_kInsert {
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
				var child opentracing.Span
				ctx := msg.GetMsgContext()
				if parent := opentracing.SpanFromContext(ctx); parent != nil {
					child = tracer.StartSpan(fmt.Sprintf("through msg input node, start time = %d", msg.BeginTs()),
						opentracing.FollowsFrom(parent.Context()))
				} else {
					child = tracer.StartSpan(fmt.Sprintf("through msg input node, start time = %d", msg.BeginTs()))
				}
				child.SetTag("hash keys", msg.HashKeys())
				child.SetTag("start time", msg.BeginTs())
				child.SetTag("end time", msg.EndTs())
				msg.SetMsgContext(opentracing.ContextWithSpan(ctx, child))
				childs = append(childs, child)
			}
		}
	}

C
cai.zhang 已提交
59 60 61 62 63 64
	// TODO: add status
	if msgPack == nil {
		log.Println("null msg pack")
		return nil
	}

65 66 67 68 69 70
	var msgStreamMsg Msg = &MsgStreamMsg{
		tsMessages:   msgPack.Msgs,
		timestampMin: msgPack.BeginTs,
		timestampMax: msgPack.EndTs,
	}

71 72 73 74
	for _, child := range childs {
		child.Finish()
	}

75 76 77
	return []*Msg{&msgStreamMsg}
}

N
neza2017 已提交
78
func NewInputNode(inStream *msgstream.MsgStream, nodeName string, maxQueueLength int32, maxParallelism int32) *InputNode {
79
	baseNode := BaseNode{}
F
FluorineDog 已提交
80 81
	baseNode.SetMaxQueueLength(maxQueueLength)
	baseNode.SetMaxParallelism(maxParallelism)
82 83 84 85 86 87 88

	return &InputNode{
		BaseNode: baseNode,
		inStream: inStream,
		name:     nodeName,
	}
}