ClientProxy.cpp 17.7 KB
Newer Older
1
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
J
jinhai 已提交
2
//
3 4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
J
jinhai 已提交
5
//
6 7 8 9 10
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
J
jinhai 已提交
11

Y
yukun 已提交
12 13
#include "grpc/ClientProxy.h"
#include "grpc-gen/gen-milvus/milvus.grpc.pb.h"
S
starlord 已提交
14 15 16

#include <memory>
#include <string>
S
starlord 已提交
17
#include <vector>
S
starlord 已提交
18

K
kun yu 已提交
19
//#define GRPC_MULTIPLE_THREAD;
Y
yukun 已提交
20
#define MILVUS_SDK_VERSION "0.6.0";
K
kun yu 已提交
21 22

namespace milvus {
K
kun yu 已提交
23
bool
S
starlord 已提交
24
UriCheck(const std::string& uri) {
S
starlord 已提交
25
    size_t index = uri.find_first_of(':', 0);
S
starlord 已提交
26
    return (index != std::string::npos);
K
kun yu 已提交
27 28
}

G
groot 已提交
29 30
void
CopyRowRecord(::milvus::grpc::RowRecord* target, const RowRecord& src) {
G
groot 已提交
31 32 33 34 35 36 37 38 39
    if (!src.float_data.empty()) {
        auto vector_data = target->mutable_float_data();
        vector_data->Resize(static_cast<int>(src.float_data.size()), 0.0);
        memcpy(vector_data->mutable_data(), src.float_data.data(), src.float_data.size() * sizeof(float));
    }

    if (!src.binary_data.empty()) {
        target->set_binary_data(src.binary_data.data(), src.binary_data.size());
    }
G
groot 已提交
40 41
}

K
kun yu 已提交
42
Status
S
starlord 已提交
43
ClientProxy::Connect(const ConnectParam& param) {
K
kun yu 已提交
44 45 46 47 48
    std::string uri = param.ip_address + ":" + param.port;

    channel_ = ::grpc::CreateChannel(uri, ::grpc::InsecureChannelCredentials());
    if (channel_ != nullptr) {
        connected_ = true;
K
kun yu 已提交
49 50
        client_ptr_ = std::make_shared<GrpcClient>(channel_);
        return Status::OK();
K
kun yu 已提交
51
    }
S
starlord 已提交
52 53 54 55

    std::string reason = "connect failed!";
    connected_ = false;
    return Status(StatusCode::NotConnected, reason);
K
kun yu 已提交
56 57 58
}

Status
S
starlord 已提交
59
ClientProxy::Connect(const std::string& uri) {
K
kun yu 已提交
60
    if (!UriCheck(uri)) {
S
starlord 已提交
61
        return Status(StatusCode::InvalidAgument, "Invalid uri");
K
kun yu 已提交
62
    }
K
kun yu 已提交
63
    size_t index = uri.find_first_of(':', 0);
K
kun yu 已提交
64 65 66 67 68 69 70 71 72 73 74 75

    ConnectParam param;
    param.ip_address = uri.substr(0, index);
    param.port = uri.substr(index + 1);

    return Connect(param);
}

Status
ClientProxy::Connected() const {
    try {
        std::string info;
Y
Yu Kun 已提交
76
        return client_ptr_->Cmd(info, "");
S
starlord 已提交
77
    } catch (std::exception& ex) {
K
kun yu 已提交
78 79 80 81 82 83
        return Status(StatusCode::NotConnected, "connection lost: " + std::string(ex.what()));
    }
}

Status
ClientProxy::Disconnect() {
K
kun yu 已提交
84
    try {
K
kun yu 已提交
85
        Status status = client_ptr_->Disconnect();
K
kun yu 已提交
86 87
        connected_ = false;
        channel_.reset();
K
kun yu 已提交
88
        return status;
S
starlord 已提交
89
    } catch (std::exception& ex) {
K
kun yu 已提交
90 91
        return Status(StatusCode::UnknownError, "failed to disconnect: " + std::string(ex.what()));
    }
K
kun yu 已提交
92 93 94 95
}

std::string
ClientProxy::ClientVersion() const {
Y
yukun 已提交
96
    return MILVUS_SDK_VERSION;
K
kun yu 已提交
97 98 99
}

Status
S
starlord 已提交
100
ClientProxy::CreateTable(const TableSchema& param) {
K
kun yu 已提交
101 102
    try {
        ::milvus::grpc::TableSchema schema;
103
        schema.set_table_name(param.table_name);
K
kun yu 已提交
104
        schema.set_dimension(param.dimension);
105
        schema.set_index_file_size(param.index_file_size);
S
starlord 已提交
106
        schema.set_metric_type(static_cast<int32_t>(param.metric_type));
K
kun yu 已提交
107

K
kun yu 已提交
108
        return client_ptr_->CreateTable(schema);
S
starlord 已提交
109
    } catch (std::exception& ex) {
K
kun yu 已提交
110 111 112 113 114
        return Status(StatusCode::UnknownError, "failed to create table: " + std::string(ex.what()));
    }
}

bool
S
starlord 已提交
115
ClientProxy::HasTable(const std::string& table_name) {
K
kun yu 已提交
116
    Status status = Status::OK();
K
kun yu 已提交
117 118
    ::milvus::grpc::TableName grpc_table_name;
    grpc_table_name.set_table_name(table_name);
K
kun yu 已提交
119 120
    bool result = client_ptr_->HasTable(grpc_table_name, status);
    return result;
K
kun yu 已提交
121 122 123
}

Status
S
starlord 已提交
124
ClientProxy::DropTable(const std::string& table_name) {
K
kun yu 已提交
125 126 127
    try {
        ::milvus::grpc::TableName grpc_table_name;
        grpc_table_name.set_table_name(table_name);
K
kun yu 已提交
128
        return client_ptr_->DropTable(grpc_table_name);
S
starlord 已提交
129
    } catch (std::exception& ex) {
K
kun yu 已提交
130 131 132 133 134
        return Status(StatusCode::UnknownError, "failed to drop table: " + std::string(ex.what()));
    }
}

Status
S
starlord 已提交
135
ClientProxy::CreateIndex(const IndexParam& index_param) {
K
kun yu 已提交
136
    try {
Y
Yu Kun 已提交
137
        ::milvus::grpc::IndexParam grpc_index_param;
138
        grpc_index_param.set_table_name(index_param.table_name);
S
starlord 已提交
139
        grpc_index_param.mutable_index()->set_index_type(static_cast<int32_t>(index_param.index_type));
140
        grpc_index_param.mutable_index()->set_nlist(index_param.nlist);
Y
Yu Kun 已提交
141
        return client_ptr_->CreateIndex(grpc_index_param);
S
starlord 已提交
142
    } catch (std::exception& ex) {
K
kun yu 已提交
143 144 145 146
        return Status(StatusCode::UnknownError, "failed to build index: " + std::string(ex.what()));
    }
}

K
kun yu 已提交
147
Status
G
groot 已提交
148 149
ClientProxy::Insert(const std::string& table_name, const std::string& partition_tag,
                    const std::vector<RowRecord>& record_array, std::vector<int64_t>& id_array) {
K
kun yu 已提交
150
    Status status = Status::OK();
K
kun yu 已提交
151 152
    try {
////////////////////////////////////////////////////////////////////////////
K
kun yu 已提交
153
#ifdef GRPC_MULTIPLE_THREAD
K
kun yu 已提交
154 155 156
        std::vector<std::thread> threads;
        int thread_count = 10;

K
kun yu 已提交
157
        std::shared_ptr<::milvus::grpc::InsertInfos> insert_info_array(
S
starlord 已提交
158
            new ::milvus::grpc::InsertInfos[thread_count], std::default_delete<::milvus::grpc::InsertInfos[]>());
K
kun yu 已提交
159

S
starlord 已提交
160 161
        std::shared_ptr<::milvus::grpc::VectorIds> vector_ids_array(new ::milvus::grpc::VectorIds[thread_count],
                                                                    std::default_delete<::milvus::grpc::VectorIds[]>());
K
kun yu 已提交
162

K
kun yu 已提交
163 164 165
        int64_t record_count = record_array.size() / thread_count;

        for (size_t i = 0; i < thread_count; i++) {
K
kun yu 已提交
166
            insert_info_array.get()[i].set_table_name(table_name);
K
kun yu 已提交
167
            for (size_t j = i * record_count; j < record_count * (i + 1); j++) {
S
starlord 已提交
168
                ::milvus::grpc::RowRecord* grpc_record = insert_info_array.get()[i].add_row_record_array();
K
kun yu 已提交
169 170 171 172 173 174 175 176 177
                for (size_t k = 0; k < record_array[j].data.size(); k++) {
                    grpc_record->add_vector_data(record_array[j].data[k]);
                }
            }
        }

        std::cout << "*****************************************************\n";
        auto start = std::chrono::high_resolution_clock::now();
        for (size_t j = 0; j < thread_count; j++) {
S
starlord 已提交
178 179
            threads.push_back(std::thread(&GrpcClient::InsertVector, client_ptr_, std::ref(vector_ids_array.get()[j]),
                                          std::ref(insert_info_array.get()[j]), std::ref(status)));
K
kun yu 已提交
180 181 182
        }
        std::for_each(threads.begin(), threads.end(), std::mem_fn(&std::thread::join));
        auto finish = std::chrono::high_resolution_clock::now();
S
starlord 已提交
183 184
        std::cout << "InsertVector cost: "
                  << std::chrono::duration_cast<std::chrono::duration<double>>(finish - start).count() << "s\n";
K
kun yu 已提交
185 186 187
        std::cout << "*****************************************************\n";

        for (size_t i = 0; i < thread_count; i++) {
K
kun yu 已提交
188 189
            for (size_t j = 0; j < vector_ids_array.get()[i].vector_id_array_size(); j++) {
                id_array.push_back(vector_ids_array.get()[i].vector_id_array(j));
K
kun yu 已提交
190 191
            }
        }
K
kun yu 已提交
192
#else
Y
Yu Kun 已提交
193 194
        ::milvus::grpc::InsertParam insert_param;
        insert_param.set_table_name(table_name);
G
groot 已提交
195
        insert_param.set_partition_tag(partition_tag);
K
kun yu 已提交
196

S
starlord 已提交
197 198
        for (auto& record : record_array) {
            ::milvus::grpc::RowRecord* grpc_record = insert_param.add_row_record_array();
G
groot 已提交
199
            CopyRowRecord(grpc_record, record);
K
kun yu 已提交
200 201
        }

S
starlord 已提交
202
        // Single thread
Y
Yu Kun 已提交
203 204
        ::milvus::grpc::VectorIds vector_ids;
        if (!id_array.empty()) {
205
            /* set user's ids */
G
groot 已提交
206 207 208
            auto row_ids = insert_param.mutable_row_id_array();
            row_ids->Resize(static_cast<int>(id_array.size()), -1);
            memcpy(row_ids->mutable_data(), id_array.data(), id_array.size() * sizeof(int64_t));
Y
Yu Kun 已提交
209 210 211
            client_ptr_->Insert(vector_ids, insert_param, status);
        } else {
            client_ptr_->Insert(vector_ids, insert_param, status);
212 213
            /* return Milvus generated ids back to user */
            id_array.insert(id_array.end(), vector_ids.vector_id_array().begin(), vector_ids.vector_id_array().end());
K
kun yu 已提交
214 215
        }
#endif
S
starlord 已提交
216
    } catch (std::exception& ex) {
K
kun yu 已提交
217
        return Status(StatusCode::UnknownError, "fail to add vector: " + std::string(ex.what()));
K
kun yu 已提交
218 219
    }

K
kun yu 已提交
220
    return status;
K
kun yu 已提交
221 222 223
}

Status
G
groot 已提交
224
ClientProxy::Search(const std::string& table_name, const std::vector<std::string>& partition_tags,
G
groot 已提交
225
                    const std::vector<RowRecord>& query_record_array, const std::vector<Range>& query_range_array,
G
groot 已提交
226
                    int64_t topk, int64_t nprobe, TopKQueryResult& topk_query_result) {
K
kun yu 已提交
227
    try {
S
starlord 已提交
228
        // step 1: convert vectors data
Y
Yu Kun 已提交
229 230 231
        ::milvus::grpc::SearchParam search_param;
        search_param.set_table_name(table_name);
        search_param.set_topk(topk);
Y
Yu Kun 已提交
232
        search_param.set_nprobe(nprobe);
G
groot 已提交
233
        for (auto& tag : partition_tags) {
G
groot 已提交
234 235
            search_param.add_partition_tag_array(tag);
        }
S
starlord 已提交
236 237
        for (auto& record : query_record_array) {
            ::milvus::grpc::RowRecord* row_record = search_param.add_query_record_array();
G
groot 已提交
238
            CopyRowRecord(row_record, record);
K
kun yu 已提交
239 240
        }

S
starlord 已提交
241 242 243
        // step 2: convert range array
        for (auto& range : query_range_array) {
            ::milvus::grpc::Range* grpc_range = search_param.add_query_range_array();
K
kun yu 已提交
244 245 246 247
            grpc_range->set_start_value(range.start_value);
            grpc_range->set_end_value(range.end_value);
        }

S
starlord 已提交
248
        // step 3: search vectors
249 250
        ::milvus::grpc::TopKQueryResult result;
        Status status = client_ptr_->Search(result, search_param);
G
groot 已提交
251 252 253
        if (result.row_num() == 0) {
            return status;
        }
K
kun yu 已提交
254

S
starlord 已提交
255
        // step 4: convert result array
256 257 258 259 260 261 262 263 264 265 266
        topk_query_result.reserve(result.row_num());
        int64_t nq = result.row_num();
        int64_t topk = result.ids().size() / nq;
        for (int64_t i = 0; i < result.row_num(); i++) {
            milvus::QueryResult one_result;
            one_result.ids.resize(topk);
            one_result.distances.resize(topk);
            memcpy(one_result.ids.data(), result.ids().data() + topk * i, topk * sizeof(int64_t));
            memcpy(one_result.distances.data(), result.distances().data() + topk * i, topk * sizeof(float));
            topk_query_result.emplace_back(one_result);
        }
K
kun yu 已提交
267

K
kun yu 已提交
268
        return status;
S
starlord 已提交
269
    } catch (std::exception& ex) {
K
kun yu 已提交
270
        return Status(StatusCode::UnknownError, "fail to search vectors: " + std::string(ex.what()));
K
kun yu 已提交
271 272 273 274
    }
}

Status
S
starlord 已提交
275
ClientProxy::DescribeTable(const std::string& table_name, TableSchema& table_schema) {
K
kun yu 已提交
276 277 278
    try {
        ::milvus::grpc::TableSchema grpc_schema;

K
kun yu 已提交
279
        Status status = client_ptr_->DescribeTable(grpc_schema, table_name);
K
kun yu 已提交
280

281
        table_schema.table_name = grpc_schema.table_name();
K
kun yu 已提交
282
        table_schema.dimension = grpc_schema.dimension();
283
        table_schema.index_file_size = grpc_schema.index_file_size();
S
starlord 已提交
284
        table_schema.metric_type = static_cast<MetricType>(grpc_schema.metric_type());
K
kun yu 已提交
285 286

        return status;
S
starlord 已提交
287
    } catch (std::exception& ex) {
K
kun yu 已提交
288
        return Status(StatusCode::UnknownError, "fail to describe table: " + std::string(ex.what()));
K
kun yu 已提交
289 290 291 292
    }
}

Status
S
starlord 已提交
293
ClientProxy::CountTable(const std::string& table_name, int64_t& row_count) {
K
kun yu 已提交
294
    try {
K
kun yu 已提交
295
        Status status;
Y
Yu Kun 已提交
296
        row_count = client_ptr_->CountTable(table_name, status);
K
kun yu 已提交
297
        return status;
S
starlord 已提交
298
    } catch (std::exception& ex) {
K
kun yu 已提交
299
        return Status(StatusCode::UnknownError, "fail to show tables: " + std::string(ex.what()));
K
kun yu 已提交
300 301 302 303
    }
}

Status
S
starlord 已提交
304
ClientProxy::ShowTables(std::vector<std::string>& table_array) {
K
kun yu 已提交
305
    try {
306 307 308 309 310 311 312 313 314
        Status status;
        milvus::grpc::TableNameList table_name_list;
        status = client_ptr_->ShowTables(table_name_list);

        table_array.resize(table_name_list.table_names_size());
        for (uint64_t i = 0; i < table_name_list.table_names_size(); ++i) {
            table_array[i] = table_name_list.table_names(i);
        }
        return status;
S
starlord 已提交
315
    } catch (std::exception& ex) {
K
kun yu 已提交
316
        return Status(StatusCode::UnknownError, "fail to show tables: " + std::string(ex.what()));
K
kun yu 已提交
317 318 319 320 321
    }
}

std::string
ClientProxy::ServerVersion() const {
K
kun yu 已提交
322
    Status status = Status::OK();
K
kun yu 已提交
323 324
    try {
        std::string version;
Y
Yu Kun 已提交
325
        Status status = client_ptr_->Cmd(version, "version");
K
kun yu 已提交
326
        return version;
S
starlord 已提交
327
    } catch (std::exception& ex) {
K
kun yu 已提交
328 329 330 331 332 333
        return "";
    }
}

std::string
ClientProxy::ServerStatus() const {
K
kun yu 已提交
334
    if (channel_ == nullptr) {
K
kun yu 已提交
335 336 337 338 339
        return "not connected to server";
    }

    try {
        std::string dummy;
Y
yukun 已提交
340
        Status status = client_ptr_->Cmd(dummy, "");
K
kun yu 已提交
341
        return "server alive";
S
starlord 已提交
342
    } catch (std::exception& ex) {
K
kun yu 已提交
343 344 345 346
        return "connection lost";
    }
}

Y
Yu Kun 已提交
347 348 349 350 351 352 353 354 355 356
std::string
ClientProxy::DumpTaskTables() const {
    if (channel_ == nullptr) {
        return "not connected to server";
    }

    try {
        std::string dummy;
        Status status = client_ptr_->Cmd(dummy, "tasktable");
        return dummy;
S
starlord 已提交
357
    } catch (std::exception& ex) {
Y
Yu Kun 已提交
358 359 360 361
        return "connection lost";
    }
}

Y
Yu Kun 已提交
362
Status
G
groot 已提交
363
ClientProxy::DeleteByDate(const std::string& table_name, const milvus::Range& range) {
364
    try {
G
groot 已提交
365
        ::milvus::grpc::DeleteByDateParam delete_by_range_param;
366 367 368
        delete_by_range_param.set_table_name(table_name);
        delete_by_range_param.mutable_range()->set_start_value(range.start_value);
        delete_by_range_param.mutable_range()->set_end_value(range.end_value);
G
groot 已提交
369
        return client_ptr_->DeleteByDate(delete_by_range_param);
S
starlord 已提交
370
    } catch (std::exception& ex) {
371 372
        return Status(StatusCode::UnknownError, "fail to delete by range: " + std::string(ex.what()));
    }
Y
Yu Kun 已提交
373 374 375
}

Status
S
starlord 已提交
376
ClientProxy::PreloadTable(const std::string& table_name) const {
Y
Yu Kun 已提交
377 378 379 380 381
    try {
        ::milvus::grpc::TableName grpc_table_name;
        grpc_table_name.set_table_name(table_name);
        Status status = client_ptr_->PreloadTable(grpc_table_name);
        return status;
S
starlord 已提交
382
    } catch (std::exception& ex) {
383
        return Status(StatusCode::UnknownError, "fail to preload tables: " + std::string(ex.what()));
Y
Yu Kun 已提交
384
    }
Y
Yu Kun 已提交
385 386
}

387
Status
S
starlord 已提交
388
ClientProxy::DescribeIndex(const std::string& table_name, IndexParam& index_param) const {
389 390 391 392 393
    try {
        ::milvus::grpc::TableName grpc_table_name;
        grpc_table_name.set_table_name(table_name);
        ::milvus::grpc::IndexParam grpc_index_param;
        Status status = client_ptr_->DescribeIndex(grpc_table_name, grpc_index_param);
S
starlord 已提交
394
        index_param.index_type = static_cast<IndexType>(grpc_index_param.mutable_index()->index_type());
395 396 397
        index_param.nlist = grpc_index_param.mutable_index()->nlist();

        return status;
S
starlord 已提交
398
    } catch (std::exception& ex) {
399 400
        return Status(StatusCode::UnknownError, "fail to describe index: " + std::string(ex.what()));
    }
Y
Yu Kun 已提交
401 402 403
}

Status
S
starlord 已提交
404
ClientProxy::DropIndex(const std::string& table_name) const {
405 406 407 408 409
    try {
        ::milvus::grpc::TableName grpc_table_name;
        grpc_table_name.set_table_name(table_name);
        Status status = client_ptr_->DropIndex(grpc_table_name);
        return status;
S
starlord 已提交
410
    } catch (std::exception& ex) {
411 412
        return Status(StatusCode::UnknownError, "fail to drop index: " + std::string(ex.what()));
    }
Y
Yu Kun 已提交
413 414
}

G
groot 已提交
415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461
Status
ClientProxy::CreatePartition(const PartitionParam& partition_param) {
    try {
        ::milvus::grpc::PartitionParam grpc_partition_param;
        grpc_partition_param.set_table_name(partition_param.table_name);
        grpc_partition_param.set_partition_name(partition_param.partition_name);
        grpc_partition_param.set_tag(partition_param.partition_tag);
        Status status = client_ptr_->CreatePartition(grpc_partition_param);
        return status;
    } catch (std::exception& ex) {
        return Status(StatusCode::UnknownError, "fail to create partition: " + std::string(ex.what()));
    }
}

Status
ClientProxy::ShowPartitions(const std::string& table_name, PartitionList& partition_array) const {
    try {
        ::milvus::grpc::TableName grpc_table_name;
        grpc_table_name.set_table_name(table_name);
        ::milvus::grpc::PartitionList grpc_partition_list;
        Status status = client_ptr_->ShowPartitions(grpc_table_name, grpc_partition_list);
        partition_array.resize(grpc_partition_list.partition_array_size());
        for (uint64_t i = 0; i < grpc_partition_list.partition_array_size(); ++i) {
            partition_array[i].table_name = grpc_partition_list.partition_array(i).table_name();
            partition_array[i].partition_name = grpc_partition_list.partition_array(i).partition_name();
            partition_array[i].partition_tag = grpc_partition_list.partition_array(i).tag();
        }
        return status;
    } catch (std::exception& ex) {
        return Status(StatusCode::UnknownError, "fail to show partitions: " + std::string(ex.what()));
    }
}

Status
ClientProxy::DropPartition(const PartitionParam& partition_param) {
    try {
        ::milvus::grpc::PartitionParam grpc_partition_param;
        grpc_partition_param.set_table_name(partition_param.table_name);
        grpc_partition_param.set_partition_name(partition_param.partition_name);
        grpc_partition_param.set_tag(partition_param.partition_tag);
        Status status = client_ptr_->DropPartition(grpc_partition_param);
        return status;
    } catch (std::exception& ex) {
        return Status(StatusCode::UnknownError, "fail to drop partition: " + std::string(ex.what()));
    }
}

462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481
Status
ClientProxy::GetConfig(const std::string& node_name, std::string& value) const {
    try {
        return client_ptr_->Cmd(value, "get_config " + node_name);
    } catch (std::exception& ex) {
        return Status(StatusCode::UnknownError, "Fail to get config: " + node_name);
    }
}

Status
ClientProxy::SetConfig(const std::string& node_name, const std::string& value) const {
    try {
        std::string dummy;
        return client_ptr_->Cmd(dummy, "set_config " + node_name + " " + value);
    } catch (std::exception& ex) {
        return Status(StatusCode::UnknownError, "Fail to set config: " + node_name);
    }
}


S
starlord 已提交
482
}  // namespace milvus