未验证 提交 255e3959 编写于 作者: F FluorineDog 提交者: GitHub

support time travel (#5894)

* support time travel
Signed-off-by: Nfluorinedog <fluorinedog@gmail.com>

* lint
Signed-off-by: Nfluorinedog <fluorinedog@gmail.com>
上级 3cc0ab13
......@@ -18,11 +18,14 @@
#include <memory>
#include <vector>
#include <utility>
#include <limits>
#include <NamedType/named_type.hpp>
#include "pb/schema.pb.h"
namespace milvus {
using Timestamp = uint64_t; // TODO: use TiKV-like timestamp
constexpr auto MAX_TIMESTAMP = std::numeric_limits<Timestamp>::max();
using engine::DataType;
using engine::FieldElementType;
using engine::idx_t;
......
此差异已折叠。
......@@ -48,7 +48,7 @@ struct TableStruct_segcore_2eproto {
PROTOBUF_SECTION_VARIABLE(protodesc_cold);
static const ::PROTOBUF_NAMESPACE_ID::internal::AuxillaryParseTableField aux[]
PROTOBUF_SECTION_VARIABLE(protodesc_cold);
static const ::PROTOBUF_NAMESPACE_ID::internal::ParseTable schema[2]
static const ::PROTOBUF_NAMESPACE_ID::internal::ParseTable schema[4]
PROTOBUF_SECTION_VARIABLE(protodesc_cold);
static const ::PROTOBUF_NAMESPACE_ID::internal::FieldMetadata field_metadata[];
static const ::PROTOBUF_NAMESPACE_ID::internal::SerializationTable serialization_table[];
......@@ -58,6 +58,12 @@ extern const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table
namespace milvus {
namespace proto {
namespace segcore {
class LoadFieldMeta;
class LoadFieldMetaDefaultTypeInternal;
extern LoadFieldMetaDefaultTypeInternal _LoadFieldMeta_default_instance_;
class LoadSegmentMeta;
class LoadSegmentMetaDefaultTypeInternal;
extern LoadSegmentMetaDefaultTypeInternal _LoadSegmentMeta_default_instance_;
class RetrieveRequest;
class RetrieveRequestDefaultTypeInternal;
extern RetrieveRequestDefaultTypeInternal _RetrieveRequest_default_instance_;
......@@ -68,6 +74,8 @@ extern RetrieveResultsDefaultTypeInternal _RetrieveResults_default_instance_;
} // namespace proto
} // namespace milvus
PROTOBUF_NAMESPACE_OPEN
template<> ::milvus::proto::segcore::LoadFieldMeta* Arena::CreateMaybeMessage<::milvus::proto::segcore::LoadFieldMeta>(Arena*);
template<> ::milvus::proto::segcore::LoadSegmentMeta* Arena::CreateMaybeMessage<::milvus::proto::segcore::LoadSegmentMeta>(Arena*);
template<> ::milvus::proto::segcore::RetrieveRequest* Arena::CreateMaybeMessage<::milvus::proto::segcore::RetrieveRequest>(Arena*);
template<> ::milvus::proto::segcore::RetrieveResults* Arena::CreateMaybeMessage<::milvus::proto::segcore::RetrieveResults>(Arena*);
PROTOBUF_NAMESPACE_CLOSE
......@@ -375,6 +383,295 @@ class RetrieveResults :
mutable ::PROTOBUF_NAMESPACE_ID::internal::CachedSize _cached_size_;
friend struct ::TableStruct_segcore_2eproto;
};
// -------------------------------------------------------------------
class LoadFieldMeta :
public ::PROTOBUF_NAMESPACE_ID::Message /* @@protoc_insertion_point(class_definition:milvus.proto.segcore.LoadFieldMeta) */ {
public:
LoadFieldMeta();
virtual ~LoadFieldMeta();
LoadFieldMeta(const LoadFieldMeta& from);
LoadFieldMeta(LoadFieldMeta&& from) noexcept
: LoadFieldMeta() {
*this = ::std::move(from);
}
inline LoadFieldMeta& operator=(const LoadFieldMeta& from) {
CopyFrom(from);
return *this;
}
inline LoadFieldMeta& operator=(LoadFieldMeta&& from) noexcept {
if (GetArenaNoVirtual() == from.GetArenaNoVirtual()) {
if (this != &from) InternalSwap(&from);
} else {
CopyFrom(from);
}
return *this;
}
static const ::PROTOBUF_NAMESPACE_ID::Descriptor* descriptor() {
return GetDescriptor();
}
static const ::PROTOBUF_NAMESPACE_ID::Descriptor* GetDescriptor() {
return GetMetadataStatic().descriptor;
}
static const ::PROTOBUF_NAMESPACE_ID::Reflection* GetReflection() {
return GetMetadataStatic().reflection;
}
static const LoadFieldMeta& default_instance();
static void InitAsDefaultInstance(); // FOR INTERNAL USE ONLY
static inline const LoadFieldMeta* internal_default_instance() {
return reinterpret_cast<const LoadFieldMeta*>(
&_LoadFieldMeta_default_instance_);
}
static constexpr int kIndexInFileMessages =
2;
friend void swap(LoadFieldMeta& a, LoadFieldMeta& b) {
a.Swap(&b);
}
inline void Swap(LoadFieldMeta* other) {
if (other == this) return;
InternalSwap(other);
}
// implements Message ----------------------------------------------
inline LoadFieldMeta* New() const final {
return CreateMaybeMessage<LoadFieldMeta>(nullptr);
}
LoadFieldMeta* New(::PROTOBUF_NAMESPACE_ID::Arena* arena) const final {
return CreateMaybeMessage<LoadFieldMeta>(arena);
}
void CopyFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) final;
void MergeFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) final;
void CopyFrom(const LoadFieldMeta& from);
void MergeFrom(const LoadFieldMeta& from);
PROTOBUF_ATTRIBUTE_REINITIALIZES void Clear() final;
bool IsInitialized() const final;
size_t ByteSizeLong() const final;
#if GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER
const char* _InternalParse(const char* ptr, ::PROTOBUF_NAMESPACE_ID::internal::ParseContext* ctx) final;
#else
bool MergePartialFromCodedStream(
::PROTOBUF_NAMESPACE_ID::io::CodedInputStream* input) final;
#endif // GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER
void SerializeWithCachedSizes(
::PROTOBUF_NAMESPACE_ID::io::CodedOutputStream* output) const final;
::PROTOBUF_NAMESPACE_ID::uint8* InternalSerializeWithCachedSizesToArray(
::PROTOBUF_NAMESPACE_ID::uint8* target) const final;
int GetCachedSize() const final { return _cached_size_.Get(); }
private:
inline void SharedCtor();
inline void SharedDtor();
void SetCachedSize(int size) const final;
void InternalSwap(LoadFieldMeta* other);
friend class ::PROTOBUF_NAMESPACE_ID::internal::AnyMetadata;
static ::PROTOBUF_NAMESPACE_ID::StringPiece FullMessageName() {
return "milvus.proto.segcore.LoadFieldMeta";
}
private:
inline ::PROTOBUF_NAMESPACE_ID::Arena* GetArenaNoVirtual() const {
return nullptr;
}
inline void* MaybeArenaPtr() const {
return nullptr;
}
public:
::PROTOBUF_NAMESPACE_ID::Metadata GetMetadata() const final;
private:
static ::PROTOBUF_NAMESPACE_ID::Metadata GetMetadataStatic() {
::PROTOBUF_NAMESPACE_ID::internal::AssignDescriptors(&::descriptor_table_segcore_2eproto);
return ::descriptor_table_segcore_2eproto.file_level_metadata[kIndexInFileMessages];
}
public:
// nested types ----------------------------------------------------
// accessors -------------------------------------------------------
enum : int {
kMinTimestampFieldNumber = 1,
kMaxTimestampFieldNumber = 2,
kRowCountFieldNumber = 3,
};
// int64 min_timestamp = 1;
void clear_min_timestamp();
::PROTOBUF_NAMESPACE_ID::int64 min_timestamp() const;
void set_min_timestamp(::PROTOBUF_NAMESPACE_ID::int64 value);
// int64 max_timestamp = 2;
void clear_max_timestamp();
::PROTOBUF_NAMESPACE_ID::int64 max_timestamp() const;
void set_max_timestamp(::PROTOBUF_NAMESPACE_ID::int64 value);
// int64 row_count = 3;
void clear_row_count();
::PROTOBUF_NAMESPACE_ID::int64 row_count() const;
void set_row_count(::PROTOBUF_NAMESPACE_ID::int64 value);
// @@protoc_insertion_point(class_scope:milvus.proto.segcore.LoadFieldMeta)
private:
class _Internal;
::PROTOBUF_NAMESPACE_ID::internal::InternalMetadataWithArena _internal_metadata_;
::PROTOBUF_NAMESPACE_ID::int64 min_timestamp_;
::PROTOBUF_NAMESPACE_ID::int64 max_timestamp_;
::PROTOBUF_NAMESPACE_ID::int64 row_count_;
mutable ::PROTOBUF_NAMESPACE_ID::internal::CachedSize _cached_size_;
friend struct ::TableStruct_segcore_2eproto;
};
// -------------------------------------------------------------------
class LoadSegmentMeta :
public ::PROTOBUF_NAMESPACE_ID::Message /* @@protoc_insertion_point(class_definition:milvus.proto.segcore.LoadSegmentMeta) */ {
public:
LoadSegmentMeta();
virtual ~LoadSegmentMeta();
LoadSegmentMeta(const LoadSegmentMeta& from);
LoadSegmentMeta(LoadSegmentMeta&& from) noexcept
: LoadSegmentMeta() {
*this = ::std::move(from);
}
inline LoadSegmentMeta& operator=(const LoadSegmentMeta& from) {
CopyFrom(from);
return *this;
}
inline LoadSegmentMeta& operator=(LoadSegmentMeta&& from) noexcept {
if (GetArenaNoVirtual() == from.GetArenaNoVirtual()) {
if (this != &from) InternalSwap(&from);
} else {
CopyFrom(from);
}
return *this;
}
static const ::PROTOBUF_NAMESPACE_ID::Descriptor* descriptor() {
return GetDescriptor();
}
static const ::PROTOBUF_NAMESPACE_ID::Descriptor* GetDescriptor() {
return GetMetadataStatic().descriptor;
}
static const ::PROTOBUF_NAMESPACE_ID::Reflection* GetReflection() {
return GetMetadataStatic().reflection;
}
static const LoadSegmentMeta& default_instance();
static void InitAsDefaultInstance(); // FOR INTERNAL USE ONLY
static inline const LoadSegmentMeta* internal_default_instance() {
return reinterpret_cast<const LoadSegmentMeta*>(
&_LoadSegmentMeta_default_instance_);
}
static constexpr int kIndexInFileMessages =
3;
friend void swap(LoadSegmentMeta& a, LoadSegmentMeta& b) {
a.Swap(&b);
}
inline void Swap(LoadSegmentMeta* other) {
if (other == this) return;
InternalSwap(other);
}
// implements Message ----------------------------------------------
inline LoadSegmentMeta* New() const final {
return CreateMaybeMessage<LoadSegmentMeta>(nullptr);
}
LoadSegmentMeta* New(::PROTOBUF_NAMESPACE_ID::Arena* arena) const final {
return CreateMaybeMessage<LoadSegmentMeta>(arena);
}
void CopyFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) final;
void MergeFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) final;
void CopyFrom(const LoadSegmentMeta& from);
void MergeFrom(const LoadSegmentMeta& from);
PROTOBUF_ATTRIBUTE_REINITIALIZES void Clear() final;
bool IsInitialized() const final;
size_t ByteSizeLong() const final;
#if GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER
const char* _InternalParse(const char* ptr, ::PROTOBUF_NAMESPACE_ID::internal::ParseContext* ctx) final;
#else
bool MergePartialFromCodedStream(
::PROTOBUF_NAMESPACE_ID::io::CodedInputStream* input) final;
#endif // GOOGLE_PROTOBUF_ENABLE_EXPERIMENTAL_PARSER
void SerializeWithCachedSizes(
::PROTOBUF_NAMESPACE_ID::io::CodedOutputStream* output) const final;
::PROTOBUF_NAMESPACE_ID::uint8* InternalSerializeWithCachedSizesToArray(
::PROTOBUF_NAMESPACE_ID::uint8* target) const final;
int GetCachedSize() const final { return _cached_size_.Get(); }
private:
inline void SharedCtor();
inline void SharedDtor();
void SetCachedSize(int size) const final;
void InternalSwap(LoadSegmentMeta* other);
friend class ::PROTOBUF_NAMESPACE_ID::internal::AnyMetadata;
static ::PROTOBUF_NAMESPACE_ID::StringPiece FullMessageName() {
return "milvus.proto.segcore.LoadSegmentMeta";
}
private:
inline ::PROTOBUF_NAMESPACE_ID::Arena* GetArenaNoVirtual() const {
return nullptr;
}
inline void* MaybeArenaPtr() const {
return nullptr;
}
public:
::PROTOBUF_NAMESPACE_ID::Metadata GetMetadata() const final;
private:
static ::PROTOBUF_NAMESPACE_ID::Metadata GetMetadataStatic() {
::PROTOBUF_NAMESPACE_ID::internal::AssignDescriptors(&::descriptor_table_segcore_2eproto);
return ::descriptor_table_segcore_2eproto.file_level_metadata[kIndexInFileMessages];
}
public:
// nested types ----------------------------------------------------
// accessors -------------------------------------------------------
enum : int {
kMetasFieldNumber = 1,
kTotalSizeFieldNumber = 2,
};
// repeated .milvus.proto.segcore.LoadFieldMeta metas = 1;
int metas_size() const;
void clear_metas();
::milvus::proto::segcore::LoadFieldMeta* mutable_metas(int index);
::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::proto::segcore::LoadFieldMeta >*
mutable_metas();
const ::milvus::proto::segcore::LoadFieldMeta& metas(int index) const;
::milvus::proto::segcore::LoadFieldMeta* add_metas();
const ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::proto::segcore::LoadFieldMeta >&
metas() const;
// int64 total_size = 2;
void clear_total_size();
::PROTOBUF_NAMESPACE_ID::int64 total_size() const;
void set_total_size(::PROTOBUF_NAMESPACE_ID::int64 value);
// @@protoc_insertion_point(class_scope:milvus.proto.segcore.LoadSegmentMeta)
private:
class _Internal;
::PROTOBUF_NAMESPACE_ID::internal::InternalMetadataWithArena _internal_metadata_;
::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::proto::segcore::LoadFieldMeta > metas_;
::PROTOBUF_NAMESPACE_ID::int64 total_size_;
mutable ::PROTOBUF_NAMESPACE_ID::internal::CachedSize _cached_size_;
friend struct ::TableStruct_segcore_2eproto;
};
// ===================================================================
......@@ -572,11 +869,109 @@ RetrieveResults::fields_data() const {
return fields_data_;
}
// -------------------------------------------------------------------
// LoadFieldMeta
// int64 min_timestamp = 1;
inline void LoadFieldMeta::clear_min_timestamp() {
min_timestamp_ = PROTOBUF_LONGLONG(0);
}
inline ::PROTOBUF_NAMESPACE_ID::int64 LoadFieldMeta::min_timestamp() const {
// @@protoc_insertion_point(field_get:milvus.proto.segcore.LoadFieldMeta.min_timestamp)
return min_timestamp_;
}
inline void LoadFieldMeta::set_min_timestamp(::PROTOBUF_NAMESPACE_ID::int64 value) {
min_timestamp_ = value;
// @@protoc_insertion_point(field_set:milvus.proto.segcore.LoadFieldMeta.min_timestamp)
}
// int64 max_timestamp = 2;
inline void LoadFieldMeta::clear_max_timestamp() {
max_timestamp_ = PROTOBUF_LONGLONG(0);
}
inline ::PROTOBUF_NAMESPACE_ID::int64 LoadFieldMeta::max_timestamp() const {
// @@protoc_insertion_point(field_get:milvus.proto.segcore.LoadFieldMeta.max_timestamp)
return max_timestamp_;
}
inline void LoadFieldMeta::set_max_timestamp(::PROTOBUF_NAMESPACE_ID::int64 value) {
max_timestamp_ = value;
// @@protoc_insertion_point(field_set:milvus.proto.segcore.LoadFieldMeta.max_timestamp)
}
// int64 row_count = 3;
inline void LoadFieldMeta::clear_row_count() {
row_count_ = PROTOBUF_LONGLONG(0);
}
inline ::PROTOBUF_NAMESPACE_ID::int64 LoadFieldMeta::row_count() const {
// @@protoc_insertion_point(field_get:milvus.proto.segcore.LoadFieldMeta.row_count)
return row_count_;
}
inline void LoadFieldMeta::set_row_count(::PROTOBUF_NAMESPACE_ID::int64 value) {
row_count_ = value;
// @@protoc_insertion_point(field_set:milvus.proto.segcore.LoadFieldMeta.row_count)
}
// -------------------------------------------------------------------
// LoadSegmentMeta
// repeated .milvus.proto.segcore.LoadFieldMeta metas = 1;
inline int LoadSegmentMeta::metas_size() const {
return metas_.size();
}
inline void LoadSegmentMeta::clear_metas() {
metas_.Clear();
}
inline ::milvus::proto::segcore::LoadFieldMeta* LoadSegmentMeta::mutable_metas(int index) {
// @@protoc_insertion_point(field_mutable:milvus.proto.segcore.LoadSegmentMeta.metas)
return metas_.Mutable(index);
}
inline ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::proto::segcore::LoadFieldMeta >*
LoadSegmentMeta::mutable_metas() {
// @@protoc_insertion_point(field_mutable_list:milvus.proto.segcore.LoadSegmentMeta.metas)
return &metas_;
}
inline const ::milvus::proto::segcore::LoadFieldMeta& LoadSegmentMeta::metas(int index) const {
// @@protoc_insertion_point(field_get:milvus.proto.segcore.LoadSegmentMeta.metas)
return metas_.Get(index);
}
inline ::milvus::proto::segcore::LoadFieldMeta* LoadSegmentMeta::add_metas() {
// @@protoc_insertion_point(field_add:milvus.proto.segcore.LoadSegmentMeta.metas)
return metas_.Add();
}
inline const ::PROTOBUF_NAMESPACE_ID::RepeatedPtrField< ::milvus::proto::segcore::LoadFieldMeta >&
LoadSegmentMeta::metas() const {
// @@protoc_insertion_point(field_list:milvus.proto.segcore.LoadSegmentMeta.metas)
return metas_;
}
// int64 total_size = 2;
inline void LoadSegmentMeta::clear_total_size() {
total_size_ = PROTOBUF_LONGLONG(0);
}
inline ::PROTOBUF_NAMESPACE_ID::int64 LoadSegmentMeta::total_size() const {
// @@protoc_insertion_point(field_get:milvus.proto.segcore.LoadSegmentMeta.total_size)
return total_size_;
}
inline void LoadSegmentMeta::set_total_size(::PROTOBUF_NAMESPACE_ID::int64 value) {
total_size_ = value;
// @@protoc_insertion_point(field_set:milvus.proto.segcore.LoadSegmentMeta.total_size)
}
#ifdef __GNUC__
#pragma GCC diagnostic pop
#endif // __GNUC__
// -------------------------------------------------------------------
// -------------------------------------------------------------------
// -------------------------------------------------------------------
// @@protoc_insertion_point(namespace_scope)
......
......@@ -37,8 +37,8 @@ class ExecExprVisitor : public ExprVisitor {
public:
using RetType = std::deque<boost::dynamic_bitset<>>;
ExecExprVisitor(const segcore::SegmentInternalInterface& segment, int64_t row_count)
: segment_(segment), row_count_(row_count) {
ExecExprVisitor(const segcore::SegmentInternalInterface& segment, int64_t row_count, Timestamp timestamp)
: segment_(segment), row_count_(row_count), timestamp_(timestamp) {
}
RetType
call_child(Expr& expr) {
......@@ -67,5 +67,6 @@ class ExecExprVisitor : public ExprVisitor {
const segcore::SegmentInternalInterface& segment_;
int64_t row_count_;
std::optional<RetType> ret_;
Timestamp timestamp_;
};
} // namespace milvus::query
......@@ -61,6 +61,17 @@ class ExecPlanNodeVisitor : PlanNodeVisitor {
} // namespace impl
#endif
static QueryResult
empty_query_result(int64_t num_queries, int64_t topk, MetricType metric_type) {
QueryResult final_result;
SubQueryResult result(num_queries, topk, metric_type);
final_result.num_queries_ = num_queries;
final_result.topK_ = topk;
final_result.internal_seg_offsets_ = std::move(result.mutable_labels());
final_result.result_distances_ = std::move(result.mutable_values());
return final_result;
}
template <typename VectorType>
void
ExecPlanNodeVisitor::VectorVisitorImpl(VectorPlanNode& node) {
......@@ -76,15 +87,24 @@ ExecPlanNodeVisitor::VectorVisitorImpl(VectorPlanNode& node) {
aligned_vector<uint8_t> bitset_holder;
BitsetView view;
// TODO: add API to unify row_count
auto row_count = segment->get_row_count();
// auto row_count = segment->get_row_count();
auto active_count = segment->get_active_count(timestamp_);
// skip all calculation
if (active_count == 0) {
ret_ = empty_query_result(num_queries, node.query_info_.topK_, node.query_info_.metric_type_);
return;
}
if (node.predicate_.has_value()) {
ExecExprVisitor::RetType expr_ret = ExecExprVisitor(*segment, row_count).call_child(*node.predicate_.value());
ExecExprVisitor::RetType expr_ret =
ExecExprVisitor(*segment, active_count, timestamp_).call_child(*node.predicate_.value());
segment->mask_with_timestamps(expr_ret, timestamp_);
bitset_holder = AssembleNegBitset(expr_ret);
view = BitsetView(bitset_holder.data(), bitset_holder.size() * 8);
}
segment->vector_search(row_count, node.query_info_, src_data, num_queries, view, ret);
segment->vector_search(active_count, node.query_info_, src_data, num_queries, MAX_TIMESTAMP, view, ret);
ret_ = ret;
}
......
......@@ -28,6 +28,7 @@ set(SEGCORE_FILES
SegcoreConfig.cpp
segcore_init_c.cpp
ScalarIndex.cpp
TimestampIndex.cpp
)
add_library(milvus_segcore SHARED
${SEGCORE_FILES}
......
......@@ -27,6 +27,7 @@
#include "query/PlanImpl.h"
#include "segcore/Reduce.h"
#include "utils/tools.h"
#include <boost/iterator/counting_iterator.hpp>
namespace milvus::segcore {
......@@ -289,6 +290,7 @@ SegmentGrowingImpl::vector_search(int64_t vec_count,
query::QueryInfo query_info,
const void* query_data,
int64_t query_count,
Timestamp timestamp,
const BitsetView& bitset,
QueryResult& output) const {
auto& sealed_indexing = this->get_sealed_indexing_record();
......@@ -492,4 +494,19 @@ SegmentGrowingImpl::debug() const {
return "Growing\n";
}
int64_t
SegmentGrowingImpl::get_active_count(Timestamp ts) const {
auto row_count = this->get_row_count();
auto& ts_vec = this->get_insert_record().timestamps_;
auto iter = std::upper_bound(boost::make_counting_iterator((int64_t)0), boost::make_counting_iterator(row_count),
ts, [&](Timestamp ts, int64_t index) { return ts < ts_vec[index]; });
return *iter;
}
void
SegmentGrowingImpl::mask_with_timestamps(std::deque<boost::dynamic_bitset<>>& bitset_chunks,
Timestamp timestamp) const {
// DO NOTHING
}
} // namespace milvus::segcore
......@@ -33,6 +33,7 @@
#include <memory>
#include <string>
#include <vector>
#include <deque>
namespace milvus::segcore {
......@@ -136,6 +137,9 @@ class SegmentGrowingImpl : public SegmentGrowing {
return 0;
}
int64_t
get_active_count(Timestamp ts) const override;
// for scalar vectors
template <typename T>
void
......@@ -170,11 +174,15 @@ class SegmentGrowingImpl : public SegmentGrowing {
indexing_record_(*schema_, segcore_config_) {
}
void
mask_with_timestamps(std::deque<boost::dynamic_bitset<>>& bitset_chunks, Timestamp timestamp) const override;
void
vector_search(int64_t vec_count,
query::QueryInfo query_info,
const void* query_data,
int64_t query_count,
Timestamp timestamp,
const BitsetView& bitset,
QueryResult& output) const override;
......
......@@ -21,6 +21,7 @@
#include "pb/schema.pb.h"
#include "pb/segcore.pb.h"
#include <memory>
#include <deque>
#include <vector>
#include <utility>
#include <string>
......@@ -104,6 +105,7 @@ class SegmentInternalInterface : public SegmentInterface {
query::QueryInfo query_info,
const void* query_data,
int64_t query_count,
Timestamp timestamp,
const BitsetView& bitset,
QueryResult& output) const = 0;
......@@ -111,6 +113,9 @@ class SegmentInternalInterface : public SegmentInterface {
virtual int64_t
num_chunk_index(FieldOffset field_offset) const = 0;
virtual void
mask_with_timestamps(std::deque<boost::dynamic_bitset<>>& bitset_chunks, Timestamp timestamp) const = 0;
// count of chunks
virtual int64_t
num_chunk() const = 0;
......@@ -119,6 +124,9 @@ class SegmentInternalInterface : public SegmentInterface {
virtual int64_t
size_per_chunk() const = 0;
virtual int64_t
get_active_count(Timestamp ts) const = 0;
protected:
// internal API: return chunk_data in span
virtual SpanBase
......
......@@ -12,6 +12,7 @@
#include <memory>
#include "segcore/SegmentInterface.h"
#include "pb/segcore.pb.h"
#include "common/LoadInfo.h"
#include <utility>
......@@ -22,6 +23,8 @@ class SegmentSealed : public SegmentInternalInterface {
virtual void
LoadIndex(const LoadIndexInfo& info) = 0;
virtual void
LoadSegmentMeta(const milvus::proto::segcore::LoadSegmentMeta& meta) = 0;
virtual void
LoadFieldData(const LoadFieldDataInfo& info) = 0;
virtual void
DropIndex(const FieldId field_id) = 0;
......
......@@ -68,29 +68,51 @@ SegmentSealedImpl::LoadFieldData(const LoadFieldDataInfo& info) {
if (SystemProperty::Instance().IsSystem(field_id)) {
auto system_field_type = SystemProperty::Instance().GetSystemFieldType(field_id);
Assert(system_field_type == SystemFieldType::RowId);
auto src_ptr = reinterpret_cast<const idx_t*>(info.blob);
if (system_field_type == SystemFieldType::Timestamp) {
auto src_ptr = reinterpret_cast<const Timestamp*>(info.blob);
aligned_vector<Timestamp> vec_data(info.row_count);
std::copy_n(src_ptr, info.row_count, vec_data.data());
auto size = info.row_count;
// TODO: load from outside
TimestampIndex index;
auto min_slice_length = size < 4096 ? 1 : 4096;
auto meta = GenerateFakeSlices(src_ptr, size, min_slice_length);
index.set_length_meta(std::move(meta));
index.build_with(src_ptr, size);
// use special index
std::unique_lock lck(mutex_);
update_row_count(info.row_count);
AssertInfo(timestamps_.empty(), "already exists");
timestamps_ = std::move(vec_data);
timestamp_index_ = std::move(index);
// prepare data
aligned_vector<idx_t> vec_data(info.row_count);
std::copy_n(src_ptr, info.row_count, vec_data.data());
} else {
Assert(system_field_type == SystemFieldType::RowId);
auto src_ptr = reinterpret_cast<const idx_t*>(info.blob);
std::unique_ptr<ScalarIndexBase> pk_index_;
// fix unintentional index update
if (schema_->get_is_auto_id()) {
pk_index_ = create_index(vec_data.data(), vec_data.size());
}
// prepare data
aligned_vector<idx_t> vec_data(info.row_count);
std::copy_n(src_ptr, info.row_count, vec_data.data());
// write data under lock
std::unique_lock lck(mutex_);
update_row_count(info.row_count);
AssertInfo(row_ids_.empty(), "already exists");
row_ids_ = std::move(vec_data);
std::unique_ptr<ScalarIndexBase> pk_index_;
// fix unintentional index update
if (schema_->get_is_auto_id()) {
pk_index_ = create_index(vec_data.data(), vec_data.size());
}
if (schema_->get_is_auto_id()) {
primary_key_index_ = std::move(pk_index_);
}
// write data under lock
std::unique_lock lck(mutex_);
update_row_count(info.row_count);
AssertInfo(row_ids_.empty(), "already exists");
row_ids_ = std::move(vec_data);
if (schema_->get_is_auto_id()) {
primary_key_index_ = std::move(pk_index_);
}
}
++system_ready_count_;
} else {
// prepare data
......@@ -194,8 +216,10 @@ SegmentSealedImpl::vector_search(int64_t vec_count,
query::QueryInfo query_info,
const void* query_data,
int64_t query_count,
Timestamp timestamp,
const BitsetView& bitset,
QueryResult& output) const {
Assert(is_system_field_ready());
auto field_offset = query_info.field_offset_;
auto& field_meta = schema_->operator[](field_offset);
......@@ -203,52 +227,54 @@ SegmentSealedImpl::vector_search(int64_t vec_count,
if (get_bit(vecindex_ready_bitset_, field_offset)) {
Assert(vecindexs_.is_ready(field_offset));
query::SearchOnSealed(*schema_, vecindexs_, query_info, query_data, query_count, bitset, output);
} else if (get_bit(field_data_ready_bitset_, field_offset)) {
query::dataset::QueryDataset dataset;
dataset.query_data = query_data;
dataset.num_queries = query_count;
// if(field_meta.is)
dataset.metric_type = query_info.metric_type_;
dataset.topk = query_info.topK_;
dataset.dim = field_meta.get_dim();
Assert(get_bit(field_data_ready_bitset_, field_offset));
Assert(row_count_opt_.has_value());
auto row_count = row_count_opt_.value();
auto chunk_data = field_datas_[field_offset.get()].data();
auto sub_qr = [&] {
if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) {
return query::FloatSearchBruteForce(dataset, chunk_data, row_count, bitset);
} else {
return query::BinarySearchBruteForce(dataset, chunk_data, row_count, bitset);
}
}();
QueryResult results;
results.result_distances_ = std::move(sub_qr.mutable_values());
results.internal_seg_offsets_ = std::move(sub_qr.mutable_labels());
results.topK_ = dataset.topk;
results.num_queries_ = dataset.num_queries;
output = std::move(results);
} else {
return;
} else if (!get_bit(field_data_ready_bitset_, field_offset)) {
PanicInfo("Field Data is not loaded");
}
query::dataset::QueryDataset dataset;
dataset.query_data = query_data;
dataset.num_queries = query_count;
// if(field_meta.is)
dataset.metric_type = query_info.metric_type_;
dataset.topk = query_info.topK_;
dataset.dim = field_meta.get_dim();
Assert(get_bit(field_data_ready_bitset_, field_offset));
Assert(row_count_opt_.has_value());
auto row_count = row_count_opt_.value();
auto chunk_data = field_datas_[field_offset.get()].data();
auto sub_qr = [&] {
if (field_meta.get_data_type() == DataType::VECTOR_FLOAT) {
return query::FloatSearchBruteForce(dataset, chunk_data, row_count, bitset);
} else {
return query::BinarySearchBruteForce(dataset, chunk_data, row_count, bitset);
}
}();
QueryResult results;
results.result_distances_ = std::move(sub_qr.mutable_values());
results.internal_seg_offsets_ = std::move(sub_qr.mutable_labels());
results.topK_ = dataset.topk;
results.num_queries_ = dataset.num_queries;
output = std::move(results);
}
void
SegmentSealedImpl::DropFieldData(const FieldId field_id) {
if (SystemProperty::Instance().IsSystem(field_id)) {
auto system_field_type = SystemProperty::Instance().GetSystemFieldType(field_id);
Assert(system_field_type == SystemFieldType::RowId);
std::unique_lock lck(mutex_);
--system_ready_count_;
auto row_ids = std::move(row_ids_);
if (system_field_type == SystemFieldType::RowId) {
auto row_ids = std::move(row_ids_);
} else if (system_field_type == SystemFieldType::Timestamp) {
auto ts = std::move(timestamps_);
}
lck.unlock();
row_ids.clear();
} else {
auto field_offset = schema_->get_offset(field_id);
auto& field_meta = schema_->operator[](field_offset);
......@@ -280,7 +306,7 @@ SegmentSealedImpl::check_search(const query::Plan* plan) const {
Assert(plan->extra_info_opt_.has_value());
if (!is_system_field_ready()) {
PanicInfo("System Field RowID is not loaded");
PanicInfo("System Field RowID or Timestamp is not loaded");
}
auto& request_fields = plan->extra_info_opt_.value().involved_fields_;
......@@ -429,6 +455,35 @@ SegmentSealedImpl::debug() const {
return log_str;
}
void
SegmentSealedImpl::LoadSegmentMeta(const proto::segcore::LoadSegmentMeta& segment_meta) {
std::unique_lock lck(mutex_);
std::vector<int64_t> slice_lengths;
for (auto& info : segment_meta.metas()) {
slice_lengths.push_back(info.row_count());
}
timestamp_index_.set_length_meta(std::move(slice_lengths));
PanicInfo("unimplemented");
}
int64_t
SegmentSealedImpl::get_active_count(Timestamp ts) const {
// TODO optimize here to reduce expr search range
return this->get_row_count();
}
void
SegmentSealedImpl::mask_with_timestamps(std::deque<boost::dynamic_bitset<>>& bitset_chunks, Timestamp timestamp) const {
// TODO change the
Assert(this->timestamps_.size() == get_row_count());
Assert(bitset_chunks.size() == 1);
auto range = timestamp_index_.get_active_range(timestamp);
if (range.first == range.second && range.first == this->timestamps_.size()) {
// just skip
return;
}
auto mask = TimestampIndex::GenerateBitset(timestamp, range, this->timestamps_.data(), this->timestamps_.size());
bitset_chunks[0] &= mask;
}
SegmentSealedPtr
CreateSealedSegment(SchemaPtr schema) {
return std::make_unique<SegmentSealedImpl>(schema);
......
......@@ -10,9 +10,11 @@
// or implied. See the License for the specific language governing permissions and limitations under the License
#pragma once
#include <segcore/TimestampIndex.h>
#include "segcore/SegmentSealed.h"
#include "SealedIndexingRecord.h"
#include "ScalarIndex.h"
#include <deque>
#include <map>
#include <vector>
#include <memory>
......@@ -28,10 +30,11 @@ class SegmentSealedImpl : public SegmentSealed {
void
LoadFieldData(const LoadFieldDataInfo& info) override;
void
LoadSegmentMeta(const milvus::proto::segcore::LoadSegmentMeta& segment_meta) override;
void
DropIndex(const FieldId field_id) override;
void
DropFieldData(const FieldId field_id) override;
bool
HasIndex(FieldId field_id) const override;
bool
......@@ -82,6 +85,9 @@ class SegmentSealedImpl : public SegmentSealed {
void
check_search(const query::Plan* plan) const override;
int64_t
get_active_count(Timestamp ts) const override;
private:
template <typename T>
static void
......@@ -100,17 +106,21 @@ class SegmentSealedImpl : public SegmentSealed {
}
}
void
mask_with_timestamps(std::deque<boost::dynamic_bitset<>>& bitset_chunks, Timestamp timestamp) const override;
void
vector_search(int64_t vec_count,
query::QueryInfo query_info,
const void* query_data,
int64_t query_count,
Timestamp timestamp,
const BitsetView& bitset,
QueryResult& output) const override;
bool
is_system_field_ready() const {
return system_ready_count_ == 1;
return system_ready_count_ == 2;
}
std::pair<std::unique_ptr<IdArray>, std::vector<SegOffset>>
......@@ -138,6 +148,8 @@ class SegmentSealedImpl : public SegmentSealed {
SealedIndexingRecord vecindexs_;
aligned_vector<idx_t> row_ids_;
aligned_vector<Timestamp> timestamps_;
TimestampIndex timestamp_index_;
SchemaPtr schema_;
};
} // namespace milvus::segcore
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <segcore/TimestampIndex.h>
namespace milvus::segcore {
void
TimestampIndex::set_length_meta(std::vector<int64_t> lengths) {
lengths_ = std::move(lengths);
}
void
TimestampIndex::build_with(const Timestamp* timestamps, int64_t size) {
auto num_slice = lengths_.size();
Assert(num_slice > 0);
std::vector<int64_t> prefix_sums;
int offset = 0;
prefix_sums.push_back(offset);
std::vector<Timestamp> timestamp_barriers;
Timestamp last_max_v = 0;
for (int slice_id = 0; slice_id < num_slice; ++slice_id) {
auto length = lengths_[slice_id];
auto [min_v, max_v] = std::minmax_element(timestamps + offset, timestamps + offset + length);
Assert(last_max_v <= *min_v);
offset += length;
prefix_sums.push_back(offset);
timestamp_barriers.push_back(*min_v);
last_max_v = *max_v;
}
timestamp_barriers.push_back(last_max_v);
Assert(std::is_sorted(timestamp_barriers.begin(), timestamp_barriers.end()));
Assert(offset == size);
auto min_ts = timestamp_barriers[0];
this->size_ = size;
this->start_locs_ = std::move(prefix_sums);
this->min_timestamp_ = min_ts;
this->max_timestamp_ = last_max_v;
this->timestamp_barriers_ = std::move(timestamp_barriers);
}
std::pair<int64_t, int64_t>
TimestampIndex::get_active_range(Timestamp query_timestamp) const {
if (query_timestamp >= max_timestamp_) {
// most common case
return {size_, size_};
}
if (query_timestamp < min_timestamp_) {
return {0, 0};
}
auto iter = std::upper_bound(timestamp_barriers_.begin(), timestamp_barriers_.end(), query_timestamp);
int block_id = (iter - timestamp_barriers_.begin()) - 1;
Assert(0 <= block_id && block_id < lengths_.size());
return {start_locs_[block_id], start_locs_[block_id + 1]};
}
boost::dynamic_bitset<>
TimestampIndex::GenerateBitset(Timestamp query_timestamp,
std::pair<int64_t, int64_t> active_range,
const Timestamp* timestamps,
int64_t size) {
auto [beg, end] = active_range;
Assert(beg < end);
boost::dynamic_bitset<> bitset;
bitset.reserve(size);
bitset.resize(beg, true);
bitset.resize(size, false);
for (int64_t i = beg; i < end; ++i) {
bitset[i] = timestamps[i] <= query_timestamp;
}
return bitset;
}
std::vector<int64_t>
GenerateFakeSlices(const Timestamp* timestamps, int64_t size, int min_slice_length) {
assert(min_slice_length >= 1);
std::vector<int64_t> results;
std::vector<int64_t> min_values(size);
Timestamp value = std::numeric_limits<Timestamp>::max();
for (int64_t i = 0; i < size; ++i) {
auto offset = size - 1 - i;
value = std::min(value, timestamps[offset]);
min_values[offset] = value;
}
value = std::numeric_limits<Timestamp>::min();
auto slice_length = 0;
for (int64_t i = 0; i < size; ++i) {
if (value <= min_values[i] && slice_length >= min_slice_length) {
// emit new slice
results.push_back(slice_length);
slice_length = 0;
}
value = std::max(value, timestamps[i]);
slice_length += 1;
}
results.push_back(slice_length);
return results;
}
} // namespace milvus::segcore
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#pragma once
#include <common/Schema.h>
#include <vector>
#include <boost/dynamic_bitset.hpp>
#include <utility>
namespace milvus::segcore {
class TimestampIndex {
public:
void
set_length_meta(std::vector<int64_t> lengths);
void
build_with(const Timestamp* timestamps, int64_t size);
// output bitset
// Return range [beg, end) that is undecided
// [0, beg) shall be all OK
// [end, size) shall be all not OK
std::pair<int64_t, int64_t>
get_active_range(Timestamp query_timestamp) const;
static boost::dynamic_bitset<>
GenerateBitset(Timestamp query_timestamp,
std::pair<int64_t, int64_t> active_range,
const Timestamp* timestamps,
int64_t size);
private:
// numSlice
std::vector<int64_t> lengths_;
int64_t size_;
// numSlice + 1
std::vector<int64_t> start_locs_;
Timestamp min_timestamp_;
Timestamp max_timestamp_;
// numSlice + 1
std::vector<Timestamp> timestamp_barriers_;
};
std::vector<int64_t>
GenerateFakeSlices(const Timestamp* timestamps, int64_t size, int min_slice_length = 1);
} // namespace milvus::segcore
......@@ -258,6 +258,19 @@ UpdateSegmentIndex(CSegmentInterface c_segment, CLoadIndexInfo c_load_index_info
}
}
CStatus
LoadSealedSegmentMeta(CSegmentInterface c_segment, CProto LoadSegmentMetaProto) {
try {
auto segment_raw = (const milvus::segcore::SegmentGrowing*)c_segment;
auto segment = dynamic_cast<const milvus::segcore::SegmentSealed*>(segment_raw);
return milvus::SuccessCStatus();
} catch (std::exception& e) {
// TODO
return milvus::FailureCStatus(UnexpectedError, e.what());
}
}
int
Close(CSegmentInterface c_segment) {
auto segment = (milvus::segcore::SegmentGrowing*)c_segment;
......@@ -265,11 +278,6 @@ Close(CSegmentInterface c_segment) {
return status.code();
}
int
BuildIndex(CCollection c_collection, CSegmentInterface c_segment) {
PanicInfo("unimplemented");
}
bool
IsOpened(CSegmentInterface c_segment) {
auto segment = (milvus::segcore::SegmentGrowing*)c_segment;
......
......@@ -103,9 +103,6 @@ UpdateSegmentIndex(CSegmentInterface c_segment, CLoadIndexInfo c_load_index_info
int
Close(CSegmentInterface c_segment);
int
BuildIndex(CCollection c_collection, CSegmentInterface c_segment);
bool
IsOpened(CSegmentInterface c_segment);
......
......@@ -36,6 +36,7 @@ set(MILVUS_TEST_FILES
test_init.cpp
test_plan_proto.cpp
test_get_entity_by_ids.cpp
test_timestamp_index.cpp
)
add_executable(all_tests
......
......@@ -823,7 +823,7 @@ TEST(CApiTest, LoadIndex_Search) {
constexpr auto DIM = 16;
constexpr auto K = 10;
auto N = 1024 * 1024 * 10;
auto N = 1024 * 1024;
auto num_query = 100;
auto [raw_data, timestamps, uids] = generate_data(N);
auto indexing = std::make_shared<milvus::knowhere::IVFPQ>();
......@@ -2569,6 +2569,14 @@ TEST(CApiTest, SealedSegment_search_float_Predicate_Range) {
status = LoadFieldData(segment, c_id_field_data);
assert(status.error_code == Success);
auto c_ts_field_data = CLoadFieldDataInfo{
1,
counter_col.data(),
N,
};
status = LoadFieldData(segment, c_ts_field_data);
assert(status.error_code == Success);
status = UpdateSealedSegmentIndex(segment, c_load_index_info);
assert(status.error_code == Success);
......@@ -2728,6 +2736,14 @@ TEST(CApiTest, SealedSegment_search_float_With_Expr_Predicate_Range) {
status = LoadFieldData(segment, c_id_field_data);
assert(status.error_code == Success);
auto c_ts_field_data = CLoadFieldDataInfo{
1,
counter_col.data(),
N,
};
status = LoadFieldData(segment, c_ts_field_data);
assert(status.error_code == Success);
status = UpdateSealedSegmentIndex(segment, c_load_index_info);
assert(status.error_code == Success);
......
......@@ -307,7 +307,7 @@ TEST(Expr, TestRange) {
}
auto seg_promote = dynamic_cast<SegmentGrowingImpl*>(seg.get());
ExecExprVisitor visitor(*seg_promote, seg_promote->get_row_count());
ExecExprVisitor visitor(*seg_promote, seg_promote->get_row_count(), MAX_TIMESTAMP);
for (auto [clause, ref_func] : testcases) {
auto loc = dsl_string_tmp.find("@@@@");
auto dsl_string = dsl_string_tmp;
......@@ -391,7 +391,7 @@ TEST(Expr, TestTerm) {
}
auto seg_promote = dynamic_cast<SegmentGrowingImpl*>(seg.get());
ExecExprVisitor visitor(*seg_promote, seg_promote->get_row_count());
ExecExprVisitor visitor(*seg_promote, seg_promote->get_row_count(), MAX_TIMESTAMP);
for (auto [clause, ref_func] : testcases) {
auto loc = dsl_string_tmp.find("@@@@");
auto dsl_string = dsl_string_tmp;
......@@ -493,7 +493,7 @@ TEST(Expr, TestSimpleDsl) {
}
auto seg_promote = dynamic_cast<SegmentGrowingImpl*>(seg.get());
ExecExprVisitor visitor(*seg_promote, seg_promote->get_row_count());
ExecExprVisitor visitor(*seg_promote, seg_promote->get_row_count(), MAX_TIMESTAMP);
for (auto [clause, ref_func] : testcases) {
Json dsl;
dsl["bool"] = clause;
......
......@@ -44,15 +44,15 @@ TEST(Span, Naive) {
auto float_span = interface.chunk_data<FloatVector>(FieldOffset(2), chunk_id);
auto begin = chunk_id * size_per_chunk;
auto end = std::min((chunk_id + 1) * size_per_chunk, N);
auto size_per_chunk = end - begin;
for (int i = 0; i < size_per_chunk * 512 / 8; ++i) {
auto size_of_chunk = end - begin;
for (int i = 0; i < size_of_chunk * 512 / 8; ++i) {
ASSERT_EQ(vec_span.data()[i], vec_ptr[i + begin * 512 / 8]);
}
for (int i = 0; i < size_per_chunk; ++i) {
for (int i = 0; i < size_of_chunk; ++i) {
ASSERT_EQ(age_span.data()[i], age_ptr[i + begin]);
}
for (int i = 0; i < size_per_chunk; ++i) {
for (int i = 0; i < size_of_chunk; ++i) {
ASSERT_EQ(float_span.data()[i], float_ptr[i + begin * 32]);
}
}
}
\ No newline at end of file
}
......@@ -7,41 +7,30 @@
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package querynode
/*
#cgo CFLAGS: -I../core/output/include
#cgo LDFLAGS: -L../core/output/lib -lmilvus_segcore -Wl,-rpath=../core/output/lib
#include "segcore/collection_c.h"
#include "segcore/segment_c.h"
*/
import "C"
import (
"github.com/milvus-io/milvus/internal/proto/commonpb"
)
type IndexConfig struct{}
func (s *Segment) buildIndex(collection *Collection) commonpb.Status {
/*
int
BuildIndex(CCollection c_collection, CSegmentBase c_segment);
*/
var status = C.BuildIndex(collection.collectionPtr, s.segmentPtr)
if status != 0 {
return commonpb.Status{ErrorCode: commonpb.ErrorCode_BuildIndexError}
}
return commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}
}
func (s *Segment) dropIndex(fieldID int64) commonpb.Status {
// WARN: Not support yet
return commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <gtest/gtest.h>
#include "utils/tools.h"
#include "test_utils/DataGen.h"
#include "segcore/TimestampIndex.h"
#include <vector>
using namespace milvus;
using namespace milvus::segcore;
TEST(TimestampIndex, Naive) {
SUCCEED();
std::vector<Timestamp> timestamps{
1, 2, 14, 11, 13, 22, 21, 20,
};
std::vector<int64_t> lengths = {2, 3, 3};
TimestampIndex index;
index.set_length_meta(lengths);
index.build_with(timestamps.data(), timestamps.size());
int x = 1 + 1;
auto guessed_slice = GenerateFakeSlices(timestamps.data(), timestamps.size(), 2);
ASSERT_EQ(guessed_slice.size(), lengths.size());
for (auto i = 0; i < lengths.size(); ++i) {
ASSERT_EQ(guessed_slice[i], lengths[i]);
}
}
......@@ -299,6 +299,13 @@ SealedLoader(const GeneratedData& dataset, SegmentSealed& seg) {
info.field_id = 0; // field id for RowId
seg.LoadFieldData(info);
}
{
LoadFieldDataInfo info;
info.blob = dataset.timestamps_.data();
info.row_count = dataset.timestamps_.size();
info.field_id = 1;
seg.LoadFieldData(info);
}
int field_offset = 0;
for (auto& meta : seg.get_schema().get_fields()) {
LoadFieldDataInfo info;
......
......@@ -14,3 +14,14 @@ message RetrieveResults {
repeated schema.FieldData fields_data = 2;
}
message LoadFieldMeta {
int64 min_timestamp = 1;
int64 max_timestamp = 2;
int64 row_count = 3;
}
message LoadSegmentMeta {
// TODOs
repeated LoadFieldMeta metas = 1;
int64 total_size = 2;
}
......@@ -115,27 +115,139 @@ func (m *RetrieveResults) GetFieldsData() []*schemapb.FieldData {
return nil
}
type LoadFieldMeta struct {
MinTimestamp int64 `protobuf:"varint,1,opt,name=min_timestamp,json=minTimestamp,proto3" json:"min_timestamp,omitempty"`
MaxTimestamp int64 `protobuf:"varint,2,opt,name=max_timestamp,json=maxTimestamp,proto3" json:"max_timestamp,omitempty"`
RowCount int64 `protobuf:"varint,3,opt,name=row_count,json=rowCount,proto3" json:"row_count,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *LoadFieldMeta) Reset() { *m = LoadFieldMeta{} }
func (m *LoadFieldMeta) String() string { return proto.CompactTextString(m) }
func (*LoadFieldMeta) ProtoMessage() {}
func (*LoadFieldMeta) Descriptor() ([]byte, []int) {
return fileDescriptor_1d79fce784797357, []int{2}
}
func (m *LoadFieldMeta) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_LoadFieldMeta.Unmarshal(m, b)
}
func (m *LoadFieldMeta) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_LoadFieldMeta.Marshal(b, m, deterministic)
}
func (m *LoadFieldMeta) XXX_Merge(src proto.Message) {
xxx_messageInfo_LoadFieldMeta.Merge(m, src)
}
func (m *LoadFieldMeta) XXX_Size() int {
return xxx_messageInfo_LoadFieldMeta.Size(m)
}
func (m *LoadFieldMeta) XXX_DiscardUnknown() {
xxx_messageInfo_LoadFieldMeta.DiscardUnknown(m)
}
var xxx_messageInfo_LoadFieldMeta proto.InternalMessageInfo
func (m *LoadFieldMeta) GetMinTimestamp() int64 {
if m != nil {
return m.MinTimestamp
}
return 0
}
func (m *LoadFieldMeta) GetMaxTimestamp() int64 {
if m != nil {
return m.MaxTimestamp
}
return 0
}
func (m *LoadFieldMeta) GetRowCount() int64 {
if m != nil {
return m.RowCount
}
return 0
}
type LoadSegmentMeta struct {
// TODOs
Metas []*LoadFieldMeta `protobuf:"bytes,1,rep,name=metas,proto3" json:"metas,omitempty"`
TotalSize int64 `protobuf:"varint,2,opt,name=total_size,json=totalSize,proto3" json:"total_size,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *LoadSegmentMeta) Reset() { *m = LoadSegmentMeta{} }
func (m *LoadSegmentMeta) String() string { return proto.CompactTextString(m) }
func (*LoadSegmentMeta) ProtoMessage() {}
func (*LoadSegmentMeta) Descriptor() ([]byte, []int) {
return fileDescriptor_1d79fce784797357, []int{3}
}
func (m *LoadSegmentMeta) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_LoadSegmentMeta.Unmarshal(m, b)
}
func (m *LoadSegmentMeta) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_LoadSegmentMeta.Marshal(b, m, deterministic)
}
func (m *LoadSegmentMeta) XXX_Merge(src proto.Message) {
xxx_messageInfo_LoadSegmentMeta.Merge(m, src)
}
func (m *LoadSegmentMeta) XXX_Size() int {
return xxx_messageInfo_LoadSegmentMeta.Size(m)
}
func (m *LoadSegmentMeta) XXX_DiscardUnknown() {
xxx_messageInfo_LoadSegmentMeta.DiscardUnknown(m)
}
var xxx_messageInfo_LoadSegmentMeta proto.InternalMessageInfo
func (m *LoadSegmentMeta) GetMetas() []*LoadFieldMeta {
if m != nil {
return m.Metas
}
return nil
}
func (m *LoadSegmentMeta) GetTotalSize() int64 {
if m != nil {
return m.TotalSize
}
return 0
}
func init() {
proto.RegisterType((*RetrieveRequest)(nil), "milvus.proto.segcore.RetrieveRequest")
proto.RegisterType((*RetrieveResults)(nil), "milvus.proto.segcore.RetrieveResults")
proto.RegisterType((*LoadFieldMeta)(nil), "milvus.proto.segcore.LoadFieldMeta")
proto.RegisterType((*LoadSegmentMeta)(nil), "milvus.proto.segcore.LoadSegmentMeta")
}
func init() { proto.RegisterFile("segcore.proto", fileDescriptor_1d79fce784797357) }
var fileDescriptor_1d79fce784797357 = []byte{
// 222 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x90, 0xb1, 0x4b, 0xc4, 0x30,
0x14, 0xc6, 0x39, 0x0f, 0x04, 0xd3, 0x3b, 0x84, 0xe2, 0x50, 0x1c, 0xe4, 0x38, 0x97, 0x43, 0x30,
0x81, 0x2a, 0xae, 0x82, 0x14, 0xc1, 0x35, 0xa3, 0x4b, 0x49, 0xda, 0x67, 0x1b, 0x48, 0x9b, 0x9a,
0xf7, 0xd2, 0xd1, 0xbf, 0x5d, 0x9a, 0x74, 0x50, 0x70, 0x71, 0xfb, 0xf2, 0x25, 0xbf, 0xef, 0x07,
0x61, 0x7b, 0x84, 0xae, 0x71, 0x1e, 0xf8, 0xe4, 0x1d, 0xb9, 0xfc, 0x6a, 0x30, 0x76, 0x0e, 0x98,
0x4e, 0x7c, 0xbd, 0xbb, 0xde, 0x61, 0xd3, 0xc3, 0xa0, 0x52, 0x7b, 0xd4, 0xec, 0x52, 0x02, 0x79,
0x03, 0x33, 0x48, 0xf8, 0x0c, 0x80, 0x94, 0xdf, 0xb1, 0xad, 0x69, 0xb1, 0xd8, 0x1c, 0x36, 0xa7,
0xac, 0x2c, 0xf8, 0xef, 0x91, 0xc4, 0xbe, 0x55, 0x28, 0x97, 0x47, 0xf9, 0x2d, 0xdb, 0xbb, 0x40,
0x53, 0xa0, 0xfa, 0xc3, 0x80, 0x6d, 0xb1, 0x38, 0x3b, 0x6c, 0x4f, 0x17, 0x72, 0x97, 0xca, 0xd7,
0xd8, 0x1d, 0xbf, 0x7e, 0x3a, 0x30, 0x58, 0xc2, 0x7f, 0x39, 0x9e, 0x59, 0x96, 0xc6, 0xeb, 0x56,
0x91, 0x8a, 0x86, 0xac, 0xbc, 0xf9, 0x93, 0x89, 0xc2, 0x4a, 0x91, 0x92, 0x2c, 0x21, 0x4b, 0x7e,
0x79, 0x7a, 0x7f, 0xec, 0x0c, 0xf5, 0x41, 0xf3, 0xc6, 0x0d, 0x22, 0x71, 0xf7, 0xc6, 0xad, 0x49,
0x98, 0x91, 0xc0, 0x8f, 0xca, 0x8a, 0x38, 0x25, 0xd6, 0x7f, 0x9a, 0xb4, 0x3e, 0x8f, 0xc5, 0xc3,
0x77, 0x00, 0x00, 0x00, 0xff, 0xff, 0xed, 0xbc, 0x2b, 0xca, 0x57, 0x01, 0x00, 0x00,
// 335 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x91, 0xc1, 0x4b, 0xf3, 0x30,
0x18, 0xc6, 0xd9, 0xca, 0xf7, 0xe1, 0xb2, 0x8d, 0x41, 0xf1, 0x50, 0x14, 0x65, 0x74, 0x97, 0x21,
0xd8, 0xc2, 0x14, 0xc1, 0x93, 0xa0, 0x43, 0x10, 0xf4, 0x92, 0x79, 0xf2, 0x52, 0xd2, 0xf6, 0x75,
0x0b, 0x36, 0x4d, 0x6d, 0xde, 0x74, 0x63, 0x07, 0xff, 0x76, 0x49, 0x52, 0x71, 0x83, 0x5d, 0xbc,
0x25, 0x4f, 0x9e, 0xe7, 0xfd, 0x3d, 0x2f, 0x21, 0x43, 0x05, 0xcb, 0x4c, 0xd6, 0x10, 0x55, 0xb5,
0x44, 0xe9, 0x1f, 0x0b, 0x5e, 0x34, 0x5a, 0xb9, 0x5b, 0xd4, 0xbe, 0x9d, 0x0c, 0x54, 0xb6, 0x02,
0xc1, 0x9c, 0x1a, 0xa6, 0x64, 0x44, 0x01, 0x6b, 0x0e, 0x0d, 0x50, 0xf8, 0xd4, 0xa0, 0xd0, 0xbf,
0x20, 0x1e, 0xcf, 0x55, 0xd0, 0x19, 0x77, 0xa6, 0xfd, 0x59, 0x10, 0xed, 0x0f, 0x71, 0xd9, 0xa7,
0xb9, 0xa2, 0xc6, 0xe4, 0x4f, 0xc8, 0x50, 0x6a, 0xac, 0x34, 0x26, 0xef, 0x1c, 0x8a, 0x5c, 0x05,
0xdd, 0xb1, 0x37, 0xed, 0xd1, 0x81, 0x13, 0x1f, 0xad, 0x16, 0x7e, 0xed, 0x32, 0x94, 0x2e, 0x50,
0xfd, 0x89, 0x71, 0x47, 0xfa, 0x6e, 0x78, 0x92, 0x33, 0x64, 0x96, 0xd0, 0x9f, 0x9d, 0x1f, 0xcc,
0x58, 0xe0, 0x9c, 0x21, 0xa3, 0xc4, 0x45, 0xcc, 0x39, 0x6c, 0xc8, 0xf0, 0x59, 0xb2, 0xdc, 0x3e,
0xbe, 0x00, 0x32, 0xd3, 0x5a, 0xf0, 0x32, 0x41, 0x2e, 0x40, 0x21, 0x13, 0x95, 0xed, 0xe1, 0xd1,
0x81, 0xe0, 0xe5, 0xeb, 0x8f, 0x66, 0x4d, 0x6c, 0xb3, 0x63, 0xea, 0xb6, 0x26, 0xb6, 0xf9, 0x35,
0x9d, 0x92, 0x5e, 0x2d, 0xd7, 0x49, 0x26, 0x75, 0x89, 0x81, 0x67, 0x0d, 0x47, 0xb5, 0x5c, 0x3f,
0x98, 0x7b, 0xf8, 0x41, 0x46, 0x86, 0xbb, 0x80, 0xa5, 0x80, 0x12, 0x2d, 0xf9, 0x96, 0xfc, 0x13,
0x80, 0xcc, 0x6c, 0x6e, 0xb6, 0x98, 0x44, 0x87, 0xbe, 0x28, 0xda, 0x6b, 0x4b, 0x5d, 0xc2, 0x3f,
0x23, 0x04, 0x25, 0xb2, 0x22, 0x51, 0x7c, 0x0b, 0x6d, 0x99, 0x9e, 0x55, 0x16, 0x7c, 0x0b, 0xf7,
0x37, 0x6f, 0xd7, 0x4b, 0x8e, 0x2b, 0x9d, 0x46, 0x99, 0x14, 0xb1, 0x1b, 0x7b, 0xc9, 0x65, 0x7b,
0x8a, 0x79, 0x89, 0x50, 0x97, 0xac, 0x88, 0x2d, 0x29, 0x6e, 0x49, 0x55, 0x9a, 0xfe, 0xb7, 0xc2,
0xd5, 0x77, 0x00, 0x00, 0x00, 0xff, 0xff, 0xac, 0x78, 0x16, 0xc2, 0x3c, 0x02, 0x00, 0x00,
}
......@@ -227,10 +227,6 @@ func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, binlogPaths
blobs := make([]*storage.Blob, 0)
for _, binlogPath := range binlogPaths {
fieldID := binlogPath.FieldID
if fieldID == timestampFieldID {
// seg core doesn't need timestamp field
continue
}
paths := binlogPath.Binlogs
log.Debug("load segment fields data",
......@@ -254,6 +250,7 @@ func (loader *segmentLoader) loadSegmentFieldsData(segment *Segment, binlogPaths
}
_, _, insertData, err := iCodec.Deserialize(blobs)
if err != nil {
log.Error(err.Error())
return err
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册