// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

17
package grpcdatanodeclient
X
XuanYang-cn 已提交
18 19 20

import (
	"context"
G
godchen 已提交
21
	"fmt"
X
XuanYang-cn 已提交
22

X
Xiangyu Wang 已提交
23 24 25 26
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
27
	"github.com/milvus-io/milvus/internal/util/funcutil"
28
	"github.com/milvus-io/milvus/internal/util/grpcclient"
29 30
	"github.com/milvus-io/milvus/internal/util/paramtable"
	"github.com/milvus-io/milvus/internal/util/typeutil"
31
	"google.golang.org/grpc"
X
XuanYang-cn 已提交
32 33
)

34
// ClientParams is the package-level gRPC client configuration.
// NewClient initializes it once for the DataNode role and reads the
// maximum receive/send message sizes from it.
var ClientParams paramtable.GrpcClientConfig
35

S
sunby 已提交
36
// Client is the grpc client for DataNode
X
XuanYang-cn 已提交
37
type Client struct {
38 39
	grpcClient grpcclient.GrpcClient
	addr       string
D
dragondriver 已提交
40 41
}

42
// NewClient creates a client for DataNode.
43
func NewClient(ctx context.Context, addr string) (*Client, error) {
G
godchen 已提交
44
	if addr == "" {
G
godchen 已提交
45
		return nil, fmt.Errorf("address is empty")
G
godchen 已提交
46
	}
47
	ClientParams.InitOnce(typeutil.DataNodeRole)
48
	client := &Client{
49 50
		addr: addr,
		grpcClient: &grpcclient.ClientBase{
51 52
			ClientMaxRecvSize: ClientParams.ClientMaxRecvSize,
			ClientMaxSendSize: ClientParams.ClientMaxSendSize,
53
		},
54
	}
55 56 57
	client.grpcClient.SetRole(typeutil.DataNodeRole)
	client.grpcClient.SetGetAddrFunc(client.getAddr)
	client.grpcClient.SetNewGrpcClientFunc(client.newGrpcClient)
58 59

	return client, nil
X
XuanYang-cn 已提交
60 61
}

62
// Init initializes the client.
N
neza2017 已提交
63
func (c *Client) Init() error {
64
	return nil
G
godchen 已提交
65 66
}

67 68
// Start starts the client.
// Currently, it does nothing.
N
neza2017 已提交
69
func (c *Client) Start() error {
X
XuanYang-cn 已提交
70
	return nil
X
XuanYang-cn 已提交
71 72
}

73 74
// Stop stops the client.
// Currently, it closes the grpc connection with the DataNode.
N
neza2017 已提交
75
func (c *Client) Stop() error {
76
	return c.grpcClient.Close()
X
XuanYang-cn 已提交
77 78
}

79
// Register does nothing.
80 81 82 83
func (c *Client) Register() error {
	return nil
}

84 85 86 87 88 89 90 91
// newGrpcClient wraps cc in a DataNode gRPC client. NewClient registers it
// with the gRPC helper via SetNewGrpcClientFunc.
func (c *Client) newGrpcClient(cc *grpc.ClientConn) interface{} {
	dataNode := datapb.NewDataNodeClient(cc)
	return dataNode
}

// getAddr reports the DataNode address this client was created with.
// The returned error is always nil. NewClient registers it with the gRPC
// helper via SetGetAddrFunc.
func (c *Client) getAddr() (string, error) {
	return c.addr, nil
}

92
// GetComponentStates returns ComponentStates
G
godchen 已提交
93
func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
94
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
95 96 97
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
98
		return client.(datapb.DataNodeClient).GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{})
G
godchen 已提交
99
	})
D
dragondriver 已提交
100 101 102
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
103
	return ret.(*internalpb.ComponentStates), err
N
neza2017 已提交
104 105
}

106 107
// GetStatisticsChannel return the statistics channel in string
// Statistics channel contains statistics infos of query nodes, such as segment infos, memory infos
N
neza2017 已提交
108
func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
109
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
110 111 112
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
113
		return client.(datapb.DataNodeClient).GetStatisticsChannel(ctx, &internalpb.GetStatisticsChannelRequest{})
G
godchen 已提交
114
	})
D
dragondriver 已提交
115 116 117
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
118
	return ret.(*milvuspb.StringResponse), err
X
XuanYang-cn 已提交
119 120
}

121
// Deprecated
122
// WatchDmChannels create consumers on dmChannels to reveive Incremental data
G
godchen 已提交
123
func (c *Client) WatchDmChannels(ctx context.Context, req *datapb.WatchDmChannelsRequest) (*commonpb.Status, error) {
124
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
125 126 127
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
128
		return client.(datapb.DataNodeClient).WatchDmChannels(ctx, req)
G
godchen 已提交
129
	})
D
dragondriver 已提交
130 131 132
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
133
	return ret.(*commonpb.Status), err
X
XuanYang-cn 已提交
134 135
}

136 137 138 139 140 141 142 143
// FlushSegments notifies DataNode to flush the segments req provids. The flush tasks are async to this
//  rpc, DataNode will flush the segments in the background.
//
// Return UnexpectedError code in status:
//     If DataNode isn't in HEALTHY: states not HEALTHY or dynamic checks not HEALTHY
//     If DataNode doesn't find the correspounding segmentID in its memeory replica
// Return Success code in status and trigers background flush:
//     Log an info log if a segment is under flushing
G
godchen 已提交
144
func (c *Client) FlushSegments(ctx context.Context, req *datapb.FlushSegmentsRequest) (*commonpb.Status, error) {
145
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
146 147 148
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
149
		return client.(datapb.DataNodeClient).FlushSegments(ctx, req)
G
godchen 已提交
150
	})
D
dragondriver 已提交
151 152 153
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
154
	return ret.(*commonpb.Status), err
X
XuanYang-cn 已提交
155
}
156

157
// GetMetrics returns metrics
158
func (c *Client) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
159
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
160 161 162
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
163
		return client.(datapb.DataNodeClient).GetMetrics(ctx, req)
164
	})
D
dragondriver 已提交
165 166 167
	if err != nil || ret == nil {
		return nil, err
	}
168 169
	return ret.(*milvuspb.GetMetricsResponse), err
}
S
sunby 已提交
170

171
// Compaction return compaction by given plan
S
sunby 已提交
172
func (c *Client) Compaction(ctx context.Context, req *datapb.CompactionPlan) (*commonpb.Status, error) {
173
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
174 175 176
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
177
		return client.(datapb.DataNodeClient).Compaction(ctx, req)
S
sunby 已提交
178 179 180 181 182 183
	})
	if err != nil || ret == nil {
		return nil, err
	}
	return ret.(*commonpb.Status), err
}
G
groot 已提交
184 185 186 187 188 189 190 191 192 193 194 195 196 197

// Import forwards req to the DataNode's Import RPC. As described for this
// RPC, the DataNode reads data files (json, numpy, etc.) from MinIO/S3
// storage and parses them into sealed segments.
//
// The call is issued through the gRPC helper's ReCall. If ctx is no longer
// valid when the call is attempted, ctx.Err() is returned instead of
// contacting the DataNode. A nil result with a nil error is passed through
// as (nil, nil).
func (c *Client) Import(ctx context.Context, req *milvuspb.ImportRequest) (*commonpb.Status, error) {
	call := func(raw interface{}) (interface{}, error) {
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
		dataNode := raw.(datapb.DataNodeClient)
		return dataNode.Import(ctx, req)
	}

	resp, err := c.grpcClient.ReCall(ctx, call)
	if err != nil {
		return nil, err
	}
	if resp == nil {
		return nil, nil
	}
	return resp.(*commonpb.Status), nil
}