input_node.go 2.2 KB
Newer Older
X
Xiangyu Wang 已提交
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

12 13 14
package flowgraph

import (
X
Xiangyu Wang 已提交
15 16
	"github.com/milvus-io/milvus/internal/msgstream"
	"github.com/milvus-io/milvus/internal/util/trace"
17
	"github.com/opentracing/opentracing-go"
G
godchen 已提交
18
	oplog "github.com/opentracing/opentracing-go/log"
19 20 21 22 23 24 25 26 27 28 29 30
)

type InputNode struct {
	BaseNode
	inStream *msgstream.MsgStream
	name     string
}

func (inNode *InputNode) IsInputNode() bool {
	return true
}

31
func (inNode *InputNode) Close() {
B
bigsheeper 已提交
32
	// do nothing
33 34
}

35 36 37 38 39 40 41 42 43
func (inNode *InputNode) Name() string {
	return inNode.name
}

func (inNode *InputNode) InStream() *msgstream.MsgStream {
	return inNode.inStream
}

// empty input and return one *Msg
44
func (inNode *InputNode) Operate(in []Msg) []Msg {
C
cai.zhang 已提交
45
	//fmt.Println("Do InputNode operation")
G
godchen 已提交
46

47
	msgPack := (*inNode.inStream).Consume()
N
neza2017 已提交
48

C
cai.zhang 已提交
49 50
	// TODO: add status
	if msgPack == nil {
51 52 53 54 55
		return nil
	}
	var spans []opentracing.Span
	for _, msg := range msgPack.Msgs {
		sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
G
godchen 已提交
56
		sp.LogFields(oplog.String("input_node name", inNode.Name()))
57 58
		spans = append(spans, sp)
		msg.SetTraceCtx(ctx)
C
cai.zhang 已提交
59 60
	}

61
	var msgStreamMsg Msg = &MsgStreamMsg{
62 63 64 65
		tsMessages:     msgPack.Msgs,
		timestampMin:   msgPack.BeginTs,
		timestampMax:   msgPack.EndTs,
		startPositions: msgPack.StartPositions,
X
XuanYang-cn 已提交
66
		endPositions:   msgPack.EndPositions,
67 68
	}

69 70 71 72 73
	for _, span := range spans {
		span.Finish()
	}

	return []Msg{msgStreamMsg}
74 75
}

N
neza2017 已提交
76
func NewInputNode(inStream *msgstream.MsgStream, nodeName string, maxQueueLength int32, maxParallelism int32) *InputNode {
77
	baseNode := BaseNode{}
F
FluorineDog 已提交
78 79
	baseNode.SetMaxQueueLength(maxQueueLength)
	baseNode.SetMaxParallelism(maxParallelism)
80 81 82 83 84 85 86

	return &InputNode{
		BaseNode: baseNode,
		inStream: inStream,
		name:     nodeName,
	}
}