未验证 提交 bd343550 编写于 作者: J Jiquan Long 提交者: GitHub

Support to manage connections (#24224)

Signed-off-by: Nlongjiquan <jiquan.long@zilliz.com>
上级 473c65cd
......@@ -20,7 +20,7 @@ require (
github.com/golang/protobuf v1.5.3
github.com/klauspost/compress v1.14.4
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230515081521-d963c95b041f
github.com/milvus-io/milvus-proto/go-api v0.0.0-20230517025117-8ba62a3f3a63
github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000
github.com/minio/minio-go/v7 v7.0.17
github.com/panjf2000/ants/v2 v2.7.2
......
此差异已折叠。
此差异已折叠。
......@@ -236,3 +236,21 @@ func (c *Client) SetRates(ctx context.Context, req *proxypb.SetRatesRequest) (*c
}
return ret.(*commonpb.Status), err
}
func (c *Client) ListClientInfos(ctx context.Context, req *proxypb.ListClientInfosRequest) (*proxypb.ListClientInfosResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
)
ret, err := c.grpcClient.ReCall(ctx, func(client proxypb.ProxyClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return client.ListClientInfos(ctx, req)
})
if err != nil {
return nil, err
}
return ret.(*proxypb.ListClientInfosResponse), nil
}
......@@ -176,6 +176,7 @@ func (s *Server) startExternalGrpc(grpcPort int, errChan chan error) {
logutil.UnaryTraceLoggerInterceptor,
proxy.RateLimitInterceptor(limiter),
accesslog.UnaryAccessLoggerInterceptor,
proxy.KeepActiveInterceptor,
)),
}
......@@ -923,3 +924,11 @@ func (s *Server) ListIndexedSegment(ctx context.Context, req *federpb.ListIndexe
func (s *Server) DescribeSegmentIndexData(ctx context.Context, req *federpb.DescribeSegmentIndexDataRequest) (*federpb.DescribeSegmentIndexDataResponse, error) {
panic("TODO: implement me")
}
func (s *Server) Connect(ctx context.Context, req *milvuspb.ConnectRequest) (*milvuspb.ConnectResponse, error) {
return s.proxy.Connect(ctx, req)
}
func (s *Server) ListClientInfos(ctx context.Context, req *proxypb.ListClientInfosRequest) (*proxypb.ListClientInfosResponse, error) {
return s.proxy.ListClientInfos(ctx, req)
}
......@@ -514,6 +514,10 @@ func (m *MockProxy) Register() error {
return m.regErr
}
func (m *MockProxy) ListClientInfos(ctx context.Context, request *proxypb.ListClientInfosRequest) (*proxypb.ListClientInfosResponse, error) {
return nil, nil
}
func (m *MockProxy) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
return nil, nil
}
......@@ -852,6 +856,10 @@ func (m *MockProxy) RenameCollection(ctx context.Context, req *milvuspb.RenameCo
return nil, nil
}
func (m *MockProxy) Connect(ctx context.Context, req *milvuspb.ConnectRequest) (*milvuspb.ConnectResponse, error) {
return nil, nil
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
type WaitOption struct {
......
......@@ -220,6 +220,53 @@ func (_c *Proxy_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1
return _c
}
// Connect provides a mock function with given fields: ctx, req
func (_m *Proxy) Connect(ctx context.Context, req *milvuspb.ConnectRequest) (*milvuspb.ConnectResponse, error) {
ret := _m.Called(ctx, req)
var r0 *milvuspb.ConnectResponse
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.ConnectRequest) *milvuspb.ConnectResponse); ok {
r0 = rf(ctx, req)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*milvuspb.ConnectResponse)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.ConnectRequest) error); ok {
r1 = rf(ctx, req)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Proxy_Connect_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Connect'
type Proxy_Connect_Call struct {
*mock.Call
}
// Connect is a helper method to define mock.On call
// - ctx context.Context
// - req *milvuspb.ConnectRequest
func (_e *Proxy_Expecter) Connect(ctx interface{}, req interface{}) *Proxy_Connect_Call {
return &Proxy_Connect_Call{Call: _e.mock.On("Connect", ctx, req)}
}
func (_c *Proxy_Connect_Call) Run(run func(ctx context.Context, req *milvuspb.ConnectRequest)) *Proxy_Connect_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*milvuspb.ConnectRequest))
})
return _c
}
func (_c *Proxy_Connect_Call) Return(_a0 *milvuspb.ConnectResponse, _a1 error) *Proxy_Connect_Call {
_c.Call.Return(_a0, _a1)
return _c
}
// CreateAlias provides a mock function with given fields: ctx, request
func (_m *Proxy) CreateAlias(ctx context.Context, request *milvuspb.CreateAliasRequest) (*commonpb.Status, error) {
ret := _m.Called(ctx, request)
......@@ -2544,6 +2591,53 @@ func (_c *Proxy_InvalidateCredentialCache_Call) Return(_a0 *commonpb.Status, _a1
return _c
}
// ListClientInfos provides a mock function with given fields: ctx, req
func (_m *Proxy) ListClientInfos(ctx context.Context, req *proxypb.ListClientInfosRequest) (*proxypb.ListClientInfosResponse, error) {
ret := _m.Called(ctx, req)
var r0 *proxypb.ListClientInfosResponse
if rf, ok := ret.Get(0).(func(context.Context, *proxypb.ListClientInfosRequest) *proxypb.ListClientInfosResponse); ok {
r0 = rf(ctx, req)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*proxypb.ListClientInfosResponse)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *proxypb.ListClientInfosRequest) error); ok {
r1 = rf(ctx, req)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Proxy_ListClientInfos_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListClientInfos'
type Proxy_ListClientInfos_Call struct {
*mock.Call
}
// ListClientInfos is a helper method to define mock.On call
// - ctx context.Context
// - req *proxypb.ListClientInfosRequest
func (_e *Proxy_Expecter) ListClientInfos(ctx interface{}, req interface{}) *Proxy_ListClientInfos_Call {
return &Proxy_ListClientInfos_Call{Call: _e.mock.On("ListClientInfos", ctx, req)}
}
func (_c *Proxy_ListClientInfos_Call) Run(run func(ctx context.Context, req *proxypb.ListClientInfosRequest)) *Proxy_ListClientInfos_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*proxypb.ListClientInfosRequest))
})
return _c
}
func (_c *Proxy_ListClientInfos_Call) Return(_a0 *proxypb.ListClientInfosResponse, _a1 error) *Proxy_ListClientInfos_Call {
_c.Call.Return(_a0, _a1)
return _c
}
// ListCredUsers provides a mock function with given fields: ctx, req
func (_m *Proxy) ListCredUsers(ctx context.Context, req *milvuspb.ListCredUsersRequest) (*milvuspb.ListCredUsersResponse, error) {
ret := _m.Called(ctx, req)
......
......@@ -20,6 +20,8 @@ service Proxy {
rpc RefreshPolicyInfoCache(RefreshPolicyInfoCacheRequest) returns (common.Status) {}
rpc GetProxyMetrics(milvus.GetMetricsRequest) returns (milvus.GetMetricsResponse) {}
rpc SetRates(SetRatesRequest) returns (common.Status) {}
rpc ListClientInfos(ListClientInfosRequest) returns (ListClientInfosResponse) {}
}
message InvalidateCollMetaCacheRequest {
......@@ -61,3 +63,12 @@ message SetRatesRequest {
common.MsgBase base = 1;
repeated CollectionRate rates = 2;
}
message ListClientInfosRequest {
common.MsgBase base = 1;
}
message ListClientInfosResponse {
common.Status status = 1;
repeated common.ClientInfo client_infos = 2;
}
此差异已折叠。
package proxy
import (
"context"
"time"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus/pkg/log"
"go.uber.org/zap"
)
type clientInfo struct {
*commonpb.ClientInfo
identifier int64
lastActiveTime time.Time
}
func getLoggerOfClientInfo(info *commonpb.ClientInfo) []zap.Field {
fields := []zap.Field{
zap.String("sdk_type", info.GetSdkType()),
zap.String("sdk_version", info.GetSdkVersion()),
zap.String("local_time", info.GetLocalTime()),
zap.String("user", info.GetUser()),
zap.String("host", info.GetHost()),
}
for k, v := range info.GetReserved() {
fields = append(fields, zap.String(k, v))
}
return fields
}
func (c clientInfo) getLogger() []zap.Field {
fields := getLoggerOfClientInfo(c.ClientInfo)
fields = append(fields,
zap.Int64("identifier", c.identifier),
zap.Time("last_active_time", c.lastActiveTime),
)
return fields
}
func (c clientInfo) ctxLogRegister(ctx context.Context) {
log.Ctx(ctx).Info("client register", c.getLogger()...)
}
func (c clientInfo) logDeregister() {
log.Info("client deregister", c.getLogger()...)
}
package proxy
import (
"context"
"strconv"
"sync"
"time"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus/pkg/log"
)
const (
// we shouldn't check this too frequently.
defaultConnCheckDuration = 2 * time.Minute
defaultTTLForInactiveConn = 24 * time.Hour
)
type connectionManager struct {
mu sync.RWMutex
initOnce sync.Once
stopOnce sync.Once
closeSignal chan struct{}
wg sync.WaitGroup
buffer chan int64
duration time.Duration
ttl time.Duration
clientInfos map[int64]clientInfo
}
type connectionManagerOption func(s *connectionManager)
func withDuration(duration time.Duration) connectionManagerOption {
return func(s *connectionManager) {
s.duration = duration
}
}
func withTTL(ttl time.Duration) connectionManagerOption {
return func(s *connectionManager) {
s.ttl = ttl
}
}
func (s *connectionManager) apply(opts ...connectionManagerOption) {
for _, opt := range opts {
opt(s)
}
}
func (s *connectionManager) init() {
s.initOnce.Do(func() {
s.wg.Add(1)
go s.checkLoop()
})
}
func (s *connectionManager) stop() {
s.stopOnce.Do(func() {
close(s.closeSignal)
s.wg.Wait()
})
}
func (s *connectionManager) checkLoop() {
defer s.wg.Done()
t := time.NewTicker(s.duration)
defer t.Stop()
for {
select {
case <-s.closeSignal:
log.Info("connection manager closed")
return
case identifier := <-s.buffer:
s.update(identifier)
case <-t.C:
s.removeLongInactiveClients()
}
}
}
func (s *connectionManager) register(ctx context.Context, identifier int64, info *commonpb.ClientInfo) {
cli := clientInfo{
ClientInfo: info,
identifier: identifier,
lastActiveTime: time.Now(),
}
s.mu.Lock()
defer s.mu.Unlock()
s.clientInfos[identifier] = cli
cli.ctxLogRegister(ctx)
}
func (s *connectionManager) keepActive(identifier int64) {
// make this asynchronous and then the rpc won't be blocked too long.
s.buffer <- identifier
}
func (s *connectionManager) update(identifier int64) {
s.mu.Lock()
defer s.mu.Unlock()
cli, ok := s.clientInfos[identifier]
if ok {
cli.lastActiveTime = time.Now()
s.clientInfos[identifier] = cli
}
}
func (s *connectionManager) removeLongInactiveClients() {
s.mu.Lock()
defer s.mu.Unlock()
for candidate, cli := range s.clientInfos {
if time.Since(cli.lastActiveTime) > s.ttl {
cli.logDeregister()
delete(s.clientInfos, candidate)
}
}
}
func (s *connectionManager) list() []*commonpb.ClientInfo {
s.mu.RLock()
defer s.mu.RUnlock()
clients := make([]*commonpb.ClientInfo, 0, len(s.clientInfos))
for identifier, cli := range s.clientInfos {
if cli.ClientInfo != nil {
client := proto.Clone(cli.ClientInfo).(*commonpb.ClientInfo)
if client.Reserved == nil {
client.Reserved = make(map[string]string)
}
client.Reserved["identifier"] = string(strconv.AppendInt(nil, identifier, 10))
client.Reserved["last_active_time"] = cli.lastActiveTime.String()
clients = append(clients, client)
}
}
return clients
}
func newConnectionManager(opts ...connectionManagerOption) *connectionManager {
s := &connectionManager{
closeSignal: make(chan struct{}, 1),
buffer: make(chan int64, 64),
duration: defaultConnCheckDuration,
ttl: defaultTTLForInactiveConn,
clientInfos: make(map[int64]clientInfo),
}
s.apply(opts...)
s.init()
return s
}
var connectionManagerInstance *connectionManager
var getConnectionManagerInstanceOnce sync.Once
func GetConnectionManager() *connectionManager {
getConnectionManagerInstanceOnce.Do(func() {
connectionManagerInstance = newConnectionManager(
withDuration(defaultConnCheckDuration),
withTTL(defaultTTLForInactiveConn))
})
return connectionManagerInstance
}
此差异已折叠。
此差异已折叠。
此差异已折叠。
......@@ -29,6 +29,7 @@ type tsoAllocator interface {
}
// use timestampAllocatorInterface to keep other components testable
//go:generate mockery --name=timestampAllocatorInterface --filename=mock_tso_test.go --outpkg=proxy --output=. --inpackage --structname=mockTimestampAllocator --with-expecter
type timestampAllocatorInterface interface {
AllocTimestamp(ctx context.Context, req *rootcoordpb.AllocTimestampRequest) (*rootcoordpb.AllocTimestampResponse, error)
}
此差异已折叠。
此差异已折叠。
此差异已折叠。
......@@ -435,6 +435,8 @@ func (node *Proxy) Stop() error {
// https://github.com/milvus-io/milvus/issues/12282
node.UpdateStateCode(commonpb.StateCode_Abnormal)
GetConnectionManager().stop()
return nil
}
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册