未验证 提交 c58a8d8f 编写于 作者: C cai.zhang 提交者: GitHub

Support bind IndexNode mode (#19247)

Signed-off-by: Ncai.zhang <cai.zhang@zilliz.com>
Signed-off-by: Ncai.zhang <cai.zhang@zilliz.com>
上级 6972efe6
......@@ -220,6 +220,12 @@ indexCoord:
minSegmentNumRowsToEnableIndex: 1024 # It's a threshold. When the segment num rows is less than this value, the segment will not be indexed
bindIndexNodeMode:
enable: false
address: "localhost:22930"
withCred: false
nodeID: 0
gc:
interval: 600 # gc interval in seconds
......
......@@ -41,7 +41,7 @@ type Client struct {
}
// NewClient creates a new IndexNode client.
func NewClient(ctx context.Context, addr string) (*Client, error) {
func NewClient(ctx context.Context, addr string, encryption bool) (*Client, error) {
if addr == "" {
return nil, fmt.Errorf("address is empty")
}
......@@ -64,6 +64,9 @@ func NewClient(ctx context.Context, addr string) (*Client, error) {
client.grpcClient.SetRole(typeutil.IndexNodeRole)
client.grpcClient.SetGetAddrFunc(client.getAddr)
client.grpcClient.SetNewGrpcClientFunc(client.newGrpcClient)
if encryption {
client.grpcClient.EnableEncryption()
}
return client, nil
}
......
......@@ -42,11 +42,11 @@ var ParamsGlobal paramtable.ComponentParam
func Test_NewClient(t *testing.T) {
ClientParams.InitOnce(typeutil.IndexNodeRole)
ctx := context.Background()
client, err := NewClient(ctx, "")
client, err := NewClient(ctx, "", false)
assert.Nil(t, client)
assert.NotNil(t, err)
client, err = NewClient(ctx, "test")
client, err = NewClient(ctx, "test", false)
assert.Nil(t, err)
assert.NotNil(t, client)
......@@ -143,7 +143,7 @@ func TestIndexNodeClient(t *testing.T) {
err = ins.Run()
assert.Nil(t, err)
inc, err := NewClient(ctx, "localhost:21121")
inc, err := NewClient(ctx, "localhost:21121", false)
assert.Nil(t, err)
assert.NotNil(t, inc)
......
......@@ -20,4 +20,7 @@ const (
// IndexAddTaskName is the name of the operation to add index task.
IndexAddTaskName = "IndexAddTask"
CreateIndexTaskName = "CreateIndexTask"
diskAnnIndex = "DISKANN"
invalidIndex = "invalid"
)
......@@ -193,15 +193,27 @@ func (i *IndexCoord) Init() error {
return
}
aliveNodeID := make([]UniqueID, 0)
for _, session := range sessions {
session := session
aliveNodeID = append(aliveNodeID, session.ServerID)
//go func() {
if err := i.nodeManager.AddNode(session.ServerID, session.Address); err != nil {
log.Error("IndexCoord", zap.Int64("ServerID", session.ServerID),
zap.Error(err))
if Params.IndexCoordCfg.BindIndexNodeMode {
if err = i.nodeManager.AddNode(Params.IndexCoordCfg.IndexNodeID, Params.IndexCoordCfg.IndexNodeAddress); err != nil {
log.Error("IndexCoord add node fail", zap.Int64("ServerID", Params.IndexCoordCfg.IndexNodeID),
zap.String("address", Params.IndexCoordCfg.IndexNodeAddress), zap.Error(err))
initErr = err
return
}
log.Debug("IndexCoord add node success", zap.String("IndexNode address", Params.IndexCoordCfg.IndexNodeAddress),
zap.Int64("nodeID", Params.IndexCoordCfg.IndexNodeID))
aliveNodeID = append(aliveNodeID, Params.IndexCoordCfg.IndexNodeID)
metrics.IndexCoordIndexNodeNum.WithLabelValues().Inc()
} else {
for _, session := range sessions {
session := session
if err := i.nodeManager.AddNode(session.ServerID, session.Address); err != nil {
log.Error("IndexCoord", zap.Int64("ServerID", session.ServerID),
zap.Error(err))
continue
}
aliveNodeID = append(aliveNodeID, session.ServerID)
}
//}()
}
log.Debug("IndexCoord", zap.Int("IndexNode number", len(i.nodeManager.GetAllClients())))
i.indexBuilder = newIndexBuilder(i.loopCtx, i, i.metaTable, aliveNodeID)
......@@ -954,6 +966,9 @@ func (i *IndexCoord) watchNodeLoop() {
}
return
}
if Params.IndexCoordCfg.BindIndexNodeMode {
continue
}
log.Debug("IndexCoord watchNodeLoop event updated")
switch event.EventType {
case sessionutil.SessionAddEvent:
......
......@@ -86,12 +86,17 @@ func (nm *NodeManager) AddNode(nodeID UniqueID, address string) error {
log.Warn("IndexCoord", zap.Any("Node client already exist with ID:", nodeID))
return nil
}
var (
nodeClient types.IndexNode
err error
)
nodeClient, err := grpcindexnodeclient.NewClient(context.TODO(), address)
nodeClient, err = grpcindexnodeclient.NewClient(context.TODO(), address, Params.IndexCoordCfg.WithCredential)
if err != nil {
log.Error("IndexCoord NodeManager", zap.Any("Add node err", err))
return err
}
err = nodeClient.Init()
if err != nil {
log.Error("IndexCoord NodeManager", zap.Any("Add node err", err))
......@@ -158,6 +163,61 @@ func (nm *NodeManager) PeekClient(meta *model.SegmentIndex) (UniqueID, types.Ind
return 0, nil
}
func (nm *NodeManager) ClientSupportDisk() bool {
log.Info("IndexCoord check if client support disk index")
allClients := nm.GetAllClients()
if len(allClients) == 0 {
log.Warn("there is no IndexNode online")
return false
}
// Note: In order to quickly end other goroutines, an error is returned when the client is successfully selected
ctx, cancel := context.WithCancel(nm.ctx)
var (
enableDisk = false
nodeMutex = sync.Mutex{}
wg = sync.WaitGroup{}
)
for nodeID, client := range allClients {
nodeID := nodeID
client := client
wg.Add(1)
go func() {
defer wg.Done()
resp, err := client.GetJobStats(ctx, &indexpb.GetJobStatsRequest{})
if err != nil {
log.Warn("get IndexNode slots failed", zap.Int64("nodeID", nodeID), zap.Error(err))
return
}
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
log.Warn("get IndexNode slots failed", zap.Int64("nodeID", nodeID),
zap.String("reason", resp.Status.Reason))
return
}
log.Debug("get job stats success", zap.Int64("nodeID", nodeID), zap.Bool("enable disk", resp.EnableDisk))
if resp.EnableDisk {
nodeMutex.Lock()
defer nodeMutex.Unlock()
cancel()
if !enableDisk {
enableDisk = true
}
return
}
}()
}
wg.Wait()
cancel()
if enableDisk {
log.Info("IndexNode support disk index")
return true
}
log.Error("all IndexNodes do not support disk indexes")
return false
}
func (nm *NodeManager) GetAllClients() map[UniqueID]types.IndexNode {
nm.lock.RLock()
defer nm.lock.RUnlock()
......
......@@ -19,6 +19,7 @@ package indexcoord
import (
"context"
"errors"
"sync"
"testing"
"github.com/milvus-io/milvus/api/commonpb"
......@@ -143,3 +144,108 @@ func TestNodeManager_PeekClient(t *testing.T) {
assert.Contains(t, []UniqueID{8, 9}, nodeID)
})
}
func TestNodeManager_ClientSupportDisk(t *testing.T) {
t.Run("support", func(t *testing.T) {
nm := &NodeManager{
ctx: context.Background(),
lock: sync.RWMutex{},
nodeClients: map[UniqueID]types.IndexNode{
1: &indexnode.Mock{
CallGetJobStats: func(ctx context.Context, in *indexpb.GetJobStatsRequest) (*indexpb.GetJobStatsResponse, error) {
return &indexpb.GetJobStatsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
TaskSlots: 1,
JobInfos: nil,
EnableDisk: true,
}, nil
},
},
},
}
support := nm.ClientSupportDisk()
assert.True(t, support)
})
t.Run("not support", func(t *testing.T) {
nm := &NodeManager{
ctx: context.Background(),
lock: sync.RWMutex{},
nodeClients: map[UniqueID]types.IndexNode{
1: &indexnode.Mock{
CallGetJobStats: func(ctx context.Context, in *indexpb.GetJobStatsRequest) (*indexpb.GetJobStatsResponse, error) {
return &indexpb.GetJobStatsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
TaskSlots: 1,
JobInfos: nil,
EnableDisk: false,
}, nil
},
},
},
}
support := nm.ClientSupportDisk()
assert.False(t, support)
})
t.Run("no indexnode", func(t *testing.T) {
nm := &NodeManager{
ctx: context.Background(),
lock: sync.RWMutex{},
nodeClients: map[UniqueID]types.IndexNode{},
}
support := nm.ClientSupportDisk()
assert.False(t, support)
})
t.Run("error", func(t *testing.T) {
nm := &NodeManager{
ctx: context.Background(),
lock: sync.RWMutex{},
nodeClients: map[UniqueID]types.IndexNode{
1: &indexnode.Mock{
CallGetJobStats: func(ctx context.Context, in *indexpb.GetJobStatsRequest) (*indexpb.GetJobStatsResponse, error) {
return nil, errors.New("error")
},
},
},
}
support := nm.ClientSupportDisk()
assert.False(t, support)
})
t.Run("fail reason", func(t *testing.T) {
nm := &NodeManager{
ctx: context.Background(),
lock: sync.RWMutex{},
nodeClients: map[UniqueID]types.IndexNode{
1: &indexnode.Mock{
CallGetJobStats: func(ctx context.Context, in *indexpb.GetJobStatsRequest) (*indexpb.GetJobStatsResponse, error) {
return &indexpb.GetJobStatsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "fail reason",
},
TaskSlots: 0,
JobInfos: nil,
EnableDisk: false,
}, nil
},
},
},
}
support := nm.ClientSupportDisk()
assert.False(t, support)
})
}
......@@ -126,6 +126,10 @@ func (cit *CreateIndexTask) OnEnqueue() error {
func (cit *CreateIndexTask) PreExecute(ctx context.Context) error {
log.Info("IndexCoord CreateIndexTask PreExecute", zap.Int64("collectionID", cit.req.CollectionID),
zap.Int64("fieldID", cit.req.FieldID), zap.String("indexName", cit.req.IndexName))
// TODO: check index type is disk index.
if GetIndexType(cit.req.GetIndexParams()) == diskAnnIndex && !cit.indexCoordClient.nodeManager.ClientSupportDisk() {
return errors.New("all IndexNodes do not support disk indexes, please verify")
}
return nil
}
......
......@@ -22,6 +22,7 @@ import (
"strconv"
"strings"
"github.com/milvus-io/milvus/api/commonpb"
"github.com/milvus-io/milvus/api/schemapb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/indexpb"
......@@ -72,3 +73,12 @@ func parseBuildIDFromFilePath(key string) (UniqueID, error) {
func buildHandoffKey(collID, partID, segID UniqueID) string {
return fmt.Sprintf("%s/%d/%d/%d", util.HandoffSegmentPrefix, collID, partID, segID)
}
func GetIndexType(indexParams []*commonpb.KeyValuePair) string {
for _, param := range indexParams {
if param.Key == "index_type" {
return param.Value
}
}
return invalidIndex
}
......@@ -203,6 +203,7 @@ func (i *IndexNode) GetJobStats(ctx context.Context, req *indexpb.GetJobStatsReq
EnqueueJobNum: int64(unissued),
TaskSlots: int64(slots),
JobInfos: jobInfos,
EnableDisk: Params.IndexNodeCfg.EnableDisk,
}, nil
}
......
......@@ -252,4 +252,5 @@ message GetJobStatsResponse {
int64 enqueue_job_num = 4;
int64 task_slots = 5;
repeated JobInfo job_infos = 6;
bool enable_disk = 7;
}
......@@ -18,23 +18,23 @@ package grpcclient
import (
"context"
"crypto/tls"
"fmt"
"sync"
"time"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"github.com/milvus-io/milvus/internal/util"
"github.com/milvus-io/milvus/internal/util/crypto"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
grpcopentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util"
"github.com/milvus-io/milvus/internal/util/crypto"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/trace"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)
// GrpcClient abstracts client of grpc
......@@ -42,6 +42,7 @@ type GrpcClient interface {
SetRole(string)
GetRole() string
SetGetAddrFunc(func() (string, error))
EnableEncryption()
SetNewGrpcClientFunc(func(cc *grpc.ClientConn) interface{})
GetGrpcClient(ctx context.Context) (interface{}, error)
ReCall(ctx context.Context, caller func(client interface{}) (interface{}, error)) (interface{}, error)
......@@ -55,6 +56,7 @@ type ClientBase struct {
newGrpcClient func(cc *grpc.ClientConn) interface{}
grpcClient interface{}
encryption bool
conn *grpc.ClientConn
grpcClientMtx sync.RWMutex
role string
......@@ -87,6 +89,10 @@ func (c *ClientBase) SetGetAddrFunc(f func() (string, error)) {
c.getAddrFunc = f
}
func (c *ClientBase) EnableEncryption() {
c.encryption = true
}
// SetNewGrpcClientFunc sets newGrpcClient of client
func (c *ClientBase) SetNewGrpcClientFunc(f func(cc *grpc.ClientConn) interface{}) {
c.newGrpcClient = f
......@@ -157,34 +163,70 @@ func (c *ClientBase) connect(ctx context.Context) error {
}
}]}`, c.RetryServiceNameConfig, c.MaxAttempts, c.InitialBackoff, c.MaxBackoff, c.BackoffMultiplier)
conn, err := grpc.DialContext(
dialContext,
addr,
grpc.WithInsecure(),
grpc.WithBlock(),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(c.ClientMaxRecvSize),
grpc.MaxCallSendMsgSize(c.ClientMaxSendSize),
),
grpc.WithUnaryInterceptor(grpcopentracing.UnaryClientInterceptor(opts...)),
grpc.WithStreamInterceptor(grpcopentracing.StreamClientInterceptor(opts...)),
grpc.WithDefaultServiceConfig(retryPolicy),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: c.KeepAliveTime,
Timeout: c.KeepAliveTimeout,
PermitWithoutStream: true,
}),
grpc.WithConnectParams(grpc.ConnectParams{
Backoff: backoff.Config{
BaseDelay: 100 * time.Millisecond,
Multiplier: 1.6,
Jitter: 0.2,
MaxDelay: 3 * time.Second,
},
MinConnectTimeout: c.DialTimeout,
}),
grpc.WithPerRPCCredentials(&Token{Value: crypto.Base64Encode(util.MemberCredID)}),
)
var conn *grpc.ClientConn
if c.encryption {
conn, err = grpc.DialContext(
dialContext,
addr,
//grpc.WithInsecure(),
// #nosec G402
grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})),
grpc.WithBlock(),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(c.ClientMaxRecvSize),
grpc.MaxCallSendMsgSize(c.ClientMaxSendSize),
),
grpc.WithUnaryInterceptor(grpcopentracing.UnaryClientInterceptor(opts...)),
grpc.WithStreamInterceptor(grpcopentracing.StreamClientInterceptor(opts...)),
grpc.WithDefaultServiceConfig(retryPolicy),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: c.KeepAliveTime,
Timeout: c.KeepAliveTimeout,
PermitWithoutStream: true,
}),
grpc.WithConnectParams(grpc.ConnectParams{
Backoff: backoff.Config{
BaseDelay: 100 * time.Millisecond,
Multiplier: 1.6,
Jitter: 0.2,
MaxDelay: 3 * time.Second,
},
MinConnectTimeout: c.DialTimeout,
}),
grpc.WithPerRPCCredentials(&Token{Value: crypto.Base64Encode(util.MemberCredID)}),
)
} else {
conn, err = grpc.DialContext(
dialContext,
addr,
grpc.WithInsecure(),
//grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})),
grpc.WithBlock(),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(c.ClientMaxRecvSize),
grpc.MaxCallSendMsgSize(c.ClientMaxSendSize),
),
grpc.WithUnaryInterceptor(grpcopentracing.UnaryClientInterceptor(opts...)),
grpc.WithStreamInterceptor(grpcopentracing.StreamClientInterceptor(opts...)),
grpc.WithDefaultServiceConfig(retryPolicy),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: c.KeepAliveTime,
Timeout: c.KeepAliveTimeout,
PermitWithoutStream: true,
}),
grpc.WithConnectParams(grpc.ConnectParams{
Backoff: backoff.Config{
BaseDelay: 100 * time.Millisecond,
Multiplier: 1.6,
Jitter: 0.2,
MaxDelay: 3 * time.Second,
},
MinConnectTimeout: c.DialTimeout,
}),
grpc.WithPerRPCCredentials(&Token{Value: crypto.Base64Encode(util.MemberCredID)}),
)
}
cancel()
if err != nil {
return wrapErrConnect(addr, err)
......
......@@ -53,6 +53,10 @@ func (c *GRPCClientBase) SetRole(role string) {
c.role = role
}
func (c *GRPCClientBase) EnableEncryption() {
}
func (c *GRPCClientBase) SetNewGrpcClientFunc(f func(cc *grpc.ClientConn) interface{}) {
c.newGrpcClient = f
}
......
......@@ -1325,6 +1325,11 @@ type indexCoordConfig struct {
Address string
Port int
BindIndexNodeMode bool
IndexNodeAddress string
WithCredential bool
IndexNodeID int64
MinSegmentNumRowsToEnableIndex int64
GCInterval time.Duration
......@@ -1338,6 +1343,10 @@ func (p *indexCoordConfig) init(base *BaseTable) {
p.initGCInterval()
p.initMinSegmentNumRowsToEnableIndex()
p.initBindIndexNodeMode()
p.initIndexNodeAddress()
p.initWithCredential()
p.initIndexNodeID()
}
func (p *indexCoordConfig) initMinSegmentNumRowsToEnableIndex() {
......@@ -1348,6 +1357,22 @@ func (p *indexCoordConfig) initGCInterval() {
p.GCInterval = time.Duration(p.Base.ParseInt64WithDefault("indexCoord.gc.interval", 60*10)) * time.Second
}
func (p *indexCoordConfig) initBindIndexNodeMode() {
p.BindIndexNodeMode = p.Base.ParseBool("indexCoord.bindIndexNodeMode.enable", false)
}
func (p *indexCoordConfig) initIndexNodeAddress() {
p.IndexNodeAddress = p.Base.LoadWithDefault("indexCoord.bindIndexNodeMode.address", "localhost:22930")
}
func (p *indexCoordConfig) initWithCredential() {
p.WithCredential = p.Base.ParseBool("indexCoord.bindIndexNodeMode.withCred", false)
}
func (p *indexCoordConfig) initIndexNodeID() {
p.IndexNodeID = p.Base.ParseInt64WithDefault("indexCoord.bindIndexNodeMode.nodeID", 0)
}
///////////////////////////////////////////////////////////////////////////////
// --- indexnode ---
type indexNodeConfig struct {
......
......@@ -318,6 +318,11 @@ func TestComponentParam(t *testing.T) {
Params.UpdatedTime = time.Now()
t.Logf("UpdatedTime: %v", Params.UpdatedTime)
assert.False(t, Params.BindIndexNodeMode)
assert.Equal(t, "localhost:22930", Params.IndexNodeAddress)
assert.False(t, Params.WithCredential)
assert.Equal(t, int64(0), Params.IndexNodeID)
})
t.Run("test indexNodeConfig", func(t *testing.T) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册