ClientProxy.cpp 14.0 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

S
starlord 已提交
18 19
#include "sdk/grpc/ClientProxy.h"
#include "grpc/gen-milvus/milvus.grpc.pb.h"
S
starlord 已提交
20
#include "src/version.h"
S
starlord 已提交
21 22 23

#include <memory>
#include <string>
S
starlord 已提交
24
#include <vector>
S
starlord 已提交
25

K
kun yu 已提交
26
//#define GRPC_MULTIPLE_THREAD;
K
kun yu 已提交
27 28

namespace milvus {
K
kun yu 已提交
29
bool
S
starlord 已提交
30
UriCheck(const std::string& uri) {
S
starlord 已提交
31
    size_t index = uri.find_first_of(':', 0);
S
starlord 已提交
32
    return (index != std::string::npos);
K
kun yu 已提交
33 34
}

K
kun yu 已提交
35
Status
S
starlord 已提交
36
ClientProxy::Connect(const ConnectParam& param) {
K
kun yu 已提交
37 38 39 40 41
    std::string uri = param.ip_address + ":" + param.port;

    channel_ = ::grpc::CreateChannel(uri, ::grpc::InsecureChannelCredentials());
    if (channel_ != nullptr) {
        connected_ = true;
K
kun yu 已提交
42 43
        client_ptr_ = std::make_shared<GrpcClient>(channel_);
        return Status::OK();
K
kun yu 已提交
44
    }
S
starlord 已提交
45 46 47 48

    std::string reason = "connect failed!";
    connected_ = false;
    return Status(StatusCode::NotConnected, reason);
K
kun yu 已提交
49 50 51
}

Status
S
starlord 已提交
52
ClientProxy::Connect(const std::string& uri) {
K
kun yu 已提交
53
    if (!UriCheck(uri)) {
S
starlord 已提交
54
        return Status(StatusCode::InvalidAgument, "Invalid uri");
K
kun yu 已提交
55
    }
K
kun yu 已提交
56
    size_t index = uri.find_first_of(':', 0);
K
kun yu 已提交
57 58 59 60 61 62 63 64 65 66 67 68

    ConnectParam param;
    param.ip_address = uri.substr(0, index);
    param.port = uri.substr(index + 1);

    return Connect(param);
}

Status
ClientProxy::Connected() const {
    try {
        std::string info;
Y
Yu Kun 已提交
69
        return client_ptr_->Cmd(info, "");
S
starlord 已提交
70
    } catch (std::exception& ex) {
K
kun yu 已提交
71 72 73 74 75 76
        return Status(StatusCode::NotConnected, "connection lost: " + std::string(ex.what()));
    }
}

Status
ClientProxy::Disconnect() {
K
kun yu 已提交
77
    try {
K
kun yu 已提交
78
        Status status = client_ptr_->Disconnect();
K
kun yu 已提交
79 80
        connected_ = false;
        channel_.reset();
K
kun yu 已提交
81
        return status;
S
starlord 已提交
82
    } catch (std::exception& ex) {
K
kun yu 已提交
83 84
        return Status(StatusCode::UnknownError, "failed to disconnect: " + std::string(ex.what()));
    }
K
kun yu 已提交
85 86 87 88
}

std::string
ClientProxy::ClientVersion() const {
K
kun yu 已提交
89
    return MILVUS_VERSION;
K
kun yu 已提交
90 91 92
}

Status
S
starlord 已提交
93
ClientProxy::CreateTable(const TableSchema& param) {
K
kun yu 已提交
94 95
    try {
        ::milvus::grpc::TableSchema schema;
96
        schema.set_table_name(param.table_name);
K
kun yu 已提交
97
        schema.set_dimension(param.dimension);
98
        schema.set_index_file_size(param.index_file_size);
S
starlord 已提交
99
        schema.set_metric_type(static_cast<int32_t>(param.metric_type));
K
kun yu 已提交
100

K
kun yu 已提交
101
        return client_ptr_->CreateTable(schema);
S
starlord 已提交
102
    } catch (std::exception& ex) {
K
kun yu 已提交
103 104 105 106 107
        return Status(StatusCode::UnknownError, "failed to create table: " + std::string(ex.what()));
    }
}

bool
S
starlord 已提交
108
ClientProxy::HasTable(const std::string& table_name) {
K
kun yu 已提交
109
    Status status = Status::OK();
K
kun yu 已提交
110 111
    ::milvus::grpc::TableName grpc_table_name;
    grpc_table_name.set_table_name(table_name);
K
kun yu 已提交
112 113
    bool result = client_ptr_->HasTable(grpc_table_name, status);
    return result;
K
kun yu 已提交
114 115 116
}

Status
S
starlord 已提交
117
ClientProxy::DropTable(const std::string& table_name) {
K
kun yu 已提交
118 119 120
    try {
        ::milvus::grpc::TableName grpc_table_name;
        grpc_table_name.set_table_name(table_name);
K
kun yu 已提交
121
        return client_ptr_->DropTable(grpc_table_name);
S
starlord 已提交
122
    } catch (std::exception& ex) {
K
kun yu 已提交
123 124 125 126 127
        return Status(StatusCode::UnknownError, "failed to drop table: " + std::string(ex.what()));
    }
}

Status
S
starlord 已提交
128
ClientProxy::CreateIndex(const IndexParam& index_param) {
K
kun yu 已提交
129
    try {
Y
Yu Kun 已提交
130
        ::milvus::grpc::IndexParam grpc_index_param;
131
        grpc_index_param.set_table_name(index_param.table_name);
S
starlord 已提交
132
        grpc_index_param.mutable_index()->set_index_type(static_cast<int32_t>(index_param.index_type));
133
        grpc_index_param.mutable_index()->set_nlist(index_param.nlist);
Y
Yu Kun 已提交
134
        return client_ptr_->CreateIndex(grpc_index_param);
S
starlord 已提交
135
    } catch (std::exception& ex) {
K
kun yu 已提交
136 137 138 139
        return Status(StatusCode::UnknownError, "failed to build index: " + std::string(ex.what()));
    }
}

K
kun yu 已提交
140
Status
S
starlord 已提交
141 142
ClientProxy::Insert(const std::string& table_name, const std::vector<RowRecord>& record_array,
                    std::vector<int64_t>& id_array) {
K
kun yu 已提交
143
    Status status = Status::OK();
K
kun yu 已提交
144 145
    try {
////////////////////////////////////////////////////////////////////////////
K
kun yu 已提交
146
#ifdef GRPC_MULTIPLE_THREAD
K
kun yu 已提交
147 148 149
        std::vector<std::thread> threads;
        int thread_count = 10;

K
kun yu 已提交
150
        std::shared_ptr<::milvus::grpc::InsertInfos> insert_info_array(
S
starlord 已提交
151
            new ::milvus::grpc::InsertInfos[thread_count], std::default_delete<::milvus::grpc::InsertInfos[]>());
K
kun yu 已提交
152

S
starlord 已提交
153 154
        std::shared_ptr<::milvus::grpc::VectorIds> vector_ids_array(new ::milvus::grpc::VectorIds[thread_count],
                                                                    std::default_delete<::milvus::grpc::VectorIds[]>());
K
kun yu 已提交
155

K
kun yu 已提交
156 157 158
        int64_t record_count = record_array.size() / thread_count;

        for (size_t i = 0; i < thread_count; i++) {
K
kun yu 已提交
159
            insert_info_array.get()[i].set_table_name(table_name);
K
kun yu 已提交
160
            for (size_t j = i * record_count; j < record_count * (i + 1); j++) {
S
starlord 已提交
161
                ::milvus::grpc::RowRecord* grpc_record = insert_info_array.get()[i].add_row_record_array();
K
kun yu 已提交
162 163 164 165 166 167 168 169 170
                for (size_t k = 0; k < record_array[j].data.size(); k++) {
                    grpc_record->add_vector_data(record_array[j].data[k]);
                }
            }
        }

        std::cout << "*****************************************************\n";
        auto start = std::chrono::high_resolution_clock::now();
        for (size_t j = 0; j < thread_count; j++) {
S
starlord 已提交
171 172
            threads.push_back(std::thread(&GrpcClient::InsertVector, client_ptr_, std::ref(vector_ids_array.get()[j]),
                                          std::ref(insert_info_array.get()[j]), std::ref(status)));
K
kun yu 已提交
173 174 175
        }
        std::for_each(threads.begin(), threads.end(), std::mem_fn(&std::thread::join));
        auto finish = std::chrono::high_resolution_clock::now();
S
starlord 已提交
176 177
        std::cout << "InsertVector cost: "
                  << std::chrono::duration_cast<std::chrono::duration<double>>(finish - start).count() << "s\n";
K
kun yu 已提交
178 179 180
        std::cout << "*****************************************************\n";

        for (size_t i = 0; i < thread_count; i++) {
K
kun yu 已提交
181 182
            for (size_t j = 0; j < vector_ids_array.get()[i].vector_id_array_size(); j++) {
                id_array.push_back(vector_ids_array.get()[i].vector_id_array(j));
K
kun yu 已提交
183 184
            }
        }
K
kun yu 已提交
185
#else
Y
Yu Kun 已提交
186 187
        ::milvus::grpc::InsertParam insert_param;
        insert_param.set_table_name(table_name);
K
kun yu 已提交
188

S
starlord 已提交
189 190
        for (auto& record : record_array) {
            ::milvus::grpc::RowRecord* grpc_record = insert_param.add_row_record_array();
191
            grpc_record->add_vector_data(record.data.begin(), record.data.end());
K
kun yu 已提交
192 193
        }

S
starlord 已提交
194
        // Single thread
Y
Yu Kun 已提交
195 196
        ::milvus::grpc::VectorIds vector_ids;
        if (!id_array.empty()) {
197
            /* set user's ids */
198
            insert_param.add_row_id_array(id_array.begin(), id_array.end());
Y
Yu Kun 已提交
199 200 201
            client_ptr_->Insert(vector_ids, insert_param, status);
        } else {
            client_ptr_->Insert(vector_ids, insert_param, status);
202 203
            /* return Milvus generated ids back to user */
            id_array.insert(id_array.end(), vector_ids.vector_id_array().begin(), vector_ids.vector_id_array().end());
K
kun yu 已提交
204 205
        }
#endif
S
starlord 已提交
206
    } catch (std::exception& ex) {
K
kun yu 已提交
207
        return Status(StatusCode::UnknownError, "fail to add vector: " + std::string(ex.what()));
K
kun yu 已提交
208 209
    }

K
kun yu 已提交
210
    return status;
K
kun yu 已提交
211 212 213
}

Status
S
starlord 已提交
214 215
ClientProxy::Search(const std::string& table_name, const std::vector<RowRecord>& query_record_array,
                    const std::vector<Range>& query_range_array, int64_t topk, int64_t nprobe,
216
                    TopKQueryResult& topk_query_result) {
K
kun yu 已提交
217
    try {
S
starlord 已提交
218
        // step 1: convert vectors data
Y
Yu Kun 已提交
219 220 221
        ::milvus::grpc::SearchParam search_param;
        search_param.set_table_name(table_name);
        search_param.set_topk(topk);
Y
Yu Kun 已提交
222
        search_param.set_nprobe(nprobe);
S
starlord 已提交
223 224
        for (auto& record : query_record_array) {
            ::milvus::grpc::RowRecord* row_record = search_param.add_query_record_array();
225
            row_record->add_vector_data(record.data.begin(), record.data.end());
K
kun yu 已提交
226 227
        }

S
starlord 已提交
228 229 230
        // step 2: convert range array
        for (auto& range : query_range_array) {
            ::milvus::grpc::Range* grpc_range = search_param.add_query_range_array();
K
kun yu 已提交
231 232 233 234
            grpc_range->set_start_value(range.start_value);
            grpc_range->set_end_value(range.end_value);
        }

S
starlord 已提交
235
        // step 3: search vectors
236 237
        ::milvus::grpc::TopKQueryResult result;
        Status status = client_ptr_->Search(result, search_param);
K
kun yu 已提交
238

S
starlord 已提交
239
        // step 4: convert result array
240 241 242 243 244 245
        topk_query_result.row_num = result.row_num();
        topk_query_result.ids.resize(result.ids().size());
        memcpy(topk_query_result.ids.data(), result.ids().data(), result.ids().size() * sizeof(int64_t));
        topk_query_result.distances.resize(result.distances().size());
        memcpy(topk_query_result.distances.data(), result.distances().data(),
               result.distances().size() * sizeof(float));
K
kun yu 已提交
246

K
kun yu 已提交
247
        return status;
S
starlord 已提交
248
    } catch (std::exception& ex) {
K
kun yu 已提交
249
        return Status(StatusCode::UnknownError, "fail to search vectors: " + std::string(ex.what()));
K
kun yu 已提交
250 251 252 253
    }
}

Status
S
starlord 已提交
254
ClientProxy::DescribeTable(const std::string& table_name, TableSchema& table_schema) {
K
kun yu 已提交
255 256 257
    try {
        ::milvus::grpc::TableSchema grpc_schema;

K
kun yu 已提交
258
        Status status = client_ptr_->DescribeTable(grpc_schema, table_name);
K
kun yu 已提交
259

260
        table_schema.table_name = grpc_schema.table_name();
K
kun yu 已提交
261
        table_schema.dimension = grpc_schema.dimension();
262
        table_schema.index_file_size = grpc_schema.index_file_size();
S
starlord 已提交
263
        table_schema.metric_type = static_cast<MetricType>(grpc_schema.metric_type());
K
kun yu 已提交
264 265

        return status;
S
starlord 已提交
266
    } catch (std::exception& ex) {
K
kun yu 已提交
267
        return Status(StatusCode::UnknownError, "fail to describe table: " + std::string(ex.what()));
K
kun yu 已提交
268 269 270 271
    }
}

Status
S
starlord 已提交
272
ClientProxy::CountTable(const std::string& table_name, int64_t& row_count) {
K
kun yu 已提交
273
    try {
K
kun yu 已提交
274
        Status status;
Y
Yu Kun 已提交
275
        row_count = client_ptr_->CountTable(table_name, status);
K
kun yu 已提交
276
        return status;
S
starlord 已提交
277
    } catch (std::exception& ex) {
K
kun yu 已提交
278
        return Status(StatusCode::UnknownError, "fail to show tables: " + std::string(ex.what()));
K
kun yu 已提交
279 280 281 282
    }
}

Status
S
starlord 已提交
283
ClientProxy::ShowTables(std::vector<std::string>& table_array) {
K
kun yu 已提交
284
    try {
285 286 287 288 289 290 291 292 293
        Status status;
        milvus::grpc::TableNameList table_name_list;
        status = client_ptr_->ShowTables(table_name_list);

        table_array.resize(table_name_list.table_names_size());
        for (uint64_t i = 0; i < table_name_list.table_names_size(); ++i) {
            table_array[i] = table_name_list.table_names(i);
        }
        return status;
S
starlord 已提交
294
    } catch (std::exception& ex) {
K
kun yu 已提交
295
        return Status(StatusCode::UnknownError, "fail to show tables: " + std::string(ex.what()));
K
kun yu 已提交
296 297 298 299 300
    }
}

std::string
ClientProxy::ServerVersion() const {
K
kun yu 已提交
301
    Status status = Status::OK();
K
kun yu 已提交
302 303
    try {
        std::string version;
Y
Yu Kun 已提交
304
        Status status = client_ptr_->Cmd(version, "version");
K
kun yu 已提交
305
        return version;
S
starlord 已提交
306
    } catch (std::exception& ex) {
K
kun yu 已提交
307 308 309 310 311 312
        return "";
    }
}

std::string
ClientProxy::ServerStatus() const {
K
kun yu 已提交
313
    if (channel_ == nullptr) {
K
kun yu 已提交
314 315 316 317 318
        return "not connected to server";
    }

    try {
        std::string dummy;
Y
Yu Kun 已提交
319
        Status status = client_ptr_->Cmd(dummy, "");
K
kun yu 已提交
320
        return "server alive";
S
starlord 已提交
321
    } catch (std::exception& ex) {
K
kun yu 已提交
322 323 324 325
        return "connection lost";
    }
}

Y
Yu Kun 已提交
326 327 328 329 330 331 332 333 334 335
std::string
ClientProxy::DumpTaskTables() const {
    if (channel_ == nullptr) {
        return "not connected to server";
    }

    try {
        std::string dummy;
        Status status = client_ptr_->Cmd(dummy, "tasktable");
        return dummy;
S
starlord 已提交
336
    } catch (std::exception& ex) {
Y
Yu Kun 已提交
337 338 339 340
        return "connection lost";
    }
}

Y
Yu Kun 已提交
341
Status
S
starlord 已提交
342
ClientProxy::DeleteByRange(milvus::Range& range, const std::string& table_name) {
343 344 345 346 347 348
    try {
        ::milvus::grpc::DeleteByRangeParam delete_by_range_param;
        delete_by_range_param.set_table_name(table_name);
        delete_by_range_param.mutable_range()->set_start_value(range.start_value);
        delete_by_range_param.mutable_range()->set_end_value(range.end_value);
        return client_ptr_->DeleteByRange(delete_by_range_param);
S
starlord 已提交
349
    } catch (std::exception& ex) {
350 351
        return Status(StatusCode::UnknownError, "fail to delete by range: " + std::string(ex.what()));
    }
Y
Yu Kun 已提交
352 353 354
}

Status
S
starlord 已提交
355
ClientProxy::PreloadTable(const std::string& table_name) const {
Y
Yu Kun 已提交
356 357 358 359 360
    try {
        ::milvus::grpc::TableName grpc_table_name;
        grpc_table_name.set_table_name(table_name);
        Status status = client_ptr_->PreloadTable(grpc_table_name);
        return status;
S
starlord 已提交
361
    } catch (std::exception& ex) {
362
        return Status(StatusCode::UnknownError, "fail to preload tables: " + std::string(ex.what()));
Y
Yu Kun 已提交
363
    }
Y
Yu Kun 已提交
364 365
}

366
Status
S
starlord 已提交
367
ClientProxy::DescribeIndex(const std::string& table_name, IndexParam& index_param) const {
368 369 370 371 372
    try {
        ::milvus::grpc::TableName grpc_table_name;
        grpc_table_name.set_table_name(table_name);
        ::milvus::grpc::IndexParam grpc_index_param;
        Status status = client_ptr_->DescribeIndex(grpc_table_name, grpc_index_param);
S
starlord 已提交
373
        index_param.index_type = static_cast<IndexType>(grpc_index_param.mutable_index()->index_type());
374 375 376
        index_param.nlist = grpc_index_param.mutable_index()->nlist();

        return status;
S
starlord 已提交
377
    } catch (std::exception& ex) {
378 379
        return Status(StatusCode::UnknownError, "fail to describe index: " + std::string(ex.what()));
    }
Y
Yu Kun 已提交
380 381 382
}

Status
S
starlord 已提交
383
ClientProxy::DropIndex(const std::string& table_name) const {
384 385 386 387 388
    try {
        ::milvus::grpc::TableName grpc_table_name;
        grpc_table_name.set_table_name(table_name);
        Status status = client_ptr_->DropIndex(grpc_table_name);
        return status;
S
starlord 已提交
389
    } catch (std::exception& ex) {
390 391
        return Status(StatusCode::UnknownError, "fail to drop index: " + std::string(ex.what()));
    }
Y
Yu Kun 已提交
392 393
}

S
starlord 已提交
394
}  // namespace milvus