ClientProxy.cpp 13.8 KB
Newer Older
K
kun yu 已提交
1 2 3 4 5 6
/*******************************************************************************
* Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
* Unauthorized copying of this file, via any medium is strictly prohibited.
* Proprietary and confidential.
******************************************************************************/
#include "ClientProxy.h"
K
kun yu 已提交
7
#include "version.h"
K
kun yu 已提交
8
#include "milvus.grpc.pb.h"
K
kun yu 已提交
9
// #define GRPC_MULTIPLE_THREAD  // uncomment to compile the multi-threaded branch of ClientProxy::Insert
K
kun yu 已提交
10 11 12

namespace milvus {

K
kun yu 已提交
13 14 15 16 17 18 19 20 21 22
// Reports whether `uri` contains at least one ':' character.
// Only the presence of the separator is tested; the text on either side of it
// is not validated here.
bool
UriCheck(const std::string &uri) {
    const bool has_separator = uri.find_first_of(':', 0) != std::string::npos;
    return has_separator;
}

K
kun yu 已提交
23 24 25 26 27 28 29
// Creates an insecure gRPC channel to "<ip_address>:<port>" and, when a channel
// object is obtained, wraps it in a new GrpcClient stored in client_ptr_.
//
// Returns Status::OK() when the channel object exists, otherwise a
// NotConnected status. connected_ is updated to match the outcome.
Status
ClientProxy::Connect(const ConnectParam &param) {
    const std::string server_address = param.ip_address + ":" + param.port;

    channel_ = ::grpc::CreateChannel(server_address, ::grpc::InsecureChannelCredentials());

    // Failure path first: no channel object means nothing to wrap.
    if (channel_ == nullptr) {
        const std::string reason = "connect failed!";
        connected_ = false;
        return Status(StatusCode::NotConnected, reason);
    }

    connected_ = true;
    client_ptr_ = std::make_shared<GrpcClient>(channel_);
    return Status::OK();
}

// Connects using a "host:port" style string.
//
// The string is split at its first ':'; everything before it becomes the
// address and everything after it becomes the port, which are then handed to
// Connect(const ConnectParam &). A string without ':' yields
// Status::Invalid("Invalid uri").
Status
ClientProxy::Connect(const std::string &uri) {
    const size_t separator_pos = uri.find_first_of(':', 0);
    if (separator_pos == std::string::npos) {
        return Status::Invalid("Invalid uri");
    }

    ConnectParam connect_param;
    connect_param.ip_address = uri.substr(0, separator_pos);
    connect_param.port = uri.substr(separator_pos + 1);

    return Connect(connect_param);
}

// Checks the connection by sending an empty command through the client.
//
// Returns:
//   - a NotConnected status when no client object exists yet (nothing has been
//     created by Connect()), instead of dereferencing a null pointer;
//   - otherwise whatever status GrpcClient::Cmd reports;
//   - a NotConnected status carrying the exception text if the call throws.
Status
ClientProxy::Connected() const {
    if (client_ptr_ == nullptr) {
        return Status(StatusCode::NotConnected, std::string("not connected to server"));
    }

    try {
        std::string info;
        return client_ptr_->Cmd(info, "");
    } catch (const std::exception &ex) {
        return Status(StatusCode::NotConnected, "connection lost: " + std::string(ex.what()));
    }
}

// Asks the client to disconnect, then clears the local connection state
// (connected_ flag and channel_).
//
// Returns:
//   - a NotConnected status when no client object exists, instead of
//     dereferencing a null pointer;
//   - otherwise the status reported by GrpcClient::Disconnect;
//   - an UnknownError status carrying the exception text if the call throws
//     (in that case the local state is left untouched, as before).
Status
ClientProxy::Disconnect() {
    if (client_ptr_ == nullptr) {
        return Status(StatusCode::NotConnected, std::string("not connected to server"));
    }

    try {
        Status status = client_ptr_->Disconnect();
        connected_ = false;
        channel_.reset();
        return status;
    } catch (const std::exception &ex) {
        return Status(StatusCode::UnknownError, "failed to disconnect: " + std::string(ex.what()));
    }
}

std::string
ClientProxy::ClientVersion() const {
K
kun yu 已提交
77
    return MILVUS_VERSION;
K
kun yu 已提交
78 79 80 81 82 83
}

// Copies the SDK table schema (name, dimension, index file size, metric type)
// into its gRPC counterpart and forwards it to GrpcClient::CreateTable.
//
// Returns the client's status, or an UnknownError status carrying the
// exception text if anything throws.
Status
ClientProxy::CreateTable(const TableSchema &param) {
    try {
        ::milvus::grpc::TableSchema grpc_schema;
        grpc_schema.mutable_table_name()->set_table_name(param.table_name);
        grpc_schema.set_dimension(param.dimension);
        grpc_schema.set_index_file_size(param.index_file_size);
        grpc_schema.set_metric_type(static_cast<int32_t>(param.metric_type));

        Status create_status = client_ptr_->CreateTable(grpc_schema);
        return create_status;
    } catch (const std::exception &ex) {
        return Status(StatusCode::UnknownError, "failed to create table: " + std::string(ex.what()));
    }
}

bool
ClientProxy::HasTable(const std::string &table_name) {
K
kun yu 已提交
97
    Status status = Status::OK();
K
kun yu 已提交
98 99
    ::milvus::grpc::TableName grpc_table_name;
    grpc_table_name.set_table_name(table_name);
K
kun yu 已提交
100 101
    bool result = client_ptr_->HasTable(grpc_table_name, status);
    return result;
K
kun yu 已提交
102 103 104 105 106 107 108
}

// Wraps `table_name` in a gRPC TableName message and forwards it to
// GrpcClient::DropTable.
//
// Returns the client's status, or an UnknownError status carrying the
// exception text if anything throws.
Status
ClientProxy::DropTable(const std::string &table_name) {
    try {
        ::milvus::grpc::TableName request;
        request.set_table_name(table_name);

        Status drop_status = client_ptr_->DropTable(request);
        return drop_status;
    } catch (const std::exception &ex) {
        return Status(StatusCode::UnknownError, "failed to drop table: " + std::string(ex.what()));
    }
}

// Translates the SDK index description (table name, index type, nlist) into a
// gRPC IndexParam and forwards it to GrpcClient::CreateIndex.
//
// Returns the client's status, or an UnknownError status carrying the
// exception text if anything throws.
Status
ClientProxy::CreateIndex(const IndexParam &index_param) {
    try {
        //TODO:add index params
        ::milvus::grpc::IndexParam request;
        request.mutable_table_name()->set_table_name(index_param.table_name);

        // Fetch the nested index message once and fill both fields through it.
        auto *grpc_index = request.mutable_index();
        grpc_index->set_index_type(static_cast<int32_t>(index_param.index_type));
        grpc_index->set_nlist(index_param.nlist);

        Status index_status = client_ptr_->CreateIndex(request);
        return index_status;
    } catch (const std::exception &ex) {
        return Status(StatusCode::UnknownError, "failed to build index: " + std::string(ex.what()));
    }
}

K
kun yu 已提交
130
// Inserts `record_array` into `table_name`.
//
// id_array is both input and output:
//   - non-empty on entry: its values are sent along as the row ids and the
//     vector is left unchanged;
//   - empty on entry: the ids returned by the client are appended to it.
//
// Returns the status filled in by GrpcClient::Insert, or an UnknownError
// status carrying the exception text if anything throws.
Status
ClientProxy::Insert(const std::string &table_name,
                    const std::vector<RowRecord> &record_array,
                    std::vector<int64_t> &id_array) {
    Status status = Status::OK();
    try {
#ifdef GRPC_MULTIPLE_THREAD
        // Multi-threaded variant; only compiled when GRPC_MULTIPLE_THREAD is defined.
        // NOTE(review): record_count uses integer division, so records past
        // thread_count * record_count are not copied into any request.
        // NOTE(review): every thread receives a reference to the same `status`.
        std::vector<std::thread> threads;
        int thread_count = 10;

        std::shared_ptr<::milvus::grpc::InsertInfos> insert_info_array(
            new ::milvus::grpc::InsertInfos[thread_count],
            std::default_delete<::milvus::grpc::InsertInfos[]>() );

        std::shared_ptr<::milvus::grpc::VectorIds> vector_ids_array(
            new ::milvus::grpc::VectorIds[thread_count],
                std::default_delete<::milvus::grpc::VectorIds[]>() );

        int64_t record_count = record_array.size() / thread_count;

        for (size_t i = 0; i < thread_count; i++) {
            insert_info_array.get()[i].set_table_name(table_name);
            for (size_t j = i * record_count; j < record_count * (i + 1); j++) {
                ::milvus::grpc::RowRecord *grpc_record =
                    insert_info_array.get()[i].add_row_record_array();
                for (size_t k = 0; k < record_array[j].data.size(); k++) {
                    grpc_record->add_vector_data(record_array[j].data[k]);
                }
            }
        }

        std::cout << "*****************************************************\n";
        auto start = std::chrono::high_resolution_clock::now();
        for (size_t j = 0; j < thread_count; j++) {
            threads.push_back(
                    std::thread(&GrpcClient::InsertVector, client_ptr_,
                        std::ref(vector_ids_array.get()[j]), std::ref(insert_info_array.get()[j]),
                        std::ref(status)));
        }
        std::for_each(threads.begin(), threads.end(), std::mem_fn(&std::thread::join));
        auto finish = std::chrono::high_resolution_clock::now();
        std::cout << "InsertVector cost: " << std::chrono::duration_cast<std::chrono::duration<double>>(finish - start).count() << "s\n";
        std::cout << "*****************************************************\n";

        for (size_t i = 0; i < thread_count; i++) {
            for (size_t j = 0; j < vector_ids_array.get()[i].vector_id_array_size(); j++) {
                id_array.push_back(vector_ids_array.get()[i].vector_id_array(j));
            }
        }
#else
        // Single-threaded variant: one request carrying every record.
        ::milvus::grpc::InsertParam request;
        request.set_table_name(table_name);

        for (const auto &record : record_array) {
            ::milvus::grpc::RowRecord *grpc_record = request.add_row_record_array();
            for (const auto &value : record.data) {
                grpc_record->add_vector_data(value);
            }
        }

        // Decide up front which direction the ids flow in.
        const bool caller_supplied_ids = !id_array.empty();
        if (caller_supplied_ids) {
            for (const auto &row_id : id_array) {
                request.add_row_id_array(row_id);
            }
        }

        ::milvus::grpc::VectorIds vector_ids;
        client_ptr_->Insert(vector_ids, request, status);

        if (!caller_supplied_ids) {
            const int returned_id_count = vector_ids.vector_id_array_size();
            for (int i = 0; i < returned_id_count; ++i) {
                id_array.push_back(vector_ids.vector_id_array(i));
            }
        }
#endif
    } catch (const std::exception &ex) {
        return Status(StatusCode::UnknownError, "fail to add vector: " + std::string(ex.what()));
    }

    return status;
}

// Runs a top-k search against `table_name`.
//
// The query vectors and ranges are copied into a gRPC SearchParam together
// with `topk` and `nprobe`, the request is sent through GrpcClient::Search,
// and every returned (id, distance) pair is appended to
// `topk_query_result_array`, one TopKQueryResult per entry of the response.
//
// Returns the client's status (the response is converted regardless of that
// status), or an UnknownError status carrying the exception text if anything
// throws.
Status
ClientProxy::Search(const std::string &table_name,
                    const std::vector<RowRecord> &query_record_array,
                    const std::vector<Range> &query_range_array,
                    int64_t topk,
                    int64_t nprobe,
                    std::vector<TopKQueryResult> &topk_query_result_array) {
    try {
        // Build the request: scalar parameters first.
        ::milvus::grpc::SearchParam request;
        request.set_table_name(table_name);
        request.set_topk(topk);
        request.set_nprobe(nprobe);

        // Query vectors.
        for (const auto &record : query_record_array) {
            ::milvus::grpc::RowRecord *grpc_record = request.add_query_record_array();
            for (const auto &value : record.data) {
                grpc_record->add_vector_data(value);
            }
        }

        // Query ranges.
        for (const auto &range : query_range_array) {
            ::milvus::grpc::Range *grpc_range = request.add_query_range_array();
            grpc_range->set_start_value(range.start_value);
            grpc_range->set_end_value(range.end_value);
        }

        // Execute the search.
        ::milvus::grpc::TopKQueryResultList response;
        Status status = client_ptr_->Search(response, request);

        // Convert the response into SDK result objects.
        const int query_count = response.topk_query_result_size();
        for (int query_idx = 0; query_idx < query_count; ++query_idx) {
            const auto &grpc_topk = response.topk_query_result(query_idx);

            TopKQueryResult topk_result;
            const int hit_count = grpc_topk.query_result_arrays_size();
            for (int hit_idx = 0; hit_idx < hit_count; ++hit_idx) {
                const auto &grpc_hit = grpc_topk.query_result_arrays(hit_idx);

                QueryResult hit;
                hit.id = grpc_hit.id();
                hit.distance = grpc_hit.distance();
                topk_result.query_result_arrays.emplace_back(hit);
            }

            topk_query_result_array.emplace_back(topk_result);
        }
        return status;
    } catch (const std::exception &ex) {
        return Status(StatusCode::UnknownError, "fail to search vectors: " + std::string(ex.what()));
    }
}

// Fetches the schema of `table_name` through GrpcClient::DescribeTable and
// copies name, dimension, index file size and metric type into `table_schema`.
//
// The output fields are overwritten from the gRPC message whatever status the
// client reports. Returns that status, or an UnknownError status carrying the
// exception text if anything throws.
Status
ClientProxy::DescribeTable(const std::string &table_name, TableSchema &table_schema) {
    try {
        ::milvus::grpc::TableSchema grpc_schema;
        Status describe_status = client_ptr_->DescribeTable(grpc_schema, table_name);

        table_schema.table_name = grpc_schema.table_name().table_name();
        table_schema.dimension = grpc_schema.dimension();
        table_schema.index_file_size = grpc_schema.index_file_size();
        table_schema.metric_type = static_cast<MetricType>(grpc_schema.metric_type());

        return describe_status;
    } catch (const std::exception &ex) {
        return Status(StatusCode::UnknownError, "fail to describe table: " + std::string(ex.what()));
    }
}

// Queries the number of rows in `table_name` through GrpcClient::CountTable.
//
// `row_count` receives the value returned by the client. Returns the status
// the client filled in, or an UnknownError status carrying the exception text
// if anything throws.
Status
ClientProxy::CountTable(const std::string &table_name, int64_t &row_count) {
    try {
        Status status;
        row_count = client_ptr_->CountTable(table_name, status);
        return status;
    } catch (const std::exception &ex) {
        // The message previously read "fail to show tables", copied from ShowTables.
        return Status(StatusCode::UnknownError, "fail to count table: " + std::string(ex.what()));
    }
}

// Forwards to GrpcClient::ShowTables, passing `table_array` through for the
// client to fill.
//
// Returns the client's status, or an UnknownError status carrying the
// exception text if anything throws.
Status
ClientProxy::ShowTables(std::vector<std::string> &table_array) {
    try {
        Status show_status = client_ptr_->ShowTables(table_array);
        return show_status;
    } catch (const std::exception &ex) {
        return Status(StatusCode::UnknownError, "fail to show tables: " + std::string(ex.what()));
    }
}

std::string
ClientProxy::ServerVersion() const {
K
kun yu 已提交
307
    Status status = Status::OK();
K
kun yu 已提交
308 309
    try {
        std::string version;
Y
Yu Kun 已提交
310
        Status status = client_ptr_->Cmd(version, "version");
K
kun yu 已提交
311 312 313 314 315 316 317 318
        return version;
    } catch (std::exception &ex) {
        return "";
    }
}

std::string
ClientProxy::ServerStatus() const {
K
kun yu 已提交
319
    if (channel_ == nullptr) {
K
kun yu 已提交
320 321 322 323 324
        return "not connected to server";
    }

    try {
        std::string dummy;
Y
Yu Kun 已提交
325
        Status status = client_ptr_->Cmd(dummy, "");
K
kun yu 已提交
326 327 328 329 330 331
        return "server alive";
    } catch (std::exception &ex) {
        return "connection lost";
    }
}

Y
Yu Kun 已提交
332 333 334 335 336 337 338 339 340 341 342 343 344 345 346
// Sends the "tasktable" command through the client and returns the text it
// writes into the output string.
//
// Returns "not connected to server" when there is no channel and
// "connection lost" when the command throws. The status returned by Cmd is
// not inspected.
std::string
ClientProxy::DumpTaskTables() const {
    if (!channel_) {
        return "not connected to server";
    }

    try {
        std::string task_tables;
        Status cmd_status = client_ptr_->Cmd(task_tables, "tasktable");
        return task_tables;
    } catch (const std::exception &ex) {
        return "connection lost";
    }
}

Y
Yu Kun 已提交
347 348
// Builds a gRPC DeleteByRangeParam from `table_name` and the start/end values
// of `range`, then forwards it to GrpcClient::DeleteByRange.
//
// Returns the client's status, or an UnknownError status carrying the
// exception text if anything throws.
Status
ClientProxy::DeleteByRange(milvus::Range &range, const std::string &table_name) {
    try {
        ::milvus::grpc::DeleteByRangeParam request;
        request.set_table_name(table_name);

        // Fetch the nested range message once and fill both bounds through it.
        auto *grpc_range = request.mutable_range();
        grpc_range->set_start_value(range.start_value);
        grpc_range->set_end_value(range.end_value);

        Status delete_status = client_ptr_->DeleteByRange(request);
        return delete_status;
    } catch (const std::exception &ex) {
        return Status(StatusCode::UnknownError, "fail to delete by range: " + std::string(ex.what()));
    }
}

// Wraps `table_name` in a gRPC TableName message and forwards it to
// GrpcClient::PreloadTable.
//
// Returns the client's status, or an UnknownError status carrying the
// exception text if anything throws.
Status
ClientProxy::PreloadTable(const std::string &table_name) const {
    try {
        ::milvus::grpc::TableName request;
        request.set_table_name(table_name);
        return client_ptr_->PreloadTable(request);
    } catch (const std::exception &ex) {
        return Status(StatusCode::UnknownError, "fail to preload tables: " + std::string(ex.what()));
    }
}

372 373 374 375 376 377 378 379 380 381 382
// Fetches the index description of `table_name` through
// GrpcClient::DescribeIndex and copies the index type and nlist into
// `index_param`.
//
// Both output fields are overwritten from the gRPC message whatever status the
// client reports. Returns that status, or an UnknownError status carrying the
// exception text if anything throws.
Status
ClientProxy::DescribeIndex(const std::string &table_name, IndexParam &index_param) const {
    try {
        ::milvus::grpc::TableName request;
        request.set_table_name(table_name);

        ::milvus::grpc::IndexParam response;
        Status describe_status = client_ptr_->DescribeIndex(request, response);

        // Read both fields through a single handle to the nested index message.
        auto *grpc_index = response.mutable_index();
        index_param.index_type = static_cast<IndexType>(grpc_index->index_type());
        index_param.nlist = grpc_index->nlist();

        return describe_status;
    } catch (const std::exception &ex) {
        return Status(StatusCode::UnknownError, "fail to describe index: " + std::string(ex.what()));
    }
}

// Wraps `table_name` in a gRPC TableName message and forwards it to
// GrpcClient::DropIndex.
//
// Returns the client's status, or an UnknownError status carrying the
// exception text if anything throws.
Status
ClientProxy::DropIndex(const std::string &table_name) const {
    try {
        ::milvus::grpc::TableName request;
        request.set_table_name(table_name);
        return client_ptr_->DropIndex(request);
    } catch (const std::exception &ex) {
        return Status(StatusCode::UnknownError, "fail to drop index: " + std::string(ex.what()));
    }
}

K
kun yu 已提交
401
}