Commit c6950eb7 authored by zhenshan.cao, committed by yefu.chen

Add log for allocator

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
Parent a532444d
......@@ -2,10 +2,13 @@ package allocator
import (
"context"
"errors"
"fmt"
"sync"
"time"
"errors"
"github.com/zilliztech/milvus-distributed/internal/log"
"go.uber.org/zap"
)
const (
......@@ -106,11 +109,13 @@ type Allocator struct {
TChan TickerChan
ForceSyncChan chan Request
SyncFunc func() bool
SyncFunc func() (bool, error)
ProcessFunc func(req Request) error
CheckSyncFunc func(timeout bool) bool
PickCanDoFunc func()
SyncErr error
Role string
}
func (ta *Allocator) Start() error {
......@@ -183,7 +188,7 @@ func (ta *Allocator) pickCanDo() {
func (ta *Allocator) sync(timeout bool) bool {
if ta.SyncFunc == nil || ta.CheckSyncFunc == nil {
ta.CanDoReqs = ta.ToDoReqs
ta.ToDoReqs = ta.ToDoReqs[0:0]
ta.ToDoReqs = nil
return true
}
if !timeout && len(ta.ToDoReqs) == 0 {
......@@ -193,7 +198,8 @@ func (ta *Allocator) sync(timeout bool) bool {
return false
}
ret := ta.SyncFunc()
var ret bool
ret, ta.SyncErr = ta.SyncFunc()
if !timeout {
ta.TChan.Reset()
......@@ -207,16 +213,28 @@ func (ta *Allocator) finishSyncRequest() {
req.Notify(nil)
}
}
ta.SyncReqs = ta.SyncReqs[0:0]
ta.SyncReqs = nil
}
func (ta *Allocator) failRemainRequest() {
var err error
if ta.SyncErr != nil {
err = fmt.Errorf("%s failRemainRequest err:%w", ta.Role, ta.SyncErr)
} else {
errMsg := fmt.Sprintf("%s failRemainRequest unexpected error", ta.Role)
err = errors.New(errMsg)
}
if len(ta.ToDoReqs) > 0 {
log.Debug("Allocator has some reqs to fail",
zap.Any("Role", ta.Role),
zap.Any("reqLen", len(ta.ToDoReqs)))
}
for _, req := range ta.ToDoReqs {
if req != nil {
req.Notify(errors.New("failed: unexpected error"))
req.Notify(err)
}
}
ta.ToDoReqs = []Request{}
ta.ToDoReqs = nil
}
func (ta *Allocator) finishRequest() {
......@@ -241,7 +259,8 @@ func (ta *Allocator) Close() {
ta.CancelFunc()
ta.wg.Wait()
ta.TChan.Close()
ta.revokeRequest(errors.New("closing"))
errMsg := fmt.Sprintf("%s is closing", ta.Role)
ta.revokeRequest(errors.New(errMsg))
}
func (ta *Allocator) CleanCache() {
......
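
The allocator.go hunks above change SyncFunc to return (bool, error), record the error in SyncErr, and wrap it with the allocator's Role before failing the remaining requests. Below is a minimal, self-contained sketch of that error-propagation shape (toyAllocator, doSync, and failPending are illustrative names, not the real methods); because %w is used at each level, callers can still test for the root cause with errors.Is:

```go
package main

import (
	"errors"
	"fmt"
)

var errMasterUnavailable = errors.New("master unavailable")

type toyAllocator struct {
	role    string
	syncErr error
}

// doSync stands in for SyncFunc: it pretends the RPC to master failed and
// records the wrapped error, mirroring `ret, ta.SyncErr = ta.SyncFunc()`.
func (a *toyAllocator) doSync() bool {
	a.syncErr = fmt.Errorf("syncID Failed:%w", errMasterUnavailable)
	return false
}

// failPending mirrors failRemainRequest: wrap SyncErr with the Role, or fall
// back to a generic message when no sync error was recorded.
func (a *toyAllocator) failPending() error {
	if a.syncErr != nil {
		return fmt.Errorf("%s failRemainRequest err:%w", a.role, a.syncErr)
	}
	return fmt.Errorf("%s failRemainRequest unexpected error", a.role)
}

func main() {
	a := &toyAllocator{role: "IDAllocator"}
	a.doSync()
	err := a.failPending()
	fmt.Println(err)
	// The root cause survives two levels of wrapping.
	fmt.Println("is master unavailable:", errors.Is(err, errMasterUnavailable))
}
```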
......@@ -2,14 +2,16 @@ package allocator
import (
"context"
"log"
"fmt"
"time"
"github.com/zilliztech/milvus-distributed/internal/util/retry"
"go.uber.org/zap"
"google.golang.org/grpc"
"github.com/zilliztech/milvus-distributed/internal/log"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
"github.com/zilliztech/milvus-distributed/internal/util/retry"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
......@@ -41,6 +43,7 @@ func NewIDAllocator(ctx context.Context, masterAddr string) (*IDAllocator, error
Allocator: Allocator{
Ctx: ctx1,
CancelFunc: cancel,
Role: "IDAllocator",
},
countPerRPC: IDCountPerRPC,
masterAddress: masterAddr,
......@@ -58,12 +61,11 @@ func (ia *IDAllocator) Start() error {
connectMasterFn := func() error {
return ia.connectMaster()
}
err := retry.Retry(10, time.Millisecond*200, connectMasterFn)
err := retry.Retry(1000, time.Millisecond*200, connectMasterFn)
if err != nil {
panic("connect to master failed")
}
ia.Allocator.Start()
return nil
return ia.Allocator.Start()
}
func (ia *IDAllocator) connectMaster() error {
......@@ -71,16 +73,31 @@ func (ia *IDAllocator) connectMaster() error {
defer cancel()
conn, err := grpc.DialContext(ctx, ia.masterAddress, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
log.Printf("Connect to master failed, error= %v", err)
log.Error("Connect to master failed", zap.Any("Role", ia.Role), zap.Error(err))
return err
}
log.Printf("Connected to master, master_addr=%s", ia.masterAddress)
log.Debug("Connected to master", zap.Any("Role", ia.Role), zap.Any("masterAddress", ia.masterAddress))
ia.masterConn = conn
ia.masterClient = masterpb.NewMasterServiceClient(conn)
return nil
}
func (ia *IDAllocator) syncID() bool {
func (ia *IDAllocator) gatherReqIDCount() uint32 {
need := uint32(0)
for _, req := range ia.ToDoReqs {
tReq := req.(*IDRequest)
need += tReq.count
}
return need
}
func (ia *IDAllocator) syncID() (bool, error) {
need := ia.gatherReqIDCount()
if need < ia.countPerRPC {
need = ia.countPerRPC
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
req := &masterpb.AllocIDRequest{
Base: &commonpb.MsgBase{
......@@ -89,18 +106,17 @@ func (ia *IDAllocator) syncID() bool {
Timestamp: 0,
SourceID: ia.PeerID,
},
Count: ia.countPerRPC,
Count: need,
}
resp, err := ia.masterClient.AllocID(ctx, req)
cancel()
if err != nil {
log.Println("syncID Failed!!!!!")
return false
return false, fmt.Errorf("syncID Failed:%w", err)
}
ia.idStart = resp.GetID()
ia.idEnd = ia.idStart + int64(resp.GetCount())
return true
return true, nil
}
func (ia *IDAllocator) checkSyncFunc(timeout bool) bool {
......@@ -122,6 +138,10 @@ func (ia *IDAllocator) pickCanDoFunc() {
}
}
ia.ToDoReqs = ia.ToDoReqs[idx:]
log.Debug("IDAllocator pickCanDoFunc",
zap.Any("need", need),
zap.Any("total", total),
zap.Any("remainReqCnt", len(ia.ToDoReqs)))
}
func (ia *IDAllocator) processFunc(req Request) error {
......
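
The new gatherReqIDCount/syncID pair changes how many IDs a single RPC requests: the sum of all pending request counts, with countPerRPC as a floor. A small sketch of that rule (idRequest and the constant value are illustrative, not the real type or the real IDCountPerRPC):

```go
package main

import "fmt"

// idRequest is a stand-in for the real *IDRequest; only the count matters here.
type idRequest struct{ count uint32 }

// neededIDs mirrors gatherReqIDCount plus the floor applied in syncID:
// ask for everything that is pending, but never less than countPerRPC.
func neededIDs(pending []idRequest, countPerRPC uint32) uint32 {
	need := uint32(0)
	for _, req := range pending {
		need += req.count
	}
	if need < countPerRPC {
		need = countPerRPC
	}
	return need
}

func main() {
	const countPerRPC = 200000 // illustrative floor, not the real constant
	fmt.Println(neededIDs([]idRequest{{count: 10}, {count: 20}}, countPerRPC)) // 200000
	fmt.Println(neededIDs([]idRequest{{count: 300000}}, countPerRPC))          // 300000
}
```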
......@@ -3,9 +3,11 @@ package allocator
import (
"context"
"fmt"
"log"
"time"
"go.uber.org/zap"
"github.com/zilliztech/milvus-distributed/internal/log"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
"github.com/zilliztech/milvus-distributed/internal/util/retry"
......@@ -38,6 +40,7 @@ func NewTimestampAllocator(ctx context.Context, masterAddr string) (*TimestampAl
Allocator: Allocator{
Ctx: ctx1,
CancelFunc: cancel,
Role: "TimestampAllocator",
},
masterAddress: masterAddr,
countPerRPC: tsCountPerRPC,
......@@ -57,7 +60,7 @@ func (ta *TimestampAllocator) Start() error {
connectMasterFn := func() error {
return ta.connectMaster()
}
err := retry.Retry(10, time.Millisecond*200, connectMasterFn)
err := retry.Retry(1000, time.Millisecond*200, connectMasterFn)
if err != nil {
panic("Timestamp local allocator connect to master failed")
}
......@@ -70,10 +73,10 @@ func (ta *TimestampAllocator) connectMaster() error {
defer cancel()
conn, err := grpc.DialContext(ctx, ta.masterAddress, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
log.Printf("Connect to master failed, error= %v", err)
log.Error("TimestampAllocator Connect to master failed", zap.Error(err))
return err
}
log.Printf("Connected to master, master_addr=%s", ta.masterAddress)
log.Debug("TimestampAllocator connected to master", zap.Any("masterAddress", ta.masterAddress))
ta.masterConn = conn
ta.masterClient = masterpb.NewMasterServiceClient(conn)
return nil
......@@ -98,9 +101,26 @@ func (ta *TimestampAllocator) pickCanDoFunc() {
}
}
ta.ToDoReqs = ta.ToDoReqs[idx:]
log.Debug("TimestampAllocator pickCanDoFunc",
zap.Any("need", need),
zap.Any("total", total),
zap.Any("remainReqCnt", len(ta.ToDoReqs)))
}
func (ta *TimestampAllocator) syncTs() bool {
func (ta *TimestampAllocator) gatherReqTsCount() uint32 {
need := uint32(0)
for _, req := range ta.ToDoReqs {
tReq := req.(*TSORequest)
need += tReq.count
}
return need
}
func (ta *TimestampAllocator) syncTs() (bool, error) {
need := ta.gatherReqTsCount()
if need < ta.countPerRPC {
need = ta.countPerRPC
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
req := &masterpb.AllocTimestampRequest{
Base: &commonpb.MsgBase{
......@@ -109,18 +129,18 @@ func (ta *TimestampAllocator) syncTs() bool {
Timestamp: 0,
SourceID: ta.PeerID,
},
Count: ta.countPerRPC,
Count: need,
}
resp, err := ta.masterClient.AllocTimestamp(ctx, req)
defer cancel()
cancel()
if err != nil {
log.Println("syncTimestamp Failed!!!!!")
return false
return false, fmt.Errorf("syncTimestamp Failed:%w", err)
}
ta.lastTsBegin = resp.GetTimestamp()
ta.lastTsEnd = ta.lastTsBegin + uint64(resp.GetCount())
return true
return true, nil
}
func (ta *TimestampAllocator) processFunc(req Request) error {
......
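
Both Start methods now call retry.Retry with 1000 attempts instead of 10, so at a 200 ms interval the allocator waits on the order of minutes rather than roughly two seconds before panicking. A sketch of a fixed-interval retry loop with that shape (assumed semantics; the real retry.Retry helper may differ, e.g. it may apply backoff):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// retryFixed retries fn up to attempts times, sleeping interval between tries,
// and returns the last error if it never succeeds (assumed retry.Retry shape).
func retryFixed(attempts int, interval time.Duration, fn func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
		time.Sleep(interval)
	}
	return err
}

func main() {
	calls := 0
	err := retryFixed(5, 10*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errors.New("master not ready")
		}
		return nil
	})
	fmt.Printf("err=%v after %d calls\n", err, calls)
}
```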
......@@ -138,6 +138,7 @@ func NewSegIDAssigner(ctx context.Context, dataService types.DataService, getTic
Allocator: Allocator{
Ctx: ctx1,
CancelFunc: cancel,
Role: "SegmentIDAllocator",
},
countPerRPC: SegCountPerRPC,
dataService: dataService,
......@@ -275,9 +276,9 @@ func (sa *SegIDAssigner) reduceSegReqs() {
sa.segReqs = newSegReqs
}
func (sa *SegIDAssigner) syncSegments() bool {
func (sa *SegIDAssigner) syncSegments() (bool, error) {
if len(sa.segReqs) == 0 {
return true
return true, nil
}
sa.reduceSegReqs()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
......@@ -292,8 +293,7 @@ func (sa *SegIDAssigner) syncSegments() bool {
resp, err := sa.dataService.AssignSegmentID(ctx, req)
if err != nil {
log.Debug("proxynode", zap.String("GRPC AssignSegmentID Failed", err.Error()))
return false
return false, fmt.Errorf("syncSegmentID Failed:%w", err)
}
now := time.Now()
......@@ -331,7 +331,7 @@ func (sa *SegIDAssigner) syncSegments() bool {
assign.lastInsertTime = now
success = true
}
return success
return success, nil
}
func (sa *SegIDAssigner) processFunc(req allocator.Request) error {
......
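
The common thread of the commit is structured logging: each message now carries the allocator's Role and the relevant counts as fields instead of a formatted string. A standalone sketch using go.uber.org/zap directly (the internal/log package used in the diff is assumed to be a thin wrapper exposing the same field API):

```go
package main

import "go.uber.org/zap"

func main() {
	logger, err := zap.NewDevelopment()
	if err != nil {
		panic(err)
	}
	defer logger.Sync()

	role := "SegmentIDAllocator"
	pendingReqs := 3

	// Same shape as the log.Debug calls added in the diff: a message plus
	// key/value fields that downstream log processors can filter on.
	logger.Debug("Allocator has some reqs to fail",
		zap.Any("Role", role),
		zap.Any("reqLen", pendingReqs))
}
```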