util.go 2.1 KB
Newer Older
S
sunby 已提交
1 2 3 4 5 6 7 8 9 10
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
11
package datacoord
S
sunby 已提交
12 13

import (
14
	"context"
S
sunby 已提交
15
	"errors"
16 17
	"fmt"
	"time"
S
sunby 已提交
18

19
	"github.com/milvus-io/milvus/internal/log"
X
Xiangyu Wang 已提交
20
	"github.com/milvus-io/milvus/internal/proto/commonpb"
S
sunby 已提交
21 22 23 24 25 26
)

// Response is implemented by any reply message that exposes a *commonpb.Status
// through GetStatus. VerifyResponse uses it to inspect the status of a reply
// without knowing its concrete type.
type Response interface {
	// GetStatus returns the status carried by the response.
	GetStatus() *commonpb.Status
}

S
sunby 已提交
27 28 29
// Sentinel errors returned by VerifyResponse.
var (
	// errNilResponse reports that no response value was supplied.
	errNilResponse = errors.New("response is nil")
	// errUnknownResponseType reports a response whose type VerifyResponse cannot inspect.
	errUnknownResponseType = errors.New("unknown response type")
)

S
sunby 已提交
30 31 32 33 34
func VerifyResponse(response interface{}, err error) error {
	if err != nil {
		return err
	}
	if response == nil {
S
sunby 已提交
35
		return errNilResponse
S
sunby 已提交
36 37 38
	}
	switch resp := response.(type) {
	case Response:
39
		if resp.GetStatus().ErrorCode != commonpb.ErrorCode_Success {
S
sunby 已提交
40 41 42
			return errors.New(resp.GetStatus().Reason)
		}
	case *commonpb.Status:
43
		if resp.ErrorCode != commonpb.ErrorCode_Success {
S
sunby 已提交
44 45 46
			return errors.New(resp.Reason)
		}
	default:
S
sunby 已提交
47
		return errUnknownResponseType
S
sunby 已提交
48 49 50
	}
	return nil
}
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90

// LongTermChecker checks that we receive at least one message within duration d.
// If not, the checker logs a warning message. Callers signal that a message
// arrived by calling Check, which resets the ticker.
type LongTermChecker struct {
	d    time.Duration   // ticker period; also the period restored by Check
	t    *time.Ticker    // created in Start; nil until Start has been called
	ctx  context.Context // when done, the goroutine launched by Start exits
	warn string          // message logged on every tick
	name string          // checker name, included in the shutdown log message
}

// NewLongTermChecker returns a LongTermChecker configured with the given context,
// name, period d and warning message. The ticker is not created here; call Start
// to begin checking.
func NewLongTermChecker(ctx context.Context, name string, d time.Duration, warn string) *LongTermChecker {
	return &LongTermChecker{
		d:    d,
		ctx:  ctx,
		warn: warn,
		name: name,
	}
}

// Start creates the ticker with period d and launches a background goroutine
// that logs the warn message on every tick. Once ctx is done the goroutine logs
// a shutdown message, stops the ticker and exits.
func (c *LongTermChecker) Start() {
	ticker := time.NewTicker(c.d)
	c.t = ticker
	go func() {
		// Release the ticker's resources when the goroutine exits; without this
		// the ticker keeps running after shutdown.
		defer ticker.Stop()
		for {
			select {
			case <-c.ctx.Done():
				log.Warn(fmt.Sprintf("long term checker [%s] shutdown", c.name))
				return
			case <-ticker.C:
				log.Warn(c.warn)
			}
		}
	}()
}

// Check resets the ticker so that the next warning is postponed by a full
// period d. It is a no-op when Start has not been called yet, because the
// ticker is still nil at that point.
func (c *LongTermChecker) Check() {
	if c.t == nil {
		return
	}
	c.t.Reset(c.d)
}