提交 7a7a73e8 编写于 作者: X xige-16 提交者: yefu.chen

Fix high memory usage in pulsarTtStream

Signed-off-by: Nxige-16 <xi.ge@zilliz.com>
上级 15dd1748
......@@ -185,7 +185,6 @@ if ( BUILD_UNIT_TEST STREQUAL "ON" )
append_flags( CMAKE_CXX_FLAGS FLAGS "-DELPP_DISABLE_LOGS")
add_subdirectory(unittest)
add_subdirectory(bench)
endif ()
add_custom_target( Clean-All COMMAND ${CMAKE_BUILD_TOOL} clean )
......
include_directories(${CMAKE_HOME_DIRECTORY}/src)
include_directories(${CMAKE_HOME_DIRECTORY}/unittest)
include_directories(${CMAKE_HOME_DIRECTORY}/src/index/knowhere)
set(bench_srcs
bench_naive.cpp
bench_search.cpp
)
add_executable(all_bench ${bench_srcs})
target_link_libraries(all_bench
milvus_segcore
milvus_indexbuilder
log
pthread
)
target_link_libraries(all_bench benchmark::benchmark_main)
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <benchmark/benchmark.h>
#include <string>
static void
BN_Naive_StringCreation(benchmark::State& state) {
for (auto _ : state) std::string empty_string;
}
// Register the function as a benchmark
BENCHMARK(BN_Naive_StringCreation);
// Define another benchmark
static void
BN_Naive_StringCopy(benchmark::State& state) {
std::string x = "hello";
for (auto _ : state) std::string copy(x);
}
BENCHMARK(BN_Naive_StringCopy);
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <cstdint>
#include <benchmark/benchmark.h>
#include <string>
#include "segcore/SegmentGrowing.h"
#include "segcore/SegmentSealed.h"
#include "test_utils/DataGen.h"
using namespace milvus;
using namespace milvus::query;
using namespace milvus::segcore;
static int dim = 128;
static int64_t N = 1024 * 1024 * 1;
const auto schema = []() {
auto schema = std::make_shared<Schema>();
schema->AddDebugField("fakevec", DataType::VECTOR_FLOAT, dim, MetricType::METRIC_L2);
return schema;
}();
const auto dataset_ = [] {
auto dataset_ = DataGen(schema, N);
return dataset_;
}();
const auto plan = [] {
std::string dsl = R"({
"bool": {
"must": [
{
"vector": {
"fakevec": {
"metric_type": "L2",
"params": {
"nprobe": 4
},
"query": "$0",
"topk": 5
}
}
}
]
}
})";
auto plan = CreatePlan(*schema, dsl);
return plan;
}();
auto ph_group = [] {
auto num_queries = 5;
auto ph_group_raw = CreatePlaceholderGroup(num_queries, dim, 1024);
auto ph_group = ParsePlaceholderGroup(plan.get(), ph_group_raw.SerializeAsString());
return ph_group;
}();
static void
Search_SmallIndex(benchmark::State& state) {
// schema->AddDebugField("age", DataType::FLOAT);
auto is_small_index = state.range(0);
auto chunk_size = state.range(1) * 1024;
auto segment = CreateGrowingSegment(schema, chunk_size);
if (!is_small_index) {
segment->debug_disable_small_index();
}
segment->PreInsert(N);
ColumnBasedRawData raw_data;
raw_data.columns_ = dataset_.cols_;
raw_data.count = N;
segment->Insert(0, N, dataset_.row_ids_.data(), dataset_.timestamps_.data(), raw_data);
Timestamp time = 10000000;
std::vector<const PlaceholderGroup*> ph_group_arr = {ph_group.get()};
for (auto _ : state) {
auto qr = segment->Search(plan.get(), ph_group_arr.data(), &time, 1);
}
}
BENCHMARK(Search_SmallIndex)->MinTime(5)->ArgsProduct({{true, false}, {8, 16, 32, 64, 128}});
static void
Search_Sealed(benchmark::State& state) {
auto segment = CreateSealedSegment(schema);
SealedLoader(dataset_, *segment);
auto choice = state.range(0);
if (choice == 0) {
// Brute Force
} else if (choice == 1) {
// ivf
auto vec = (const float*)dataset_.cols_[0].data();
auto indexing = GenIndexing(N, dim, vec);
LoadIndexInfo info;
info.index = indexing;
info.index_params["index_type"] = "IVF";
info.index_params["index_mode"] = "CPU";
info.index_params["metric_type"] = MetricTypeToName(MetricType::METRIC_L2);
segment->LoadIndex(info);
}
Timestamp time = 10000000;
std::vector<const PlaceholderGroup*> ph_group_arr = {ph_group.get()};
for (auto _ : state) {
auto qr = segment->Search(plan.get(), ph_group_arr.data(), &time, 1);
}
}
BENCHMARK(Search_Sealed)->MinTime(5)->Arg(1)->Arg(0);
......@@ -5,5 +5,4 @@
*src/grpc*
*output*
*unittest*
*bench*
*src/pb*
......@@ -12,6 +12,5 @@ formatThis() {
formatThis "${CorePath}/src"
formatThis "${CorePath}/unittest"
formatThis "${CorePath}/bench"
${CorePath}/build-support/add_license.sh ${CorePath}/build-support/cpp_license.txt ${CorePath}
......@@ -50,9 +50,6 @@ class SegmentGrowing : public SegmentInternalInterface {
};
public:
virtual void
debug_disable_small_index() = 0;
virtual int64_t
PreInsert(int64_t size) = 0;
......
......@@ -188,10 +188,9 @@ SegmentGrowingImpl::do_insert(int64_t reserved_begin,
// NOTE: this must be the last step, cannot be put above
uid2offset_.insert(std::make_pair(row_id, reserved_begin + i));
}
record_.ack_responder_.AddSegment(reserved_begin, reserved_begin + size);
if (!debug_disable_small_index_) {
indexing_record_.UpdateResourceAck(record_.ack_responder_.GetAck() / size_per_chunk_, record_);
}
indexing_record_.UpdateResourceAck(record_.ack_responder_.GetAck() / size_per_chunk_, record_);
}
Status
......
......@@ -112,11 +112,6 @@ class SegmentGrowingImpl : public SegmentGrowing {
}
public:
void
debug_disable_small_index() override {
debug_disable_small_index_ = true;
}
ssize_t
get_row_count() const override {
return record_.ack_responder_.GetAck();
......@@ -209,9 +204,6 @@ class SegmentGrowingImpl : public SegmentGrowing {
SealedIndexingRecord sealed_indexing_record_;
tbb::concurrent_unordered_multimap<idx_t, int64_t> uid2offset_;
private:
bool debug_disable_small_index_ = false;
};
} // namespace milvus::segcore
......@@ -41,7 +41,6 @@ find_package( Threads REQUIRED )
# ****************************** Thirdparty googletest ***************************************
if ( MILVUS_BUILD_TESTS )
add_subdirectory( gtest )
add_subdirectory( google_benchmark)
endif()
......
include(FetchContent)
FetchContent_Declare(google_benchmark
URL https://github.com/google/benchmark/archive/v1.5.2.tar.gz
URL_MD5 084b34aceaeac11a6607d35220ca2efa
DOWNLOAD_DIR ${THIRDPARTY_DOWNLOAD_PATH}
SOURCE_DIR ${CMAKE_CURRENT_BINARY_DIR}/google_benchmark
BINARY_DIR ${CMAKE_CURRENT_BINARY_DIR}/google_benchmark
)
FetchContent_GetProperties( google_benchmark )
if ( NOT google_benchmark_POPULATED )
FetchContent_Populate( google_benchmark )
# Adding the following targets:
# gtest, gtest_main, gmock, gmock_main
message("gb=${google_benchmark_SOURCE_DIR}")
add_subdirectory( ${google_benchmark_SOURCE_DIR}
${google_benchmark_BINARY_DIR}
EXCLUDE_FROM_ALL )
endif()
......@@ -9,12 +9,12 @@ import (
"sync"
"time"
"github.com/zilliztech/milvus-distributed/internal/log"
"go.uber.org/zap"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/golang/protobuf/proto"
"go.uber.org/zap"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/log"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/util"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
......@@ -475,6 +475,7 @@ type PulsarTtMsgStream struct {
unsolvedBuf map[Consumer][]TsMsg
unsolvedMutex *sync.Mutex
lastTimeStamp Timestamp
syncConsumer chan int
}
func newPulsarTtMsgStream(ctx context.Context,
......@@ -487,11 +488,13 @@ func newPulsarTtMsgStream(ctx context.Context,
return nil, err
}
unsolvedBuf := make(map[Consumer][]TsMsg)
syncConsumer := make(chan int, 1)
return &PulsarTtMsgStream{
PulsarMsgStream: *pulsarMsgStream,
unsolvedBuf: unsolvedBuf,
unsolvedMutex: &sync.Mutex{},
syncConsumer: syncConsumer,
}, nil
}
......@@ -515,6 +518,9 @@ func (ms *PulsarTtMsgStream) AsConsumer(channels []string,
}
ms.consumerLock.Lock()
if len(ms.consumers) == 0 {
ms.syncConsumer <- 1
}
ms.consumers = append(ms.consumers, pc)
ms.unsolvedBuf[pc] = make([]TsMsg, 0)
ms.consumerChannels = append(ms.consumerChannels, channels[i])
......@@ -536,12 +542,37 @@ func (ms *PulsarTtMsgStream) Start() {
}
}
func (ms *PulsarTtMsgStream) Close() {
ms.streamCancel()
close(ms.syncConsumer)
ms.wait.Wait()
for _, producer := range ms.producers {
if producer != nil {
producer.Close()
}
}
for _, consumer := range ms.consumers {
if consumer != nil {
consumer.Close()
}
}
if ms.client != nil {
ms.client.Close()
}
}
func (ms *PulsarTtMsgStream) bufMsgPackToChannel() {
defer ms.wait.Done()
ms.unsolvedBuf = make(map[Consumer][]TsMsg)
isChannelReady := make(map[Consumer]bool)
eofMsgTimeStamp := make(map[Consumer]Timestamp)
if _, ok := <-ms.syncConsumer; !ok {
log.Debug("consumer closed!")
return
}
for {
select {
case <-ms.ctx.Done():
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册