ClientProxy.cpp 17.0 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

Y
yukun 已提交
18 19
#include "grpc/ClientProxy.h"
#include "grpc-gen/gen-milvus/milvus.grpc.pb.h"
S
starlord 已提交
20 21 22

#include <memory>
#include <string>
S
starlord 已提交
23
#include <vector>
S
starlord 已提交
24

K
kun yu 已提交
25
//#define GRPC_MULTIPLE_THREAD;
Y
yukun 已提交
26
// SDK version string. The macro expands to a bare string literal (no trailing semicolon)
// so that it can be used inside any expression, not only as a complete statement.
#define MILVUS_SDK_VERSION "0.6.0"
K
kun yu 已提交
27 28

namespace milvus {
K
kun yu 已提交
29
// Checks that `uri` has the shape "<host>:<port>".
//
// The first ':' is treated as the separator. The text before it (host) and the text after
// it (port) must both be non-empty; no further validation of either part is performed.
//
// Returns true when the uri is acceptable, false otherwise.
bool
UriCheck(const std::string& uri) {
    const size_t separator = uri.find(':');
    if (separator == std::string::npos) {
        return false;
    }

    // Reject ":port" and "host:" - both would yield an unusable connection parameter.
    const bool has_host = separator > 0;
    const bool has_port = separator + 1 < uri.size();
    return has_host && has_port;
}

G
groot 已提交
35 36 37 38 39 40 41
// Copies the float components of `src` into the protobuf row record `target`.
//
// The target's vector_data field is resized to exactly src.data.size() elements and then
// overwritten, so any values it held before the call are discarded.
void
CopyRowRecord(::milvus::grpc::RowRecord* target, const RowRecord& src) {
    auto vector_data = target->mutable_vector_data();
    vector_data->Resize(static_cast<int>(src.data.size()), 0.0);

    // An empty container may hand out a null data() pointer, and passing a null pointer to
    // memcpy is undefined behaviour even when the byte count is zero.
    if (src.data.empty()) {
        return;
    }
    memcpy(vector_data->mutable_data(), src.data.data(), src.data.size() * sizeof(float));
}

K
kun yu 已提交
42
// Opens an insecure gRPC channel to "<ip_address>:<port>" taken from `param`.
//
// On success the channel is stored in channel_, a GrpcClient bound to it is stored in
// client_ptr_, connected_ is set and Status::OK() is returned. If no channel object is
// produced, connected_ is cleared and a NotConnected status is returned.
Status
ClientProxy::Connect(const ConnectParam& param) {
    const std::string server_address = param.ip_address + ":" + param.port;
    channel_ = ::grpc::CreateChannel(server_address, ::grpc::InsecureChannelCredentials());

    if (channel_ == nullptr) {
        connected_ = false;
        const std::string reason = "connect failed!";
        return Status(StatusCode::NotConnected, reason);
    }

    connected_ = true;
    client_ptr_ = std::make_shared<GrpcClient>(channel_);
    return Status::OK();
}

// Connects using a "<host>:<port>" uri string.
//
// The uri is validated with UriCheck() and split at its first ':'; the two halves become
// the ip_address and port of a ConnectParam that is forwarded to Connect(const ConnectParam&).
// Returns an InvalidAgument status when the uri does not pass UriCheck().
Status
ClientProxy::Connect(const std::string& uri) {
    if (!UriCheck(uri)) {
        return Status(StatusCode::InvalidAgument, "Invalid uri");
    }

    const size_t colon_pos = uri.find(':');

    ConnectParam connect_param;
    connect_param.port = uri.substr(colon_pos + 1);
    connect_param.ip_address = uri.substr(0, colon_pos);
    return Connect(connect_param);
}

// Reports whether the server can currently be reached.
//
// Returns a NotConnected status when no channel/client exists (Connect() was never called
// or Disconnect() already released the channel). Otherwise an empty command is sent to the
// server and its status is returned; an exception raised by that call is converted into a
// NotConnected status carrying the exception text.
Status
ClientProxy::Connected() const {
    // Without this guard the call below would dereference a null client pointer, which is
    // not an exception and therefore would not be handled by the catch clause.
    if (channel_ == nullptr || client_ptr_ == nullptr) {
        return Status(StatusCode::NotConnected, "not connected to server");
    }

    try {
        std::string info;
        return client_ptr_->Cmd(info, "");
    } catch (const std::exception& ex) {
        return Status(StatusCode::NotConnected, "connection lost: " + std::string(ex.what()));
    }
}

// Closes the connection to the server.
//
// Asks the client to disconnect, then clears connected_ and releases channel_. When no
// client was ever created there is nothing to tear down: the local state is cleared and
// Status::OK() is returned. An exception raised while disconnecting is converted into an
// UnknownError status; in that case connected_ and channel_ are left untouched.
Status
ClientProxy::Disconnect() {
    // Guard against Disconnect() being called before any successful Connect(); the client
    // pointer would be null and dereferencing it is not recoverable via the catch below.
    if (client_ptr_ == nullptr) {
        connected_ = false;
        channel_.reset();
        return Status::OK();
    }

    try {
        Status status = client_ptr_->Disconnect();
        connected_ = false;
        channel_.reset();
        return status;
    } catch (const std::exception& ex) {
        return Status(StatusCode::UnknownError, "failed to disconnect: " + std::string(ex.what()));
    }
}

std::string
ClientProxy::ClientVersion() const {
Y
yukun 已提交
96
    return MILVUS_SDK_VERSION;
K
kun yu 已提交
97 98 99
}

// Creates a table on the server.
//
// Copies table_name, dimension, index_file_size and metric_type from `param` into a gRPC
// TableSchema message and forwards it to the client. Returns the status of that call, or
// an UnknownError status carrying the exception text if the call throws.
Status
ClientProxy::CreateTable(const TableSchema& param) {
    try {
        ::milvus::grpc::TableSchema grpc_schema;
        grpc_schema.set_table_name(param.table_name);
        grpc_schema.set_dimension(param.dimension);
        grpc_schema.set_index_file_size(param.index_file_size);

        const auto metric_type = static_cast<int32_t>(param.metric_type);
        grpc_schema.set_metric_type(metric_type);

        return client_ptr_->CreateTable(grpc_schema);
    } catch (const std::exception& ex) {
        const std::string detail = ex.what();
        return Status(StatusCode::UnknownError, "failed to create table: " + detail);
    }
}

bool
S
starlord 已提交
115
ClientProxy::HasTable(const std::string& table_name) {
K
kun yu 已提交
116
    Status status = Status::OK();
K
kun yu 已提交
117 118
    ::milvus::grpc::TableName grpc_table_name;
    grpc_table_name.set_table_name(table_name);
K
kun yu 已提交
119 120
    bool result = client_ptr_->HasTable(grpc_table_name, status);
    return result;
K
kun yu 已提交
121 122 123
}

// Drops the table named `table_name`.
//
// Returns the status reported by the client, or an UnknownError status carrying the
// exception text if the call throws.
Status
ClientProxy::DropTable(const std::string& table_name) {
    try {
        ::milvus::grpc::TableName request;
        request.set_table_name(table_name);

        Status drop_status = client_ptr_->DropTable(request);
        return drop_status;
    } catch (const std::exception& ex) {
        const std::string detail = ex.what();
        return Status(StatusCode::UnknownError, "failed to drop table: " + detail);
    }
}

// Requests an index build for a table.
//
// Copies table_name, index_type and nlist from `index_param` into a gRPC IndexParam
// message and forwards it to the client. Returns the status of that call, or an
// UnknownError status carrying the exception text if the call throws.
Status
ClientProxy::CreateIndex(const IndexParam& index_param) {
    try {
        ::milvus::grpc::IndexParam request;
        request.set_table_name(index_param.table_name);

        // Both index settings live in the nested index message.
        auto* grpc_index = request.mutable_index();
        grpc_index->set_index_type(static_cast<int32_t>(index_param.index_type));
        grpc_index->set_nlist(index_param.nlist);

        return client_ptr_->CreateIndex(request);
    } catch (const std::exception& ex) {
        const std::string detail = ex.what();
        return Status(StatusCode::UnknownError, "failed to build index: " + detail);
    }
}

K
kun yu 已提交
147
// Inserts `record_array` into `table_name` under `partition_tag`.
//
// `id_array` is both input and output:
//   - non-empty on entry: its values are sent to the server as the row ids;
//   - empty on entry: the ids returned by the server are appended to it.
//
// Returns the status filled in by the client, or an UnknownError status carrying the
// exception text if anything in the body throws.
Status
ClientProxy::Insert(const std::string& table_name, const std::string& partition_tag,
                    const std::vector<RowRecord>& record_array, std::vector<int64_t>& id_array) {
    Status status = Status::OK();
    try {
////////////////////////////////////////////////////////////////////////////
// NOTE(review): the branch below is compiled only when GRPC_MULTIPLE_THREAD is defined. It
// does not forward partition_tag and it skips the trailing record_array.size() % thread_count
// records - confirm it is still wanted before enabling it.
#ifdef GRPC_MULTIPLE_THREAD
        std::vector<std::thread> threads;
        int thread_count = 10;

        std::shared_ptr<::milvus::grpc::InsertInfos> insert_info_array(
            new ::milvus::grpc::InsertInfos[thread_count], std::default_delete<::milvus::grpc::InsertInfos[]>());

        std::shared_ptr<::milvus::grpc::VectorIds> vector_ids_array(new ::milvus::grpc::VectorIds[thread_count],
                                                                    std::default_delete<::milvus::grpc::VectorIds[]>());

        int64_t record_count = record_array.size() / thread_count;

        for (size_t i = 0; i < thread_count; i++) {
            insert_info_array.get()[i].set_table_name(table_name);
            for (size_t j = i * record_count; j < record_count * (i + 1); j++) {
                ::milvus::grpc::RowRecord* grpc_record = insert_info_array.get()[i].add_row_record_array();
                for (size_t k = 0; k < record_array[j].data.size(); k++) {
                    grpc_record->add_vector_data(record_array[j].data[k]);
                }
            }
        }

        std::cout << "*****************************************************\n";
        auto start = std::chrono::high_resolution_clock::now();
        for (size_t j = 0; j < thread_count; j++) {
            threads.push_back(std::thread(&GrpcClient::InsertVector, client_ptr_, std::ref(vector_ids_array.get()[j]),
                                          std::ref(insert_info_array.get()[j]), std::ref(status)));
        }
        std::for_each(threads.begin(), threads.end(), std::mem_fn(&std::thread::join));
        auto finish = std::chrono::high_resolution_clock::now();
        std::cout << "InsertVector cost: "
                  << std::chrono::duration_cast<std::chrono::duration<double>>(finish - start).count() << "s\n";
        std::cout << "*****************************************************\n";

        for (size_t i = 0; i < thread_count; i++) {
            for (size_t j = 0; j < vector_ids_array.get()[i].vector_id_array_size(); j++) {
                id_array.push_back(vector_ids_array.get()[i].vector_id_array(j));
            }
        }
#else
        // Single thread: build one request holding every record.
        ::milvus::grpc::InsertParam request;
        request.set_table_name(table_name);
        request.set_partition_tag(partition_tag);
        for (const auto& record : record_array) {
            CopyRowRecord(request.add_row_record_array(), record);
        }

        // Decide up front which direction the ids flow; id_array is modified further down.
        const bool caller_supplied_ids = !id_array.empty();
        if (caller_supplied_ids) {
            /* set user's ids */
            auto row_ids = request.mutable_row_id_array();
            row_ids->Resize(static_cast<int>(id_array.size()), -1);
            memcpy(row_ids->mutable_data(), id_array.data(), id_array.size() * sizeof(int64_t));
        }

        ::milvus::grpc::VectorIds server_ids;
        client_ptr_->Insert(server_ids, request, status);

        if (!caller_supplied_ids) {
            /* return Milvus generated ids back to user */
            const auto& generated_ids = server_ids.vector_id_array();
            id_array.insert(id_array.end(), generated_ids.begin(), generated_ids.end());
        }
#endif
    } catch (const std::exception& ex) {
        const std::string detail = ex.what();
        return Status(StatusCode::UnknownError, "fail to add vector: " + detail);
    }

    return status;
}

// Runs a vector search against `table_name`.
//
// Parameters:
//   table_name         - table to search.
//   partition_tags     - partition tags forwarded to the server.
//   query_record_array - query vectors, one result row is expected per entry.
//   query_range_array  - ranges forwarded to the server as start/end value pairs.
//   topk, nprobe       - search settings forwarded to the server.
//   topk_query_result  - output; one QueryResult per returned row is appended to it.
//
// The server replies with flat id and distance arrays. The number of entries per row is
// derived as ids.size() / row_num, and each row is sliced out of the flat arrays.
//
// Returns the status reported by the client. A reply with no rows is returned as-is without
// touching the output. A reply whose distance array is too short for the derived layout is
// rejected with an UnknownError status, as is any exception thrown inside the body.
Status
ClientProxy::Search(const std::string& table_name, const std::vector<std::string>& partition_tags,
                    const std::vector<RowRecord>& query_record_array, const std::vector<Range>& query_range_array,
                    int64_t topk, int64_t nprobe, TopKQueryResult& topk_query_result) {
    try {
        // step 1: convert vectors data
        ::milvus::grpc::SearchParam search_param;
        search_param.set_table_name(table_name);
        search_param.set_topk(topk);
        search_param.set_nprobe(nprobe);
        for (const auto& tag : partition_tags) {
            search_param.add_partition_tag_array(tag);
        }
        for (const auto& record : query_record_array) {
            ::milvus::grpc::RowRecord* row_record = search_param.add_query_record_array();
            CopyRowRecord(row_record, record);
        }

        // step 2: convert range array
        for (const auto& range : query_range_array) {
            ::milvus::grpc::Range* grpc_range = search_param.add_query_range_array();
            grpc_range->set_start_value(range.start_value);
            grpc_range->set_end_value(range.end_value);
        }

        // step 3: search vectors
        ::milvus::grpc::TopKQueryResult result;
        Status status = client_ptr_->Search(result, search_param);

        // step 4: convert result array
        const int64_t nq = result.row_num();
        if (nq <= 0) {
            // Nothing to convert; also avoids dividing by zero below.
            return status;
        }

        // Entries per row as actually returned by the server. Named differently from the
        // `topk` parameter so the request value is not shadowed.
        const int64_t result_topk = result.ids().size() / nq;
        if (result.distances().size() < nq * result_topk) {
            // Slicing would read past the end of the distance array.
            return Status(StatusCode::UnknownError,
                          "fail to search vectors: result distances are fewer than result ids");
        }

        topk_query_result.reserve(nq);
        for (int64_t i = 0; i < nq; i++) {
            milvus::QueryResult one_result;
            one_result.ids.resize(result_topk);
            one_result.distances.resize(result_topk);
            if (result_topk > 0) {
                // Skipped for empty rows: data() of an empty container may be null, which
                // memcpy must not receive even for a zero-byte copy.
                memcpy(one_result.ids.data(), result.ids().data() + result_topk * i, result_topk * sizeof(int64_t));
                memcpy(one_result.distances.data(), result.distances().data() + result_topk * i,
                       result_topk * sizeof(float));
            }
            topk_query_result.emplace_back(one_result);
        }

        return status;
    } catch (const std::exception& ex) {
        return Status(StatusCode::UnknownError, "fail to search vectors: " + std::string(ex.what()));
    }
}

// Fetches the schema of `table_name` from the server.
//
// `table_schema` is overwritten with the table_name, dimension, index_file_size and
// metric_type found in the gRPC reply, regardless of the returned status. Returns the
// status reported by the client, or an UnknownError status carrying the exception text if
// the call throws.
Status
ClientProxy::DescribeTable(const std::string& table_name, TableSchema& table_schema) {
    try {
        ::milvus::grpc::TableSchema reply;
        Status describe_status = client_ptr_->DescribeTable(reply, table_name);

        table_schema.table_name = reply.table_name();
        table_schema.dimension = reply.dimension();
        table_schema.index_file_size = reply.index_file_size();

        const auto metric_type = static_cast<MetricType>(reply.metric_type());
        table_schema.metric_type = metric_type;

        return describe_status;
    } catch (const std::exception& ex) {
        const std::string detail = ex.what();
        return Status(StatusCode::UnknownError, "fail to describe table: " + detail);
    }
}

// Retrieves the number of rows in `table_name`.
//
// `row_count` receives the value returned by the client. Returns the status filled in by
// the client, or an UnknownError status carrying the exception text if the call throws.
Status
ClientProxy::CountTable(const std::string& table_name, int64_t& row_count) {
    try {
        Status status;
        row_count = client_ptr_->CountTable(table_name, status);
        return status;
    } catch (const std::exception& ex) {
        // The message previously read "fail to show tables", copied from ShowTables().
        return Status(StatusCode::UnknownError, "fail to count table: " + std::string(ex.what()));
    }
}

// Lists the tables known to the server.
//
// `table_array` is resized to the number of names in the reply and filled with them, in
// reply order. Returns the status reported by the client, or an UnknownError status
// carrying the exception text if the call throws.
Status
ClientProxy::ShowTables(std::vector<std::string>& table_array) {
    try {
        milvus::grpc::TableNameList reply;
        Status show_status;
        show_status = client_ptr_->ShowTables(reply);

        const int table_count = reply.table_names_size();
        table_array.resize(table_count);
        for (int idx = 0; idx < table_count; ++idx) {
            table_array[idx] = reply.table_names(idx);
        }

        return show_status;
    } catch (const std::exception& ex) {
        const std::string detail = ex.what();
        return Status(StatusCode::UnknownError, "fail to show tables: " + detail);
    }
}

std::string
ClientProxy::ServerVersion() const {
K
kun yu 已提交
319
    Status status = Status::OK();
K
kun yu 已提交
320 321
    try {
        std::string version;
Y
Yu Kun 已提交
322
        Status status = client_ptr_->Cmd(version, "version");
K
kun yu 已提交
323
        return version;
S
starlord 已提交
324
    } catch (std::exception& ex) {
K
kun yu 已提交
325 326 327 328 329 330
        return "";
    }
}

std::string
ClientProxy::ServerStatus() const {
K
kun yu 已提交
331
    if (channel_ == nullptr) {
K
kun yu 已提交
332 333 334 335 336
        return "not connected to server";
    }

    try {
        std::string dummy;
Y
yukun 已提交
337
        Status status = client_ptr_->Cmd(dummy, "");
K
kun yu 已提交
338
        return "server alive";
S
starlord 已提交
339
    } catch (std::exception& ex) {
K
kun yu 已提交
340 341 342 343
        return "connection lost";
    }
}

Y
Yu Kun 已提交
344 345 346 347 348 349 350 351 352 353
// Returns the text the server sends back for the "tasktable" command.
//
// Returns "not connected to server" when no channel exists and "connection lost" when the
// command throws.
std::string
ClientProxy::DumpTaskTables() const {
    const bool has_channel = (channel_ != nullptr);
    if (!has_channel) {
        return "not connected to server";
    }

    std::string task_tables;
    try {
        Status cmd_status = client_ptr_->Cmd(task_tables, "tasktable");
    } catch (const std::exception&) {
        return "connection lost";
    }
    return task_tables;
}

Y
Yu Kun 已提交
359
// Deletes data of `table_name` that falls inside `range`.
//
// The range's start_value and end_value are copied into a gRPC DeleteByDateParam message
// that is forwarded to the client. Returns the status of that call, or an UnknownError
// status carrying the exception text if the call throws.
Status
ClientProxy::DeleteByDate(const std::string& table_name, const milvus::Range& range) {
    try {
        ::milvus::grpc::DeleteByDateParam request;
        request.set_table_name(table_name);

        auto* grpc_range = request.mutable_range();
        grpc_range->set_start_value(range.start_value);
        grpc_range->set_end_value(range.end_value);

        return client_ptr_->DeleteByDate(request);
    } catch (const std::exception& ex) {
        const std::string detail = ex.what();
        return Status(StatusCode::UnknownError, "fail to delete by range: " + detail);
    }
}

// Sends a preload request for `table_name` to the server.
//
// Returns the status reported by the client, or an UnknownError status carrying the
// exception text if the call throws.
Status
ClientProxy::PreloadTable(const std::string& table_name) const {
    try {
        ::milvus::grpc::TableName request;
        request.set_table_name(table_name);
        return client_ptr_->PreloadTable(request);
    } catch (const std::exception& ex) {
        const std::string detail = ex.what();
        return Status(StatusCode::UnknownError, "fail to preload tables: " + detail);
    }
}

384
// Fetches the index settings of `table_name` from the server.
//
// `index_param.index_type` and `index_param.nlist` are overwritten with the values found in
// the gRPC reply, regardless of the returned status; other members of `index_param` are not
// touched. Returns the status reported by the client, or an UnknownError status carrying
// the exception text if the call throws.
Status
ClientProxy::DescribeIndex(const std::string& table_name, IndexParam& index_param) const {
    try {
        ::milvus::grpc::TableName request;
        request.set_table_name(table_name);

        ::milvus::grpc::IndexParam reply;
        Status describe_status = client_ptr_->DescribeIndex(request, reply);

        auto* grpc_index = reply.mutable_index();
        index_param.index_type = static_cast<IndexType>(grpc_index->index_type());
        index_param.nlist = grpc_index->nlist();

        return describe_status;
    } catch (const std::exception& ex) {
        const std::string detail = ex.what();
        return Status(StatusCode::UnknownError, "fail to describe index: " + detail);
    }
}

// Drops the index of `table_name`.
//
// Returns the status reported by the client, or an UnknownError status carrying the
// exception text if the call throws.
Status
ClientProxy::DropIndex(const std::string& table_name) const {
    try {
        ::milvus::grpc::TableName request;
        request.set_table_name(table_name);
        return client_ptr_->DropIndex(request);
    } catch (const std::exception& ex) {
        const std::string detail = ex.what();
        return Status(StatusCode::UnknownError, "fail to drop index: " + detail);
    }
}

G
groot 已提交
412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458
// Creates a partition.
//
// Copies table_name, partition_name and partition_tag from `partition_param` into a gRPC
// PartitionParam message and forwards it to the client. Returns the status of that call,
// or an UnknownError status carrying the exception text if the call throws.
Status
ClientProxy::CreatePartition(const PartitionParam& partition_param) {
    try {
        ::milvus::grpc::PartitionParam request;
        request.set_tag(partition_param.partition_tag);
        request.set_partition_name(partition_param.partition_name);
        request.set_table_name(partition_param.table_name);
        return client_ptr_->CreatePartition(request);
    } catch (const std::exception& ex) {
        const std::string detail = ex.what();
        return Status(StatusCode::UnknownError, "fail to create partition: " + detail);
    }
}

// Lists the partitions of `table_name`.
//
// `partition_array` is resized to the number of entries in the reply; each element receives
// the table_name, partition_name and tag of the matching reply entry. Returns the status
// reported by the client, or an UnknownError status carrying the exception text if the
// call throws.
Status
ClientProxy::ShowPartitions(const std::string& table_name, PartitionList& partition_array) const {
    try {
        ::milvus::grpc::TableName request;
        request.set_table_name(table_name);

        ::milvus::grpc::PartitionList reply;
        Status show_status = client_ptr_->ShowPartitions(request, reply);

        const int partition_count = reply.partition_array_size();
        partition_array.resize(partition_count);
        for (int idx = 0; idx < partition_count; ++idx) {
            const auto& grpc_partition = reply.partition_array(idx);
            auto& partition = partition_array[idx];
            partition.table_name = grpc_partition.table_name();
            partition.partition_name = grpc_partition.partition_name();
            partition.partition_tag = grpc_partition.tag();
        }

        return show_status;
    } catch (const std::exception& ex) {
        const std::string detail = ex.what();
        return Status(StatusCode::UnknownError, "fail to show partitions: " + detail);
    }
}

// Drops a partition.
//
// Copies table_name, partition_name and partition_tag from `partition_param` into a gRPC
// PartitionParam message and forwards it to the client. Returns the status of that call,
// or an UnknownError status carrying the exception text if the call throws.
Status
ClientProxy::DropPartition(const PartitionParam& partition_param) {
    try {
        ::milvus::grpc::PartitionParam request;
        request.set_tag(partition_param.partition_tag);
        request.set_partition_name(partition_param.partition_name);
        request.set_table_name(partition_param.table_name);
        return client_ptr_->DropPartition(request);
    } catch (const std::exception& ex) {
        const std::string detail = ex.what();
        return Status(StatusCode::UnknownError, "fail to drop partition: " + detail);
    }
}

S
starlord 已提交
459
}  // namespace milvus