未验证 提交 00b3fcb9 编写于 作者: J Jiquan Long 提交者: GitHub

Add log for flowgraph (#14441)

Signed-off-by: dragondriver <jiquan.long@zilliz.com>
上级 69087ff8
......@@ -26,6 +26,8 @@ import (
"syscall"
"time"
"github.com/milvus-io/milvus/internal/util/timerecord"
datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
......@@ -454,9 +456,9 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) {
ttMsgStream.Start()
go func() {
var checker *LongTermChecker
var checker *timerecord.LongTermChecker
if enableTtChecker {
checker = NewLongTermChecker(ctx, ttCheckerName, ttMaxInterval, ttCheckerWarnMsg)
checker = timerecord.NewLongTermChecker(ctx, ttCheckerName, ttMaxInterval, ttCheckerWarnMsg)
checker.Start()
defer checker.Stop()
}
......
......@@ -19,10 +19,8 @@ package datacoord
import (
"context"
"errors"
"fmt"
"time"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/util/tsoutil"
)
......@@ -68,54 +66,6 @@ func FailResponse(status *commonpb.Status, reason string) {
status.Reason = reason
}
// LongTermChecker watches for inactivity: when Check is not called within the
// interval d, the goroutine launched by Start logs the warn message.
type LongTermChecker struct {
	// d is the interval used to create the ticker and to reset it in Check.
	d time.Duration
	// t is the ticker created by Start; it is nil until Start has been called.
	t *time.Ticker
	// ch is closed by Stop to tell the background goroutine to exit.
	ch chan struct{}
	// warn is the message logged every time the ticker fires.
	warn string
	// name identifies this checker in the shutdown log line.
	name string
}
// NewLongTermChecker builds a LongTermChecker with the given name, checking
// interval d and warning message warn. The returned checker is idle until
// Start is called. ctx is accepted but is not used by this constructor.
func NewLongTermChecker(ctx context.Context, name string, d time.Duration, warn string) *LongTermChecker {
	return &LongTermChecker{
		d:    d,
		ch:   make(chan struct{}),
		warn: warn,
		name: name,
	}
}
// Start creates the ticker with interval d and launches a goroutine that logs
// the warning on every tick. The goroutine exits, after logging a shutdown
// message, once Stop closes the shutdown channel.
func (c *LongTermChecker) Start() {
	c.t = time.NewTicker(c.d)
	watch := func() {
		for {
			select {
			case <-c.t.C:
				// The ticker fired before Check reset it.
				log.Warn(c.warn)
			case <-c.ch:
				log.Warn(fmt.Sprintf("long term checker [%s] shutdown", c.name))
				return
			}
		}
	}
	go watch()
}
// Check resets the ticker to the full interval d, postponing the next warning.
// It is a no-op on a nil checker or when Start has not been called yet, since
// there is no ticker to reset in either case.
func (c *LongTermChecker) Check() {
	if c == nil || c.t == nil {
		return
	}
	c.t.Reset(c.d)
}
// Stop stops the ticker and closes the shutdown channel so the goroutine
// started by Start exits. It is a no-op on a nil checker, tolerates being
// called before Start, and may be called more than once sequentially.
// Concurrent calls to Stop are not synchronized.
func (c *LongTermChecker) Stop() {
	if c == nil {
		return
	}
	if c.t != nil {
		c.t.Stop()
	}
	if c.ch == nil {
		return
	}
	select {
	case <-c.ch:
		// Already closed by an earlier Stop; closing again would panic.
	default:
		close(c.ch)
	}
}
func getTimetravelReverseTime(ctx context.Context, allocator allocator) (*timetravel, error) {
ts, err := allocator.allocTimestamp(ctx)
if err != nil {
......
......@@ -12,14 +12,23 @@
package flowgraph
import (
"context"
"fmt"
"sync"
"time"
"github.com/milvus-io/milvus/internal/util/timerecord"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
)
const (
	// TODO: better to be configured
	// nodeCtxTtInterval is the interval passed to the LongTermChecker created
	// in nodeCtx.work; it is also the duration reported in that checker's
	// warning message.
	nodeCtxTtInterval = 2 * time.Minute
	// enableTtChecker gates both the creation of the checker in nodeCtx.work
	// and the checker.Check call made after each node.Operate.
	enableTtChecker = true
)
// Node is the interface defines the behavior of flowgraph
type Node interface {
Name() string
......@@ -61,6 +70,17 @@ func (nodeCtx *nodeCtx) Start(wg *sync.WaitGroup) {
// 2. invoke node.Operate
// 3. deliver the Operate result to downstream nodes
func (nodeCtx *nodeCtx) work() {
// TODO: necessary to check every node?
name := fmt.Sprintf("nodeCtxTtChecker-%s", nodeCtx.node.Name())
warn := fmt.Sprintf("node %s haven't received input for %f minutes",
nodeCtx.node.Name(), nodeCtxTtInterval.Minutes())
var checker *timerecord.LongTermChecker
if enableTtChecker {
checker = timerecord.NewLongTermChecker(context.Background(), name, nodeCtxTtInterval, warn)
checker.Start()
defer checker.Stop()
}
for {
select {
case <-nodeCtx.closeCh:
......@@ -76,6 +96,10 @@ func (nodeCtx *nodeCtx) work() {
n := nodeCtx.node
res = n.Operate(inputs)
if enableTtChecker {
checker.Check()
}
downstreamLength := len(nodeCtx.downstreamInputChanIdx)
if len(nodeCtx.downstream) < downstreamLength {
log.Warn("", zap.Any("nodeCtx.downstream length", len(nodeCtx.downstream)))
......
......@@ -12,6 +12,8 @@
package timerecord
import (
"context"
"fmt"
"strconv"
"time"
......@@ -75,3 +77,51 @@ func (tr *TimeRecorder) printTimeRecord(msg string, span time.Duration) {
str += "ms)"
log.Debug(str)
}
// LongTermChecker watches for inactivity: when Check is not called within the
// interval d, the goroutine launched by Start logs the warn message.
type LongTermChecker struct {
	// d is the interval used to create the ticker and to reset it in Check.
	d time.Duration
	// t is the ticker created by Start; it is nil until Start has been called.
	t *time.Ticker
	// ch is closed by Stop to tell the background goroutine to exit.
	ch chan struct{}
	// warn is the message logged every time the ticker fires.
	warn string
	// name identifies this checker in the shutdown log line.
	name string
}
// NewLongTermChecker builds a LongTermChecker with the given name, checking
// interval d and warning message warn. The returned checker is idle until
// Start is called. ctx is accepted but is not used by this constructor.
func NewLongTermChecker(ctx context.Context, name string, d time.Duration, warn string) *LongTermChecker {
	return &LongTermChecker{
		d:    d,
		ch:   make(chan struct{}),
		warn: warn,
		name: name,
	}
}
// Start creates the ticker with interval d and launches a goroutine that logs
// the warning on every tick. The goroutine exits, after logging a shutdown
// message, once Stop closes the shutdown channel.
func (c *LongTermChecker) Start() {
	c.t = time.NewTicker(c.d)
	watch := func() {
		for {
			select {
			case <-c.t.C:
				// The ticker fired before Check reset it.
				log.Warn(c.warn)
			case <-c.ch:
				log.Warn(fmt.Sprintf("long term checker [%s] shutdown", c.name))
				return
			}
		}
	}
	go watch()
}
// Check resets the ticker to the full interval d, postponing the next warning.
// It is a no-op on a nil checker or when Start has not been called yet, since
// there is no ticker to reset in either case.
func (c *LongTermChecker) Check() {
	if c == nil || c.t == nil {
		return
	}
	c.t.Reset(c.d)
}
// Stop stops the ticker and closes the shutdown channel so the goroutine
// started by Start exits. It is a no-op on a nil checker, tolerates being
// called before Start, and may be called more than once sequentially.
// Concurrent calls to Stop are not synchronized.
func (c *LongTermChecker) Stop() {
	if c == nil {
		return
	}
	if c.t != nil {
		c.t.Stop()
	}
	if c.ch == nil {
		return
	}
	select {
	case <-c.ch:
		// Already closed by an earlier Stop; closing again would panic.
	default:
		close(c.ch)
	}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册