input_node.go 2.7 KB
Newer Older
X
Xiangyu Wang 已提交
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

12 13 14
package flowgraph

import (
15
	"github.com/milvus-io/milvus/internal/log"
X
Xiangyu Wang 已提交
16 17
	"github.com/milvus-io/milvus/internal/msgstream"
	"github.com/milvus-io/milvus/internal/util/trace"
18
	"github.com/opentracing/opentracing-go"
G
godchen 已提交
19
	oplog "github.com/opentracing/opentracing-go/log"
20
	"go.uber.org/zap"
21 22
)

23
// InputNode is the entry point of flowgragh
24 25
type InputNode struct {
	BaseNode
26
	inStream msgstream.MsgStream
27 28 29
	name     string
}

30
// IsInputNode returns whether Node is InputNode
31 32 33 34
func (inNode *InputNode) IsInputNode() bool {
	return true
}

35 36 37 38 39
func (inNode *InputNode) Start() {
	inNode.inStream.Start()
}

// Close implements node
40
func (inNode *InputNode) Close() {
41 42 43 44
	inNode.inStream.Close()
	log.Debug("message stream closed",
		zap.String("node name", inNode.name),
	)
45 46
}

47
// Name returns node name
48 49 50 51
func (inNode *InputNode) Name() string {
	return inNode.name
}

52 53
// InStream returns the internal MsgStream
func (inNode *InputNode) InStream() msgstream.MsgStream {
54 55 56 57
	return inNode.inStream
}

// empty input and return one *Msg
58
func (inNode *InputNode) Operate(in []Msg) []Msg {
59
	msgPack := inNode.inStream.Consume()
N
neza2017 已提交
60

C
cai.zhang 已提交
61 62
	// TODO: add status
	if msgPack == nil {
63 64 65 66 67
		return nil
	}
	var spans []opentracing.Span
	for _, msg := range msgPack.Msgs {
		sp, ctx := trace.StartSpanFromContext(msg.TraceCtx())
G
godchen 已提交
68
		sp.LogFields(oplog.String("input_node name", inNode.Name()))
69 70
		spans = append(spans, sp)
		msg.SetTraceCtx(ctx)
C
cai.zhang 已提交
71 72
	}

73
	var msgStreamMsg Msg = &MsgStreamMsg{
74 75 76 77
		tsMessages:     msgPack.Msgs,
		timestampMin:   msgPack.BeginTs,
		timestampMax:   msgPack.EndTs,
		startPositions: msgPack.StartPositions,
X
XuanYang-cn 已提交
78
		endPositions:   msgPack.EndPositions,
79 80
	}

81 82 83 84 85
	for _, span := range spans {
		span.Finish()
	}

	return []Msg{msgStreamMsg}
86 87
}

88 89
// NewInputNode composes an InputNode with provided MsgStream, name and parameters
func NewInputNode(inStream msgstream.MsgStream, nodeName string, maxQueueLength int32, maxParallelism int32) *InputNode {
90
	baseNode := BaseNode{}
F
FluorineDog 已提交
91 92
	baseNode.SetMaxQueueLength(maxQueueLength)
	baseNode.SetMaxParallelism(maxParallelism)
93 94 95 96 97 98 99

	return &InputNode{
		BaseNode: baseNode,
		inStream: inStream,
		name:     nodeName,
	}
}