Commit c3de667b authored by neza2017, committed by yefu.chen

Implement segment management in master

Signed-off-by: neza2017 <yefu.chen@zilliz.com>
Parent 99ea7e9f
......@@ -21,21 +21,35 @@ func main() {
etcdAddress, _ := masterParams.Params.EtcdAddress()
etcdRootPath, _ := masterParams.Params.EtcdRootPath()
pulsarAddr, _ := masterParams.Params.PulsarAddress()
defaultRecordSize := masterParams.Params.DefaultRecordSize()
minimumAssignSize := masterParams.Params.MinimumAssignSize()
segmentThreshold := masterParams.Params.SegmentThreshold()
segmentExpireDuration := masterParams.Params.SegmentExpireDuration()
numOfChannel, _ := masterParams.Params.TopicNum()
nodeNum, _ := masterParams.Params.QueryNodeNum()
statsChannel := masterParams.Params.StatsChannels()
opt := master.Option{
KVRootPath: etcdRootPath,
MetaRootPath: etcdRootPath,
EtcdAddr: []string{etcdAddress},
PulsarAddr: pulsarAddr,
ProxyIDs: nil,
PulsarProxyChannels: nil,
PulsarProxySubName: "",
SoftTTBInterval: 0,
WriteIDs: nil,
PulsarWriteChannels: nil,
PulsarWriteSubName: "",
PulsarDMChannels: nil,
PulsarK2SChannels: nil,
KVRootPath: etcdRootPath,
MetaRootPath: etcdRootPath,
EtcdAddr: []string{etcdAddress},
PulsarAddr: pulsarAddr,
ProxyIDs: nil,
PulsarProxyChannels: nil,
PulsarProxySubName: "",
SoftTTBInterval: 0,
WriteIDs: nil,
PulsarWriteChannels: nil,
PulsarWriteSubName: "",
PulsarDMChannels: nil,
PulsarK2SChannels: nil,
DefaultRecordSize: defaultRecordSize,
MinimumAssignSize: minimumAssignSize,
SegmentThreshold: segmentThreshold,
SegmentExpireDuration: segmentExpireDuration,
NumOfChannel: numOfChannel,
NumOfQueryNode: nodeNum,
StatsChannels: statsChannel,
}
svr, err := master.CreateServer(ctx, &opt)
......
# Copyright (C) 2019-2020 Zilliz. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under the License.
flowGraph:
maxQueueLength: 1024
maxParallelism: 1024
......@@ -9,26 +9,15 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under the License.
reader:
stats:
publishInterval: 1000 # milliseconds
service:
statsServiceTimeInterval: 1000 # milliseconds
dataSync:
flowGraph:
maxQueueLength: 1024
maxParallelism: 1024
msgStream:
dm:
recvBufSize: 1024 # msgPack chan buffer size
pulsarBufSize: 1024 # pulsar chan buffer size
search:
recvBufSize: 512
pulsarBufSize: 512
searchResult:
recvBufSize: 64
stats:
recvBufSize: 64
msgStream:
receiveBufSize: # msgPack chan buffer size
statsMsgStream: 64
dmMsgStream: 1024
searchMsgStream: 512
searchResultMsgStream: 64
pulsarBufSize: # pulsar chan buffer size
search: 512
dm: 1024
......@@ -14,10 +14,14 @@ master:
port: 53100
pulsarmoniterinterval: 1
pulsartopic: "monitor-topic"
segmentthreshold: 1073741824
defaultSizePerRecord: 1024
minimumAssignSize: 1048576
segmentThreshold: 536870912
segmentExpireDuration: 2000
proxyidlist: [0]
querynodenum: 1
writenodenum: 1
statsChannels: "statistic"
etcd:
address: localhost
......
......@@ -956,8 +956,8 @@ func (unmarshaler *QueryReqUnmarshaler) Unmarshal(input *pulsar.Message) (*TsMsg
| DescribePartition | show a partition's name and its descriptive statistics |
| ShowPartitions | list a collection's all partitions |
| AllocTimestamp | allocate a batch of consecutive timestamps |
| AllocId | allocate a batch of consecutive IDs |
| AssignSegmentId | assign segment id to insert rows (master determines which segment these rows belong to) |
| AllocID | allocate a batch of consecutive IDs |
| AssignSegmentID | assign segment id to insert rows (master determines which segment these rows belong to) |
| | |
| | |
......@@ -1244,5 +1244,63 @@ func (syncMsgProducer *timeSyncMsgProducer) Close()
#### 10.6 System Statistics
###### 10.6.1 Query Node Statistics
Query Node publishes *QueryNodeSegStats* messages to a message stream. Master consumes them and updates the segment meta accordingly. If the *MemSize* reported for a segment exceeds *SegmentThreshold*, Master closes the segment so that no more rows can be assigned to it (see the sketch after the message definition below).
```protobuf
message SegmentStats {
int64 segmentID = 1;
int64 memory_size = 2;
int64 num_rows = 3;
bool recently_modified = 4;
}
message QueryNodeSegStats {
MsgType msg_type = 1;
int64 peerID = 2;
repeated SegmentStats seg_stats = 3;
}
```
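For reference, a condensed sketch of the per-segment handling introduced by this change (the full handler is `SegmentManager.handleSegmentStat`; error handling and locking are abbreviated here):
```go
// Condensed sketch of the stats handler: refresh the segment meta from the
// reported statistics and close the segment once it crosses the threshold.
func (segMgr *SegmentManager) handleSegmentStat(stat *internalpb.SegmentStats) error {
	if !stat.GetRecentlyModified() {
		return nil // nothing changed since the last report
	}
	segMeta, err := segMgr.metaTable.GetSegmentByID(stat.GetSegmentID())
	if err != nil {
		return err
	}
	segMeta.NumRows = stat.NumRows
	segMeta.MemSize = stat.MemorySize
	if stat.MemorySize > segMgr.segmentThreshold {
		// The segment is considered full: close it so no further rows are assigned to it.
		return segMgr.closeSegment(segMeta)
	}
	return segMgr.metaTable.UpdateSegment(segMeta)
}
```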
#### 10.7 Segment Management
```go
type assignment struct {
	MemSize    int64
	AssignTime time.Time
}

type segmentStatus struct {
	assignments []*assignment
}

type collectionStatus struct {
	openedSegments []UniqueID
}

type SegmentManager struct {
	segStatus  map[UniqueID]*segmentStatus    // segment id -> assignment bookkeeping
	collStatus map[UniqueID]*collectionStatus // collection id -> opened segments
}

func (segMgr *SegmentManager) Start() error
func (segMgr *SegmentManager) Close()

func NewSegmentManager(ctx context.Context) *SegmentManager
```
###### 10.7.1 Assign Segment ID to Inserted Rows
Master receives an *AssignSegIDRequest* from Proxy, which contains a list of *SegIDRequest(count, channelID, collectionName, partitionTag)*. For each request, the Segment Manager assigns rows to an already opened segment, or opens a new segment when no opened segment has enough free space. The Segment Manager also records the allocated space; if an assignment is not used within the expiration duration, that space becomes available for reallocation (see the usage example after the signature below).
```go
func (segMgr *SegmentManager) AssignSegmentID(segIDReq []*internalpb.SegIDRequest) ([]*internalpb.SegIDAssignment, error)
```
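For illustration, a minimal usage sketch modeled on `TestSegmentManager_AssignSegmentID` in this change; the collection and partition names are placeholders, and `segMgr` is assumed to have been built with `NewSegmentManager`:
```go
// Ask the Segment Manager for per-channel segment assignments.
reqs := []*internalpb.SegIDRequest{
	{CollName: "coll0", PartitionTag: "default", Count: 25000, ChannelID: 0},
	{CollName: "coll0", PartitionTag: "default", Count: 10000, ChannelID: 1},
}
assigns, err := segMgr.AssignSegmentID(reqs)
if err != nil {
	log.Fatal(err)
}
for _, a := range assigns {
	// Each assignment names the segment the rows of a channel should go to and how
	// many rows were reserved; the reservation lapses after the expire duration
	// unless the rows are actually written.
	fmt.Printf("channel %d -> segment %d, reserved %d rows\n", a.ChannelID, a.SegID, a.Count)
}
```
Requests on different channels may land in the same segment as long as each channel falls inside the segment's channel range and the segment still has room below *SegmentThreshold*.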
......@@ -18,9 +18,10 @@ type UniqueID = typeutil.UniqueID
type MasterConfig struct {
Address string
Port int32
PulsarMoniterInterval int32
PulsarMonitorInterval int32
PulsarTopic string
SegmentThreshole float32
SegmentThreshold float32
SegmentExpireDuration int64
ProxyIDList []UniqueID
QueryNodeNum int
WriteNodeNum int
......
......@@ -142,6 +142,7 @@ const ::PROTOBUF_NAMESPACE_ID::uint32 TableStruct_etcd_5fmeta_2eproto::offsets[]
PROTOBUF_FIELD_OFFSET(::milvus::proto::etcd::SegmentMeta, open_time_),
PROTOBUF_FIELD_OFFSET(::milvus::proto::etcd::SegmentMeta, close_time_),
PROTOBUF_FIELD_OFFSET(::milvus::proto::etcd::SegmentMeta, num_rows_),
PROTOBUF_FIELD_OFFSET(::milvus::proto::etcd::SegmentMeta, mem_size_),
};
static const ::PROTOBUF_NAMESPACE_ID::internal::MigrationSchema schemas[] PROTOBUF_SECTION_VARIABLE(protodesc_cold) = {
{ 0, -1, sizeof(::milvus::proto::etcd::TenantMeta)},
......@@ -168,13 +169,14 @@ const char descriptor_table_protodef_etcd_5fmeta_2eproto[] PROTOBUF_SECTION_VARI
"nMeta\022\n\n\002ID\030\001 \001(\003\0225\n\006schema\030\002 \001(\0132%.milv"
"us.proto.schema.CollectionSchema\022\023\n\013crea"
"te_time\030\003 \001(\004\022\022\n\nsegmentIDs\030\004 \003(\003\022\026\n\016par"
"tition_tags\030\005 \003(\t\"\262\001\n\013SegmentMeta\022\021\n\tseg"
"tition_tags\030\005 \003(\t\"\304\001\n\013SegmentMeta\022\021\n\tseg"
"mentID\030\001 \001(\003\022\024\n\014collectionID\030\002 \001(\003\022\025\n\rpa"
"rtition_tag\030\003 \001(\t\022\025\n\rchannel_start\030\004 \001(\005"
"\022\023\n\013channel_end\030\005 \001(\005\022\021\n\topen_time\030\006 \001(\004"
"\022\022\n\nclose_time\030\007 \001(\004\022\020\n\010num_rows\030\010 \001(\003B@"
"Z>github.com/zilliztech/milvus-distribut"
"ed/internal/proto/etcdpbb\006proto3"
"\022\022\n\nclose_time\030\007 \001(\004\022\020\n\010num_rows\030\010 \001(\003\022\020"
"\n\010mem_size\030\t \001(\003B@Z>github.com/zilliztec"
"h/milvus-distributed/internal/proto/etcd"
"pbb\006proto3"
;
static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_etcd_5fmeta_2eproto_deps[2] = {
&::descriptor_table_common_2eproto,
......@@ -189,7 +191,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_etc
static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_etcd_5fmeta_2eproto_once;
static bool descriptor_table_etcd_5fmeta_2eproto_initialized = false;
const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_etcd_5fmeta_2eproto = {
&descriptor_table_etcd_5fmeta_2eproto_initialized, descriptor_table_protodef_etcd_5fmeta_2eproto, "etcd_meta.proto", 672,
&descriptor_table_etcd_5fmeta_2eproto_initialized, descriptor_table_protodef_etcd_5fmeta_2eproto, "etcd_meta.proto", 690,
&descriptor_table_etcd_5fmeta_2eproto_once, descriptor_table_etcd_5fmeta_2eproto_sccs, descriptor_table_etcd_5fmeta_2eproto_deps, 4, 2,
schemas, file_default_instances, TableStruct_etcd_5fmeta_2eproto::offsets,
file_level_metadata_etcd_5fmeta_2eproto, 4, file_level_enum_descriptors_etcd_5fmeta_2eproto, file_level_service_descriptors_etcd_5fmeta_2eproto,
......@@ -1511,8 +1513,8 @@ SegmentMeta::SegmentMeta(const SegmentMeta& from)
partition_tag_.AssignWithDefault(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited(), from.partition_tag_);
}
::memcpy(&segmentid_, &from.segmentid_,
static_cast<size_t>(reinterpret_cast<char*>(&num_rows_) -
reinterpret_cast<char*>(&segmentid_)) + sizeof(num_rows_));
static_cast<size_t>(reinterpret_cast<char*>(&mem_size_) -
reinterpret_cast<char*>(&segmentid_)) + sizeof(mem_size_));
// @@protoc_insertion_point(copy_constructor:milvus.proto.etcd.SegmentMeta)
}
......@@ -1520,8 +1522,8 @@ void SegmentMeta::SharedCtor() {
::PROTOBUF_NAMESPACE_ID::internal::InitSCC(&scc_info_SegmentMeta_etcd_5fmeta_2eproto.base);
partition_tag_.UnsafeSetDefault(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited());
::memset(&segmentid_, 0, static_cast<size_t>(
reinterpret_cast<char*>(&num_rows_) -
reinterpret_cast<char*>(&segmentid_)) + sizeof(num_rows_));
reinterpret_cast<char*>(&mem_size_) -
reinterpret_cast<char*>(&segmentid_)) + sizeof(mem_size_));
}
SegmentMeta::~SegmentMeta() {
......@@ -1550,8 +1552,8 @@ void SegmentMeta::Clear() {
partition_tag_.ClearToEmptyNoArena(&::PROTOBUF_NAMESPACE_ID::internal::GetEmptyStringAlreadyInited());
::memset(&segmentid_, 0, static_cast<size_t>(
reinterpret_cast<char*>(&num_rows_) -
reinterpret_cast<char*>(&segmentid_)) + sizeof(num_rows_));
reinterpret_cast<char*>(&mem_size_) -
reinterpret_cast<char*>(&segmentid_)) + sizeof(mem_size_));
_internal_metadata_.Clear();
}
......@@ -1619,6 +1621,13 @@ const char* SegmentMeta::_InternalParse(const char* ptr, ::PROTOBUF_NAMESPACE_ID
CHK_(ptr);
} else goto handle_unusual;
continue;
// int64 mem_size = 9;
case 9:
if (PROTOBUF_PREDICT_TRUE(static_cast<::PROTOBUF_NAMESPACE_ID::uint8>(tag) == 72)) {
mem_size_ = ::PROTOBUF_NAMESPACE_ID::internal::ReadVarint(&ptr);
CHK_(ptr);
} else goto handle_unusual;
continue;
default: {
handle_unusual:
if ((tag & 7) == 4 || tag == 0) {
......@@ -1755,6 +1764,19 @@ bool SegmentMeta::MergePartialFromCodedStream(
break;
}
// int64 mem_size = 9;
case 9: {
if (static_cast< ::PROTOBUF_NAMESPACE_ID::uint8>(tag) == (72 & 0xFF)) {
DO_((::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::ReadPrimitive<
::PROTOBUF_NAMESPACE_ID::int64, ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::TYPE_INT64>(
input, &mem_size_)));
} else {
goto handle_unusual;
}
break;
}
default: {
handle_unusual:
if (tag == 0) {
......@@ -1827,6 +1849,11 @@ void SegmentMeta::SerializeWithCachedSizes(
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteInt64(8, this->num_rows(), output);
}
// int64 mem_size = 9;
if (this->mem_size() != 0) {
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteInt64(9, this->mem_size(), output);
}
if (_internal_metadata_.have_unknown_fields()) {
::PROTOBUF_NAMESPACE_ID::internal::WireFormat::SerializeUnknownFields(
_internal_metadata_.unknown_fields(), output);
......@@ -1886,6 +1913,11 @@ void SegmentMeta::SerializeWithCachedSizes(
target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteInt64ToArray(8, this->num_rows(), target);
}
// int64 mem_size = 9;
if (this->mem_size() != 0) {
target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::WriteInt64ToArray(9, this->mem_size(), target);
}
if (_internal_metadata_.have_unknown_fields()) {
target = ::PROTOBUF_NAMESPACE_ID::internal::WireFormat::SerializeUnknownFieldsToArray(
_internal_metadata_.unknown_fields(), target);
......@@ -1963,6 +1995,13 @@ size_t SegmentMeta::ByteSizeLong() const {
this->num_rows());
}
// int64 mem_size = 9;
if (this->mem_size() != 0) {
total_size += 1 +
::PROTOBUF_NAMESPACE_ID::internal::WireFormatLite::Int64Size(
this->mem_size());
}
int cached_size = ::PROTOBUF_NAMESPACE_ID::internal::ToCachedSize(total_size);
SetCachedSize(cached_size);
return total_size;
......@@ -2015,6 +2054,9 @@ void SegmentMeta::MergeFrom(const SegmentMeta& from) {
if (from.num_rows() != 0) {
set_num_rows(from.num_rows());
}
if (from.mem_size() != 0) {
set_mem_size(from.mem_size());
}
}
void SegmentMeta::CopyFrom(const ::PROTOBUF_NAMESPACE_ID::Message& from) {
......@@ -2047,6 +2089,7 @@ void SegmentMeta::InternalSwap(SegmentMeta* other) {
swap(open_time_, other->open_time_);
swap(close_time_, other->close_time_);
swap(num_rows_, other->num_rows_);
swap(mem_size_, other->mem_size_);
}
::PROTOBUF_NAMESPACE_ID::Metadata SegmentMeta::GetMetadata() const {
......
......@@ -718,6 +718,7 @@ class SegmentMeta :
kOpenTimeFieldNumber = 6,
kCloseTimeFieldNumber = 7,
kNumRowsFieldNumber = 8,
kMemSizeFieldNumber = 9,
};
// string partition_tag = 3;
void clear_partition_tag();
......@@ -765,6 +766,11 @@ class SegmentMeta :
::PROTOBUF_NAMESPACE_ID::int64 num_rows() const;
void set_num_rows(::PROTOBUF_NAMESPACE_ID::int64 value);
// int64 mem_size = 9;
void clear_mem_size();
::PROTOBUF_NAMESPACE_ID::int64 mem_size() const;
void set_mem_size(::PROTOBUF_NAMESPACE_ID::int64 value);
// @@protoc_insertion_point(class_scope:milvus.proto.etcd.SegmentMeta)
private:
class _Internal;
......@@ -778,6 +784,7 @@ class SegmentMeta :
::PROTOBUF_NAMESPACE_ID::uint64 open_time_;
::PROTOBUF_NAMESPACE_ID::uint64 close_time_;
::PROTOBUF_NAMESPACE_ID::int64 num_rows_;
::PROTOBUF_NAMESPACE_ID::int64 mem_size_;
mutable ::PROTOBUF_NAMESPACE_ID::internal::CachedSize _cached_size_;
friend struct ::TableStruct_etcd_5fmeta_2eproto;
};
......@@ -1389,6 +1396,20 @@ inline void SegmentMeta::set_num_rows(::PROTOBUF_NAMESPACE_ID::int64 value) {
// @@protoc_insertion_point(field_set:milvus.proto.etcd.SegmentMeta.num_rows)
}
// int64 mem_size = 9;
inline void SegmentMeta::clear_mem_size() {
mem_size_ = PROTOBUF_LONGLONG(0);
}
inline ::PROTOBUF_NAMESPACE_ID::int64 SegmentMeta::mem_size() const {
// @@protoc_insertion_point(field_get:milvus.proto.etcd.SegmentMeta.mem_size)
return mem_size_;
}
inline void SegmentMeta::set_mem_size(::PROTOBUF_NAMESPACE_ID::int64 value) {
mem_size_ = value;
// @@protoc_insertion_point(field_set:milvus.proto.etcd.SegmentMeta.mem_size)
}
#ifdef __GNUC__
#pragma GCC diagnostic pop
#endif // __GNUC__
......
......@@ -32,19 +32,26 @@ func TestMaster_CollectionTask(t *testing.T) {
assert.Nil(t, err)
opt := Option{
KVRootPath: "/test/root/kv",
MetaRootPath: "/test/root/meta",
EtcdAddr: []string{etcdAddr},
PulsarAddr: "pulsar://localhost:6650",
ProxyIDs: []typeutil.UniqueID{1, 2},
PulsarProxyChannels: []string{"proxy1", "proxy2"},
PulsarProxySubName: "proxyTopics",
SoftTTBInterval: 300,
WriteIDs: []typeutil.UniqueID{3, 4},
PulsarWriteChannels: []string{"write3", "write4"},
PulsarWriteSubName: "writeTopics",
PulsarDMChannels: []string{"dm0", "dm1"},
PulsarK2SChannels: []string{"k2s0", "k2s1"},
KVRootPath: "/test/root/kv",
MetaRootPath: "/test/root/meta",
EtcdAddr: []string{etcdAddr},
PulsarAddr: "pulsar://localhost:6650",
ProxyIDs: []typeutil.UniqueID{1, 2},
PulsarProxyChannels: []string{"proxy1", "proxy2"},
PulsarProxySubName: "proxyTopics",
SoftTTBInterval: 300,
WriteIDs: []typeutil.UniqueID{3, 4},
PulsarWriteChannels: []string{"write3", "write4"},
PulsarWriteSubName: "writeTopics",
PulsarDMChannels: []string{"dm0", "dm1"},
PulsarK2SChannels: []string{"k2s0", "k2s1"},
DefaultRecordSize: 1024,
MinimumAssignSize: 1048576,
SegmentThreshold: 536870912,
SegmentExpireDuration: 2000,
NumOfChannel: 5,
NumOfQueryNode: 3,
StatsChannels: "statistic",
}
svr, err := CreateServer(ctx, &opt)
......
......@@ -400,3 +400,27 @@ func (s *Master) AllocID(ctx context.Context, request *internalpb.IDRequest) (*i
return response, nil
}
func (s *Master) AssignSegmentID(ctx context.Context, request *internalpb.AssignSegIDRequest) (*internalpb.AssignSegIDResponse, error) {
segInfos, err := s.segmentMgr.AssignSegmentID(request.GetPerChannelReq())
if err != nil {
return &internalpb.AssignSegIDResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR},
}, nil
}
ts, err := tso.AllocOne()
if err != nil {
return &internalpb.AssignSegIDResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR},
}, nil
}
return &internalpb.AssignSegIDResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
Reason: "",
},
Timestamp: ts,
ExpireDuration: 10000,
PerChannelAssignment: segInfos,
}, nil
}
......@@ -32,19 +32,26 @@ func TestMaster_CreateCollection(t *testing.T) {
assert.Nil(t, err)
opt := Option{
KVRootPath: "/test/root/kv",
MetaRootPath: "/test/root/meta",
EtcdAddr: []string{etcdAddr},
PulsarAddr: "pulsar://localhost:6650",
ProxyIDs: []typeutil.UniqueID{1, 2},
PulsarProxyChannels: []string{"proxy1", "proxy2"},
PulsarProxySubName: "proxyTopics",
SoftTTBInterval: 300,
WriteIDs: []typeutil.UniqueID{3, 4},
PulsarWriteChannels: []string{"write3", "write4"},
PulsarWriteSubName: "writeTopics",
PulsarDMChannels: []string{"dm0", "dm1"},
PulsarK2SChannels: []string{"k2s0", "k2s1"},
KVRootPath: "/test/root/kv",
MetaRootPath: "/test/root/meta",
EtcdAddr: []string{etcdAddr},
PulsarAddr: "pulsar://localhost:6650",
ProxyIDs: []typeutil.UniqueID{1, 2},
PulsarProxyChannels: []string{"proxy1", "proxy2"},
PulsarProxySubName: "proxyTopics",
SoftTTBInterval: 300,
WriteIDs: []typeutil.UniqueID{3, 4},
PulsarWriteChannels: []string{"write3", "write4"},
PulsarWriteSubName: "writeTopics",
PulsarDMChannels: []string{"dm0", "dm1"},
PulsarK2SChannels: []string{"k2s0", "k2s1"},
DefaultRecordSize: 1024,
MinimumAssignSize: 1048576,
SegmentThreshold: 536870912,
SegmentExpireDuration: 2000,
NumOfChannel: 5,
NumOfQueryNode: 3,
StatsChannels: "statistic",
}
svr, err := CreateServer(ctx, &opt)
......
......@@ -2,7 +2,6 @@ package master
import (
"context"
"fmt"
"log"
"math/rand"
"net"
......@@ -11,14 +10,11 @@ import (
"sync/atomic"
"time"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/golang/protobuf/proto"
"go.etcd.io/etcd/clientv3"
"google.golang.org/grpc"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/kv"
"github.com/zilliztech/milvus-distributed/internal/master/controller"
"github.com/zilliztech/milvus-distributed/internal/master/id"
"github.com/zilliztech/milvus-distributed/internal/master/informer"
masterParams "github.com/zilliztech/milvus-distributed/internal/master/paramtable"
......@@ -52,6 +48,14 @@ type Option struct {
PulsarDMChannels []string
PulsarK2SChannels []string
DefaultRecordSize int64
MinimumAssignSize int64
SegmentThreshold float64
SegmentExpireDuration int64
NumOfChannel int
NumOfQueryNode int
StatsChannels string
}
type Master struct {
......@@ -88,6 +92,9 @@ type Master struct {
// Add callback functions at different stages
startCallbacks []func()
closeCallbacks []func()
segmentMgr *SegmentManager
statsMs ms.MsgStream
}
func newKVBase(kvRoot string, etcdAddr []string) *kv.EtcdKV {
......@@ -135,7 +142,7 @@ func CreateServer(ctx context.Context, opt *Option) (*Master, error) {
return nil, err
}
pulsarProxyStream := ms.NewPulsarMsgStream(ctx, 1024) //output stream
pulsarProxyStream.SetPulsarCient(opt.PulsarAddr)
pulsarProxyStream.SetPulsarClient(opt.PulsarAddr)
pulsarProxyStream.CreatePulsarConsumers(opt.PulsarProxyChannels, opt.PulsarProxySubName, ms.NewUnmarshalDispatcher(), 1024)
pulsarProxyStream.Start()
var proxyStream ms.MsgStream = pulsarProxyStream
......@@ -143,7 +150,7 @@ func CreateServer(ctx context.Context, opt *Option) (*Master, error) {
tsmp.SetProxyTtBarrier(proxyTimeTickBarrier)
pulsarWriteStream := ms.NewPulsarMsgStream(ctx, 1024) //output stream
pulsarWriteStream.SetPulsarCient(opt.PulsarAddr)
pulsarWriteStream.SetPulsarClient(opt.PulsarAddr)
pulsarWriteStream.CreatePulsarConsumers(opt.PulsarWriteChannels, opt.PulsarWriteSubName, ms.NewUnmarshalDispatcher(), 1024)
pulsarWriteStream.Start()
var writeStream ms.MsgStream = pulsarWriteStream
......@@ -151,15 +158,21 @@ func CreateServer(ctx context.Context, opt *Option) (*Master, error) {
tsmp.SetWriteNodeTtBarrier(writeTimeTickBarrier)
pulsarDMStream := ms.NewPulsarMsgStream(ctx, 1024) //input stream
pulsarDMStream.SetPulsarCient(opt.PulsarAddr)
pulsarDMStream.SetPulsarClient(opt.PulsarAddr)
pulsarDMStream.CreatePulsarProducers(opt.PulsarDMChannels)
tsmp.SetDMSyncStream(pulsarDMStream)
pulsarK2SStream := ms.NewPulsarMsgStream(ctx, 1024) //input stream
pulsarK2SStream.SetPulsarCient(opt.PulsarAddr)
pulsarK2SStream.SetPulsarClient(opt.PulsarAddr)
pulsarK2SStream.CreatePulsarProducers(opt.PulsarK2SChannels)
tsmp.SetK2sSyncStream(pulsarK2SStream)
// stats msg stream
statsMs := ms.NewPulsarMsgStream(ctx, 1024)
statsMs.SetPulsarClient(opt.PulsarAddr)
statsMs.CreatePulsarConsumers([]string{opt.StatsChannels}, "SegmentStats", ms.NewUnmarshalDispatcher(), 1024)
statsMs.Start()
m := &Master{
ctx: ctx,
startTimestamp: time.Now().Unix(),
......@@ -170,6 +183,8 @@ func CreateServer(ctx context.Context, opt *Option) (*Master, error) {
ssChan: make(chan internalpb.SegmentStats, 10),
grpcErr: make(chan error),
pc: informer.NewPulsarClient(),
segmentMgr: NewSegmentManager(metakv, opt),
statsMs: statsMs,
}
m.grpcServer = grpc.NewServer()
masterpb.RegisterMasterServer(m.grpcServer, m)
......@@ -220,7 +235,6 @@ func (s *Master) IsServing() bool {
// Run runs the pd server.
func (s *Master) Run(grpcPort int64) error {
if err := s.startServerLoop(s.ctx, grpcPort); err != nil {
return err
}
......@@ -343,46 +357,6 @@ func (s *Master) tsLoop() {
}
}
// todo use messagestream
func (s *Master) pulsarLoop() {
defer s.serverLoopWg.Done()
ctx, cancel := context.WithCancel(s.serverLoopCtx)
consumer, err := s.pc.Client.Subscribe(pulsar.ConsumerOptions{
Topic: masterParams.Params.PulsarToic(),
SubscriptionName: "my-sub",
Type: pulsar.Shared,
})
if err != nil {
log.Fatal(err)
return
}
defer func() {
if err := consumer.Unsubscribe(); err != nil {
log.Fatal(err)
}
cancel()
}()
consumerChan := consumer.Chan()
for {
select {
case msg := <-consumerChan:
var m internalpb.SegmentStats
proto.Unmarshal(msg.Payload(), &m)
fmt.Printf("Received message msgId: %#v -- content: '%d'\n",
msg.ID(), m.SegmentID)
s.ssChan <- m
consumer.Ack(msg)
case <-ctx.Done():
log.Print("server is closed, exit pulsar loop")
return
}
}
}
func (s *Master) tasksExecutionLoop() {
defer s.serverLoopWg.Done()
ctx, cancel := context.WithCancel(s.serverLoopCtx)
......@@ -412,14 +386,17 @@ func (s *Master) tasksExecutionLoop() {
func (s *Master) segmentStatisticsLoop() {
defer s.serverLoopWg.Done()
defer s.statsMs.Close()
ctx, cancel := context.WithCancel(s.serverLoopCtx)
defer cancel()
for {
select {
case ss := <-s.ssChan:
controller.ComputeCloseTime(ss, s.kvBase)
case msg := <-s.statsMs.Chan():
err := s.segmentMgr.HandleQueryNodeMsgPack(msg)
if err != nil {
log.Println(err)
}
case <-ctx.Done():
log.Print("server is closed, exit segment statistics loop")
return
......
......@@ -449,7 +449,7 @@ func (mt *metaTable) DeleteSegment(segID UniqueID) error {
return nil
}
func (mt *metaTable) CloseSegment(segID UniqueID, closeTs Timestamp, numRows int64) error {
func (mt *metaTable) CloseSegment(segID UniqueID, closeTs Timestamp, numRows int64, memSize int64) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
......@@ -460,6 +460,7 @@ func (mt *metaTable) CloseSegment(segID UniqueID, closeTs Timestamp, numRows int
segMeta.CloseTime = closeTs
segMeta.NumRows = numRows
segMeta.MemSize = memSize
err := mt.saveSegmentMeta(&segMeta)
if err != nil {
......
......@@ -294,14 +294,15 @@ func TestMetaTable_Segment(t *testing.T) {
getSegMeta, err := meta.GetSegmentByID(segMeta.SegmentID)
assert.Nil(t, err)
assert.Equal(t, &segMeta, getSegMeta)
err = meta.CloseSegment(segMeta.SegmentID, Timestamp(11), 111)
err = meta.CloseSegment(segMeta.SegmentID, Timestamp(11), 111, 100000)
assert.Nil(t, err)
err = meta.CloseSegment(1000, Timestamp(11), 111)
err = meta.CloseSegment(1000, Timestamp(11), 111, 100000)
assert.NotNil(t, err)
getSegMeta, err = meta.GetSegmentByID(segMeta.SegmentID)
assert.Nil(t, err)
assert.Equal(t, getSegMeta.NumRows, int64(111))
assert.Equal(t, getSegMeta.CloseTime, uint64(11))
assert.Equal(t, int64(100000), getSegMeta.MemSize)
err = meta.DeleteSegment(segMeta.SegmentID)
assert.Nil(t, err)
err = meta.DeleteSegment(1000)
......
......@@ -36,10 +36,47 @@ func (p *ParamTable) PulsarToic() string {
}
func (p *ParamTable) SegmentThreshold() float64 {
threshole, _ := p.Load("master.segmentthreshold")
segmentThreshole, err := strconv.ParseFloat(threshole, 32)
threshold, _ := p.Load("master.segmentThreshold")
segmentThreshold, err := strconv.ParseFloat(threshold, 32)
if err != nil {
panic(err)
}
return segmentThreshole
return segmentThreshold
}
func (p *ParamTable) DefaultRecordSize() int64 {
size, _ := p.Load("master.defaultSizePerRecord")
res, err := strconv.ParseInt(size, 10, 64)
if err != nil {
panic(err)
}
return res
}
func (p *ParamTable) MinimumAssignSize() int64 {
size, _ := p.Load("master.minimumAssignSize")
res, err := strconv.ParseInt(size, 10, 64)
if err != nil {
panic(err)
}
return res
}
func (p *ParamTable) SegmentExpireDuration() int64 {
duration, _ := p.Load("master.segmentExpireDuration")
res, err := strconv.ParseInt(duration, 10, 64)
if err != nil {
panic(err)
}
return res
}
func (p *ParamTable) QueryNodeNum() (int, error) {
num, _ := p.Load("master.querynodenum")
return strconv.Atoi(num)
}
func (p *ParamTable) StatsChannels() string {
channels, _ := p.Load("master.statsChannels")
return channels
}
......@@ -36,19 +36,26 @@ func TestMaster_Partition(t *testing.T) {
assert.Nil(t, err)
opt := Option{
KVRootPath: "/test/root/kv",
MetaRootPath: "/test/root/meta",
EtcdAddr: []string{etcdAddr},
PulsarAddr: "pulsar://localhost:6650",
ProxyIDs: []typeutil.UniqueID{1, 2},
PulsarProxyChannels: []string{"proxy1", "proxy2"},
PulsarProxySubName: "proxyTopics",
SoftTTBInterval: 300,
WriteIDs: []typeutil.UniqueID{3, 4},
PulsarWriteChannels: []string{"write3", "write4"},
PulsarWriteSubName: "writeTopics",
PulsarDMChannels: []string{"dm0", "dm1"},
PulsarK2SChannels: []string{"k2s0", "k2s1"},
KVRootPath: "/test/root/kv",
MetaRootPath: "/test/root/meta",
EtcdAddr: []string{etcdAddr},
PulsarAddr: "pulsar://localhost:6650",
ProxyIDs: []typeutil.UniqueID{1, 2},
PulsarProxyChannels: []string{"proxy1", "proxy2"},
PulsarProxySubName: "proxyTopics",
SoftTTBInterval: 300,
WriteIDs: []typeutil.UniqueID{3, 4},
PulsarWriteChannels: []string{"write3", "write4"},
PulsarWriteSubName: "writeTopics",
PulsarDMChannels: []string{"dm0", "dm1"},
PulsarK2SChannels: []string{"k2s0", "k2s1"},
DefaultRecordSize: 1024,
MinimumAssignSize: 1048576,
SegmentThreshold: 536870912,
SegmentExpireDuration: 2000,
NumOfChannel: 5,
NumOfQueryNode: 3,
StatsChannels: "statistic",
}
port := 10000 + rand.Intn(1000)
......
package master
import (
"sync"
"time"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/master/id"
"github.com/zilliztech/milvus-distributed/internal/master/tso"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
)
type collectionStatus struct {
openedSegments []UniqueID
}
type assignment struct {
MemSize int64 // bytes
AssignTime time.Time
}
type channelRange struct {
channelStart int32
channelEnd int32
}
type segmentStatus struct {
assignments []*assignment
}
type SegmentManager struct {
metaTable *metaTable
statsStream msgstream.MsgStream
channelRanges []*channelRange
segmentStatus map[UniqueID]*segmentStatus // segment id to segment status
collStatus map[UniqueID]*collectionStatus // collection id to collection status
defaultSizePerRecord int64
minimumAssignSize int64
segmentThreshold int64
segmentExpireDuration int64
numOfChannels int
numOfQueryNodes int
mu sync.RWMutex
}
func (segMgr *SegmentManager) HandleQueryNodeMsgPack(msgPack *msgstream.MsgPack) error {
segMgr.mu.Lock()
defer segMgr.mu.Unlock()
for _, msg := range msgPack.Msgs {
statsMsg, ok := msg.(*msgstream.QueryNodeSegStatsMsg)
if !ok {
return errors.Errorf("Type of message is not QueryNodeSegStatsMsg")
}
for _, segStat := range statsMsg.GetSegStats() {
err := segMgr.handleSegmentStat(segStat)
if err != nil {
return err
}
}
}
return nil
}
func (segMgr *SegmentManager) handleSegmentStat(segStats *internalpb.SegmentStats) error {
if !segStats.GetRecentlyModified() {
return nil
}
segID := segStats.GetSegmentID()
segMeta, err := segMgr.metaTable.GetSegmentByID(segID)
if err != nil {
return err
}
segMeta.NumRows = segStats.NumRows
segMeta.MemSize = segStats.MemorySize
if segStats.MemorySize > segMgr.segmentThreshold {
return segMgr.closeSegment(segMeta)
}
return segMgr.metaTable.UpdateSegment(segMeta)
}
func (segMgr *SegmentManager) closeSegment(segMeta *etcdpb.SegmentMeta) error {
if segMeta.GetCloseTime() == 0 {
// close the segment and remove from collStatus
collStatus, ok := segMgr.collStatus[segMeta.GetCollectionID()]
if !ok {
return errors.Errorf("Can not find the status of collection %d", segMeta.GetCollectionID())
}
openedSegments := collStatus.openedSegments
for i, openedSegID := range openedSegments {
if openedSegID == segMeta.SegmentID {
openedSegments[i] = openedSegments[len(openedSegments)-1]
collStatus.openedSegments = openedSegments[:len(openedSegments)-1]
return nil
}
}
ts, err := tso.AllocOne()
if err != nil {
return err
}
segMeta.CloseTime = ts
}
err := segMgr.metaTable.CloseSegment(segMeta.SegmentID, segMeta.GetCloseTime(), segMeta.NumRows, segMeta.MemSize)
if err != nil {
return err
}
return errors.Errorf("The segment %d is not opened in collection %d", segMeta.SegmentID, segMeta.GetCollectionID())
}
func (segMgr *SegmentManager) AssignSegmentID(segIDReq []*internalpb.SegIDRequest) ([]*internalpb.SegIDAssignment, error) {
segMgr.mu.Lock()
defer segMgr.mu.Unlock()
res := make([]*internalpb.SegIDAssignment, 0)
for _, req := range segIDReq {
collName := req.CollName
partitionTag := req.PartitionTag
count := req.Count
channelID := req.ChannelID
collMeta, err := segMgr.metaTable.GetCollectionByName(collName)
if err != nil {
return nil, err
}
collID := collMeta.GetID()
if !segMgr.metaTable.HasCollection(collID) {
return nil, errors.Errorf("can not find collection with id=%d", collID)
}
if !segMgr.metaTable.HasPartition(collID, partitionTag) {
return nil, errors.Errorf("partition tag %s can not find in coll %d", partitionTag, collID)
}
collStatus, ok := segMgr.collStatus[collID]
if !ok {
collStatus = &collectionStatus{
openedSegments: make([]UniqueID, 0),
}
segMgr.collStatus[collID] = collStatus
}
assignInfo, err := segMgr.assignSegment(collName, collID, partitionTag, count, channelID, collStatus)
if err != nil {
return nil, err
}
res = append(res, assignInfo)
}
return res, nil
}
func (segMgr *SegmentManager) assignSegment(collName string, collID UniqueID, partitionTag string, count uint32, channelID int32,
collStatus *collectionStatus) (*internalpb.SegIDAssignment, error) {
for _, segID := range collStatus.openedSegments {
segMeta, _ := segMgr.metaTable.GetSegmentByID(segID)
if segMeta.GetCloseTime() != 0 || channelID < segMeta.GetChannelStart() ||
channelID > segMeta.GetChannelEnd() || segMeta.PartitionTag != partitionTag {
continue
}
// check whether segment has enough mem size
assignedMem := segMgr.checkAssignedSegExpire(segID)
memSize := segMeta.MemSize
neededMemSize := segMgr.calNeededSize(memSize, segMeta.NumRows, int64(count))
if memSize+assignedMem+neededMemSize <= segMgr.segmentThreshold {
remainingSize := segMgr.segmentThreshold - memSize - assignedMem
allocMemSize := segMgr.calAllocMemSize(neededMemSize, remainingSize)
segMgr.addAssignment(segID, allocMemSize)
return &internalpb.SegIDAssignment{
SegID: segID,
ChannelID: channelID,
Count: uint32(segMgr.calNumRows(memSize, segMeta.NumRows, allocMemSize)),
CollName: collName,
PartitionTag: partitionTag,
}, nil
}
}
neededMemSize := segMgr.defaultSizePerRecord * int64(count)
if neededMemSize > segMgr.segmentThreshold {
return nil, errors.Errorf("request with count %d need about %d mem size which is larger than segment threshold",
count, neededMemSize)
}
segMeta, err := segMgr.openNewSegment(channelID, collID, partitionTag)
if err != nil {
return nil, err
}
allocMemSize := segMgr.calAllocMemSize(neededMemSize, segMgr.segmentThreshold)
segMgr.addAssignment(segMeta.SegmentID, allocMemSize)
return &internalpb.SegIDAssignment{
SegID: segMeta.SegmentID,
ChannelID: channelID,
Count: uint32(segMgr.calNumRows(0, 0, allocMemSize)),
CollName: collName,
PartitionTag: partitionTag,
}, nil
}
func (segMgr *SegmentManager) addAssignment(segID UniqueID, allocSize int64) {
segStatus := segMgr.segmentStatus[segID]
segStatus.assignments = append(segStatus.assignments, &assignment{
MemSize: allocSize,
AssignTime: time.Now(),
})
}
func (segMgr *SegmentManager) calNeededSize(memSize int64, numRows int64, count int64) int64 {
var avgSize int64
if memSize == 0 || numRows == 0 || memSize/numRows == 0 {
avgSize = segMgr.defaultSizePerRecord
} else {
avgSize = memSize / numRows
}
return avgSize * count
}
func (segMgr *SegmentManager) calAllocMemSize(neededSize int64, remainSize int64) int64 {
if neededSize > remainSize {
return 0
}
if remainSize < segMgr.minimumAssignSize {
return remainSize
}
if neededSize < segMgr.minimumAssignSize {
return segMgr.minimumAssignSize
}
return neededSize
}
func (segMgr *SegmentManager) calNumRows(memSize int64, numRows int64, allocMemSize int64) int64 {
var avgSize int64
if memSize == 0 || numRows == 0 || memSize/numRows == 0 {
avgSize = segMgr.defaultSizePerRecord
} else {
avgSize = memSize / numRows
}
return allocMemSize / avgSize
}
func (segMgr *SegmentManager) openNewSegment(channelID int32, collID UniqueID, partitionTag string) (*etcdpb.SegmentMeta, error) {
// find the channel range
channelStart, channelEnd := int32(-1), int32(-1)
for _, r := range segMgr.channelRanges {
if channelID >= r.channelStart && channelID <= r.channelEnd {
channelStart = r.channelStart
channelEnd = r.channelEnd
break
}
}
if channelStart == -1 {
return nil, errors.Errorf("can't find the channel range which contains channel %d", channelID)
}
newID, err := id.AllocOne()
if err != nil {
return nil, err
}
openTime, err := tso.AllocOne()
if err != nil {
return nil, err
}
newSegMeta := &etcdpb.SegmentMeta{
SegmentID: newID,
CollectionID: collID,
PartitionTag: partitionTag,
ChannelStart: channelStart,
ChannelEnd: channelEnd,
OpenTime: openTime,
NumRows: 0,
MemSize: 0,
}
err = segMgr.metaTable.AddSegment(newSegMeta)
if err != nil {
return nil, err
}
segMgr.segmentStatus[newID] = &segmentStatus{
assignments: make([]*assignment, 0),
}
collStatus := segMgr.collStatus[collID]
collStatus.openedSegments = append(collStatus.openedSegments, newSegMeta.SegmentID)
return newSegMeta, nil
}
// checkAssignedSegExpire drops the expired assignments of a segment and returns the total memory size reserved by the assignments that have not yet expired.
func (segMgr *SegmentManager) checkAssignedSegExpire(segID UniqueID) int64 {
segStatus := segMgr.segmentStatus[segID]
assignments := segStatus.assignments
result := int64(0)
i := 0
for i < len(assignments) {
assign := assignments[i]
if time.Since(assign.AssignTime) >= time.Duration(segMgr.segmentExpireDuration)*time.Millisecond {
assignments[i] = assignments[len(assignments)-1]
assignments = assignments[:len(assignments)-1]
continue
}
result += assign.MemSize
i++
}
segStatus.assignments = assignments
return result
}
func (segMgr *SegmentManager) createChannelRanges() error {
div, rem := segMgr.numOfChannels/segMgr.numOfQueryNodes, segMgr.numOfChannels%segMgr.numOfQueryNodes
for i, j := 0, 0; i < segMgr.numOfChannels; j++ {
if j < rem {
segMgr.channelRanges = append(segMgr.channelRanges, &channelRange{
channelStart: int32(i),
channelEnd: int32(i + div),
})
i += div + 1
} else {
segMgr.channelRanges = append(segMgr.channelRanges, &channelRange{
channelStart: int32(i),
channelEnd: int32(i + div - 1),
})
i += div
}
}
return nil
}
func NewSegmentManager(meta *metaTable, opt *Option) *SegmentManager {
segMgr := &SegmentManager{
metaTable: meta,
channelRanges: make([]*channelRange, 0),
segmentStatus: make(map[UniqueID]*segmentStatus),
collStatus: make(map[UniqueID]*collectionStatus),
segmentThreshold: int64(opt.SegmentThreshold),
segmentExpireDuration: opt.SegmentExpireDuration,
minimumAssignSize: opt.MinimumAssignSize,
defaultSizePerRecord: opt.DefaultRecordSize,
numOfChannels: opt.NumOfChannel,
numOfQueryNodes: opt.NumOfQueryNode,
}
segMgr.createChannelRanges()
return segMgr
}
package master
import (
"log"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/kv"
"github.com/zilliztech/milvus-distributed/internal/master/id"
masterParam "github.com/zilliztech/milvus-distributed/internal/master/paramtable"
"github.com/zilliztech/milvus-distributed/internal/master/tso"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
pb "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"go.etcd.io/etcd/clientv3"
)
var mt *metaTable
var segMgr *SegmentManager
var collName = "coll_segmgr_test"
var collID = int64(1001)
var partitionTag = "test"
var kvBase *kv.EtcdKV
func setup() {
masterParam.Params.Init()
etcdAddress, err := masterParam.Params.EtcdAddress()
if err != nil {
panic(err)
}
rootPath, err := masterParam.Params.EtcdRootPath()
if err != nil {
panic(err)
}
id.Init([]string{etcdAddress}, rootPath)
tso.Init([]string{etcdAddress}, rootPath)
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}})
if err != nil {
panic(err)
}
rootpath := "/etcd/test/root"
kvBase = kv.NewEtcdKV(cli, rootpath)
tmpMt, err := NewMetaTable(kvBase)
if err != nil {
panic(err)
}
mt = tmpMt
if mt.HasCollection(collID) {
mt.DeleteCollection(collID)
}
err = mt.AddCollection(&pb.CollectionMeta{
ID: collID,
Schema: &schemapb.CollectionSchema{
Name: collName,
},
CreateTime: 0,
SegmentIDs: []UniqueID{},
PartitionTags: []string{},
})
if err != nil {
panic(err)
}
err = mt.AddPartition(collID, partitionTag)
if err != nil {
panic(err)
}
opt := &Option{
SegmentThreshold: 536870912,
SegmentExpireDuration: 2000,
MinimumAssignSize: 1048576,
DefaultRecordSize: 1024,
NumOfQueryNode: 3,
NumOfChannel: 5,
}
segMgr = NewSegmentManager(mt, opt)
}
func teardown() {
err := mt.DeleteCollection(collID)
if err != nil {
log.Fatalf(err.Error())
}
kvBase.Close()
}
func TestSegmentManager_AssignSegmentID(t *testing.T) {
setup()
defer teardown()
reqs := []*internalpb.SegIDRequest{
{CollName: collName, PartitionTag: partitionTag, Count: 25000, ChannelID: 0},
{CollName: collName, PartitionTag: partitionTag, Count: 10000, ChannelID: 1},
{CollName: collName, PartitionTag: partitionTag, Count: 30000, ChannelID: 2},
{CollName: collName, PartitionTag: partitionTag, Count: 25000, ChannelID: 3},
{CollName: collName, PartitionTag: partitionTag, Count: 10000, ChannelID: 4},
}
segAssigns, err := segMgr.AssignSegmentID(reqs)
assert.Nil(t, err)
assert.Equal(t, uint32(25000), segAssigns[0].Count)
assert.Equal(t, uint32(10000), segAssigns[1].Count)
assert.Equal(t, uint32(30000), segAssigns[2].Count)
assert.Equal(t, uint32(25000), segAssigns[3].Count)
assert.Equal(t, uint32(10000), segAssigns[4].Count)
assert.Equal(t, segAssigns[0].SegID, segAssigns[1].SegID)
assert.Equal(t, segAssigns[2].SegID, segAssigns[3].SegID)
newReqs := []*internalpb.SegIDRequest{
{CollName: collName, PartitionTag: partitionTag, Count: 500000, ChannelID: 0},
}
// test open a new segment
newAssign, err := segMgr.AssignSegmentID(newReqs)
assert.Nil(t, err)
assert.NotNil(t, newAssign)
assert.Equal(t, uint32(500000), newAssign[0].Count)
assert.NotEqual(t, segAssigns[0].SegID, newAssign[0].SegID)
// test assignment expiration
time.Sleep(3 * time.Second)
assignAfterExpiration, err := segMgr.AssignSegmentID(newReqs)
assert.Nil(t, err)
assert.NotNil(t, assignAfterExpiration)
assert.Equal(t, uint32(500000), assignAfterExpiration[0].Count)
assert.Equal(t, segAssigns[0].SegID, assignAfterExpiration[0].SegID)
// test invalid params
newReqs[0].CollName = "wrong_collname"
_, err = segMgr.AssignSegmentID(newReqs)
assert.Error(t, errors.Errorf("can not find collection with id=%d", collID), err)
newReqs[0].Count = 1000000
_, err = segMgr.AssignSegmentID(newReqs)
assert.Error(t, errors.Errorf("request with count %d need about %d mem size which is larger than segment threshold",
1000000, masterParam.Params.DefaultRecordSize()*1000000), err)
}
func TestSegmentManager_SegmentStats(t *testing.T) {
setup()
defer teardown()
ts, err := tso.AllocOne()
assert.Nil(t, err)
err = mt.AddSegment(&pb.SegmentMeta{
SegmentID: 100,
CollectionID: collID,
PartitionTag: partitionTag,
ChannelStart: 0,
ChannelEnd: 1,
OpenTime: ts,
})
assert.Nil(t, err)
stats := internalpb.QueryNodeSegStats{
MsgType: internalpb.MsgType_kQueryNodeSegStats,
PeerID: 1,
SegStats: []*internalpb.SegmentStats{
{SegmentID: 100, MemorySize: 25000 * masterParam.Params.DefaultRecordSize(), NumRows: 25000, RecentlyModified: true},
},
}
baseMsg := msgstream.BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []int32{1},
}
msg := msgstream.QueryNodeSegStatsMsg{
QueryNodeSegStats: stats,
BaseMsg: baseMsg,
}
var tsMsg msgstream.TsMsg = &msg
msgPack := msgstream.MsgPack{
Msgs: make([]msgstream.TsMsg, 0),
}
msgPack.Msgs = append(msgPack.Msgs, tsMsg)
err = segMgr.HandleQueryNodeMsgPack(&msgPack)
assert.Nil(t, err)
time.Sleep(1 * time.Second)
segMeta, _ := mt.GetSegmentByID(100)
assert.Equal(t, int64(100), segMeta.SegmentID)
assert.Equal(t, 25000*masterParam.Params.DefaultRecordSize(), segMeta.MemSize)
assert.Equal(t, int64(25000), segMeta.NumRows)
// close segment
stats.SegStats[0].NumRows = 520000
stats.SegStats[0].MemorySize = 520000 * masterParam.Params.DefaultRecordSize()
err = segMgr.HandleQueryNodeMsgPack(&msgPack)
assert.Nil(t, err)
time.Sleep(1 * time.Second)
segMeta, _ = mt.GetSegmentByID(100)
assert.Equal(t, int64(100), segMeta.SegmentID)
assert.NotEqual(t, 0, segMeta.CloseTime)
}
......@@ -42,7 +42,7 @@ func initTestPulsarStream(ctx context.Context, pulsarAddress string,
// set input stream
inputStream := ms.NewPulsarMsgStream(ctx, 100)
inputStream.SetPulsarCient(pulsarAddress)
inputStream.SetPulsarClient(pulsarAddress)
inputStream.CreatePulsarProducers(producerChannels)
for _, opt := range opts {
inputStream.SetRepackFunc(opt)
......@@ -51,7 +51,7 @@ func initTestPulsarStream(ctx context.Context, pulsarAddress string,
// set output stream
outputStream := ms.NewPulsarMsgStream(ctx, 100)
outputStream.SetPulsarCient(pulsarAddress)
outputStream.SetPulsarClient(pulsarAddress)
unmarshalDispatcher := ms.NewUnmarshalDispatcher()
outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName, unmarshalDispatcher, 100)
var output ms.MsgStream = outputStream
......
......@@ -38,13 +38,13 @@ func initPulsarStream(pulsarAddress string,
// set input stream
inputStream := ms.NewPulsarMsgStream(context.Background(), 100)
inputStream.SetPulsarCient(pulsarAddress)
inputStream.SetPulsarClient(pulsarAddress)
inputStream.CreatePulsarProducers(producerChannels)
var input ms.MsgStream = inputStream
// set output stream
outputStream := ms.NewPulsarMsgStream(context.Background(), 100)
outputStream.SetPulsarCient(pulsarAddress)
outputStream.SetPulsarClient(pulsarAddress)
unmarshalDispatcher := ms.NewUnmarshalDispatcher()
outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName, unmarshalDispatcher, 100)
outputStream.Start()
......
......@@ -58,7 +58,7 @@ func NewPulsarMsgStream(ctx context.Context, receiveBufSize int64) *PulsarMsgStr
return stream
}
func (ms *PulsarMsgStream) SetPulsarCient(address string) {
func (ms *PulsarMsgStream) SetPulsarClient(address string) {
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: address})
if err != nil {
log.Printf("Set pulsar client failed, error = %v", err)
......
......@@ -145,7 +145,7 @@ func initPulsarStream(pulsarAddress string,
// set input stream
inputStream := NewPulsarMsgStream(context.Background(), 100)
inputStream.SetPulsarCient(pulsarAddress)
inputStream.SetPulsarClient(pulsarAddress)
inputStream.CreatePulsarProducers(producerChannels)
for _, opt := range opts {
inputStream.SetRepackFunc(opt)
......@@ -155,7 +155,7 @@ func initPulsarStream(pulsarAddress string,
// set output stream
outputStream := NewPulsarMsgStream(context.Background(), 100)
outputStream.SetPulsarCient(pulsarAddress)
outputStream.SetPulsarClient(pulsarAddress)
unmarshalDispatcher := NewUnmarshalDispatcher()
outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName, unmarshalDispatcher, 100)
outputStream.Start()
......@@ -172,7 +172,7 @@ func initPulsarTtStream(pulsarAddress string,
// set input stream
inputStream := NewPulsarMsgStream(context.Background(), 100)
inputStream.SetPulsarCient(pulsarAddress)
inputStream.SetPulsarClient(pulsarAddress)
inputStream.CreatePulsarProducers(producerChannels)
for _, opt := range opts {
inputStream.SetRepackFunc(opt)
......@@ -182,7 +182,7 @@ func initPulsarTtStream(pulsarAddress string,
// set output stream
outputStream := NewPulsarTtMsgStream(context.Background(), 100)
outputStream.SetPulsarCient(pulsarAddress)
outputStream.SetPulsarClient(pulsarAddress)
unmarshalDispatcher := NewUnmarshalDispatcher()
outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName, unmarshalDispatcher, 100)
outputStream.Start()
......@@ -383,12 +383,12 @@ func TestStream_PulsarMsgStream_InsertRepackFunc(t *testing.T) {
msgPack.Msgs = append(msgPack.Msgs, insertMsg)
inputStream := NewPulsarMsgStream(context.Background(), 100)
inputStream.SetPulsarCient(pulsarAddress)
inputStream.SetPulsarClient(pulsarAddress)
inputStream.CreatePulsarProducers(producerChannels)
inputStream.Start()
outputStream := NewPulsarMsgStream(context.Background(), 100)
outputStream.SetPulsarCient(pulsarAddress)
outputStream.SetPulsarClient(pulsarAddress)
unmarshalDispatcher := NewUnmarshalDispatcher()
outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName, unmarshalDispatcher, 100)
outputStream.Start()
......@@ -433,12 +433,12 @@ func TestStream_PulsarMsgStream_DeleteRepackFunc(t *testing.T) {
msgPack.Msgs = append(msgPack.Msgs, deleteMsg)
inputStream := NewPulsarMsgStream(context.Background(), 100)
inputStream.SetPulsarCient(pulsarAddress)
inputStream.SetPulsarClient(pulsarAddress)
inputStream.CreatePulsarProducers(producerChannels)
inputStream.Start()
outputStream := NewPulsarMsgStream(context.Background(), 100)
outputStream.SetPulsarCient(pulsarAddress)
outputStream.SetPulsarClient(pulsarAddress)
unmarshalDispatcher := NewUnmarshalDispatcher()
outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName, unmarshalDispatcher, 100)
outputStream.Start()
......@@ -466,12 +466,12 @@ func TestStream_PulsarMsgStream_DefaultRepackFunc(t *testing.T) {
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kQueryNodeSegStats, 4, 4))
inputStream := NewPulsarMsgStream(context.Background(), 100)
inputStream.SetPulsarCient(pulsarAddress)
inputStream.SetPulsarClient(pulsarAddress)
inputStream.CreatePulsarProducers(producerChannels)
inputStream.Start()
outputStream := NewPulsarMsgStream(context.Background(), 100)
outputStream.SetPulsarCient(pulsarAddress)
outputStream.SetPulsarClient(pulsarAddress)
unmarshalDispatcher := NewUnmarshalDispatcher()
outputStream.CreatePulsarConsumers(consumerChannels, consumerSubName, unmarshalDispatcher, 100)
outputStream.Start()
......
......@@ -128,13 +128,13 @@ func TestStream_task_Insert(t *testing.T) {
msgPack.Msgs = append(msgPack.Msgs, getInsertTask(3, 3))
inputStream := NewPulsarMsgStream(context.Background(), 100)
inputStream.SetPulsarCient(pulsarAddress)
inputStream.SetPulsarClient(pulsarAddress)
inputStream.CreatePulsarProducers(producerChannels)
inputStream.SetRepackFunc(newRepackFunc)
inputStream.Start()
outputStream := NewPulsarMsgStream(context.Background(), 100)
outputStream.SetPulsarCient(pulsarAddress)
outputStream.SetPulsarClient(pulsarAddress)
unmarshalDispatcher := NewUnmarshalDispatcher()
testTask := InsertTask{}
unmarshalDispatcher.AddMsgTemplate(internalPb.MsgType_kInsert, testTask.Unmarshal)
......
......@@ -33,12 +33,12 @@ func TestStream_unmarshal_Insert(t *testing.T) {
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(internalPb.MsgType_kInsert, 3, 3))
inputStream := NewPulsarMsgStream(context.Background(), 100)
inputStream.SetPulsarCient(pulsarAddress)
inputStream.SetPulsarClient(pulsarAddress)
inputStream.CreatePulsarProducers(producerChannels)
inputStream.Start()
outputStream := NewPulsarMsgStream(context.Background(), 100)
outputStream.SetPulsarCient(pulsarAddress)
outputStream.SetPulsarClient(pulsarAddress)
unmarshalDispatcher := NewUnmarshalDispatcher()
//add a new unmarshall func for msgType kInsert
......
......@@ -38,4 +38,5 @@ message SegmentMeta {
uint64 open_time=6;
uint64 close_time=7;
int64 num_rows=8;
int64 mem_size=9;
}
......@@ -220,6 +220,7 @@ type SegmentMeta struct {
OpenTime uint64 `protobuf:"varint,6,opt,name=open_time,json=openTime,proto3" json:"open_time,omitempty"`
CloseTime uint64 `protobuf:"varint,7,opt,name=close_time,json=closeTime,proto3" json:"close_time,omitempty"`
NumRows int64 `protobuf:"varint,8,opt,name=num_rows,json=numRows,proto3" json:"num_rows,omitempty"`
MemSize int64 `protobuf:"varint,9,opt,name=mem_size,json=memSize,proto3" json:"mem_size,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
......@@ -306,6 +307,13 @@ func (m *SegmentMeta) GetNumRows() int64 {
return 0
}
func (m *SegmentMeta) GetMemSize() int64 {
if m != nil {
return m.MemSize
}
return 0
}
func init() {
proto.RegisterType((*TenantMeta)(nil), "milvus.proto.etcd.TenantMeta")
proto.RegisterType((*ProxyMeta)(nil), "milvus.proto.etcd.ProxyMeta")
......@@ -316,37 +324,38 @@ func init() {
func init() { proto.RegisterFile("etcd_meta.proto", fileDescriptor_975d306d62b73e88) }
var fileDescriptor_975d306d62b73e88 = []byte{
// 506 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x93, 0x4d, 0x8f, 0xd3, 0x30,
0x10, 0x86, 0x95, 0xa6, 0x5f, 0x99, 0x7e, 0xb1, 0x39, 0x85, 0x65, 0x81, 0xaa, 0x68, 0xa1, 0x12,
0xa2, 0x95, 0x40, 0xe2, 0x06, 0x02, 0xb6, 0x1c, 0x7a, 0x00, 0x41, 0xda, 0x13, 0x97, 0xc8, 0x4d,
0x46, 0xad, 0xa5, 0xd8, 0x2e, 0xb6, 0xc3, 0xb2, 0x7b, 0xe2, 0x2f, 0x70, 0xe5, 0x0f, 0xf1, 0xb7,
0x90, 0x3f, 0x48, 0xb7, 0x52, 0x8f, 0x79, 0xe6, 0xf5, 0xf8, 0x9d, 0x77, 0x1c, 0x18, 0xa1, 0xce,
0x8b, 0x8c, 0xa1, 0x26, 0xb3, 0xbd, 0x14, 0x5a, 0xc4, 0x67, 0x8c, 0x96, 0x3f, 0x2a, 0xe5, 0xbe,
0x66, 0xa6, 0x7a, 0xde, 0xcf, 0x05, 0x63, 0x82, 0x3b, 0x74, 0xde, 0x57, 0xf9, 0x0e, 0x99, 0x97,
0x4f, 0xfe, 0x04, 0x00, 0x6b, 0xe4, 0x84, 0xeb, 0x4f, 0xa8, 0x49, 0x3c, 0x84, 0xc6, 0x72, 0x91,
0x04, 0xe3, 0x60, 0x1a, 0xa6, 0x8d, 0xe5, 0x22, 0x7e, 0x0a, 0x23, 0x5e, 0xb1, 0xec, 0x7b, 0x85,
0xf2, 0x26, 0xe3, 0xa2, 0x40, 0x95, 0x34, 0x6c, 0x71, 0xc0, 0x2b, 0xf6, 0xd5, 0xd0, 0xcf, 0x06,
0xc6, 0xcf, 0xe1, 0x8c, 0x72, 0x85, 0x52, 0x67, 0xf9, 0x8e, 0x70, 0x8e, 0xe5, 0x72, 0xa1, 0x92,
0x70, 0x1c, 0x4e, 0xa3, 0xf4, 0x9e, 0x2b, 0x5c, 0xd5, 0x3c, 0x7e, 0x06, 0x23, 0xd7, 0xb0, 0xd6,
0x26, 0xcd, 0x71, 0x30, 0x8d, 0xd2, 0xa1, 0xc5, 0xb5, 0x72, 0xf2, 0x2b, 0x80, 0xe8, 0x8b, 0x14,
0x3f, 0x6f, 0x4e, 0x7a, 0x7b, 0x0d, 0x1d, 0x52, 0x14, 0x12, 0x95, 0xf3, 0xd4, 0x7b, 0x79, 0x31,
0x3b, 0x9a, 0xdd, 0x4f, 0xfd, 0xde, 0x69, 0xd2, 0xff, 0x62, 0xe3, 0x55, 0xa2, 0xaa, 0xca, 0x53,
0x5e, 0x5d, 0xe1, 0xe0, 0x75, 0xf2, 0x37, 0x80, 0xe1, 0x95, 0x28, 0x4b, 0xcc, 0x35, 0x15, 0xfc,
0xa4, 0x8f, 0x37, 0xd0, 0x76, 0x91, 0x7a, 0x1b, 0x97, 0xc7, 0x36, 0x7c, 0xdc, 0x87, 0x26, 0x2b,
0x0b, 0x52, 0x7f, 0x28, 0x7e, 0x0c, 0xbd, 0x5c, 0x22, 0xd1, 0x98, 0x69, 0xca, 0x30, 0x09, 0xc7,
0xc1, 0xb4, 0x99, 0x82, 0x43, 0x6b, 0xca, 0x30, 0x7e, 0x04, 0xa0, 0x70, 0xcb, 0x90, 0x6b, 0x63,
0xb4, 0x39, 0x0e, 0xa7, 0x61, 0x7a, 0x87, 0xc4, 0x97, 0x30, 0xdc, 0x13, 0xa9, 0xa9, 0xe9, 0x9d,
0x69, 0xb2, 0x55, 0x49, 0xcb, 0x0e, 0x33, 0xa8, 0xe9, 0x9a, 0x6c, 0xd5, 0xe4, 0x77, 0x03, 0x7a,
0x2b, 0x77, 0xca, 0x8e, 0x71, 0x01, 0x51, 0xdd, 0xc4, 0x4f, 0x73, 0x00, 0xf1, 0x04, 0xfa, 0x79,
0xed, 0x78, 0xb9, 0xf0, 0x5b, 0x3f, 0x62, 0xf1, 0x13, 0x18, 0x1c, 0x5d, 0x6c, 0xbd, 0x47, 0x69,
0xff, 0xee, 0xbd, 0x46, 0xe4, 0x63, 0xce, 0x94, 0x26, 0x52, 0xdb, 0x55, 0xb7, 0xd2, 0xbe, 0x87,
0x2b, 0xc3, 0x6c, 0x06, 0x5e, 0x84, 0xbc, 0x48, 0x5a, 0x56, 0x02, 0x1e, 0x7d, 0xe4, 0x45, 0xfc,
0x00, 0x22, 0xb1, 0x47, 0xee, 0x22, 0x6a, 0xdb, 0x88, 0xba, 0x06, 0xd8, 0x80, 0x1e, 0x02, 0xe4,
0xa5, 0x50, 0x3e, 0xc0, 0x8e, 0xad, 0x46, 0x96, 0xd8, 0xf2, 0x7d, 0xe8, 0x9a, 0x37, 0x2c, 0xc5,
0xb5, 0x4a, 0xba, 0x76, 0x8c, 0x0e, 0xaf, 0x58, 0x2a, 0xae, 0xd5, 0x87, 0x77, 0xdf, 0xde, 0x6e,
0xa9, 0xde, 0x55, 0x1b, 0xf3, 0x58, 0xe6, 0xb7, 0xb4, 0x2c, 0xe9, 0xad, 0xc6, 0x7c, 0x37, 0x77,
0x1b, 0x7c, 0x51, 0x50, 0xa5, 0x25, 0xdd, 0x54, 0x1a, 0x8b, 0x39, 0xe5, 0x1a, 0x25, 0x27, 0xe5,
0xdc, 0xae, 0x75, 0x6e, 0xfe, 0xac, 0xfd, 0x66, 0xd3, 0xb6, 0x5f, 0xaf, 0xfe, 0x05, 0x00, 0x00,
0xff, 0xff, 0xc2, 0xde, 0x28, 0x4b, 0x88, 0x03, 0x00, 0x00,
// 521 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x93, 0xcd, 0x8e, 0xd3, 0x30,
0x14, 0x85, 0x95, 0x66, 0xfa, 0x93, 0xdb, 0x3f, 0x26, 0xab, 0x30, 0x0c, 0x50, 0x15, 0x0d, 0x54,
0x42, 0xb4, 0x12, 0x48, 0xec, 0x40, 0xc0, 0x94, 0x45, 0x17, 0x20, 0x48, 0xbb, 0x62, 0x13, 0xb9,
0xc9, 0x55, 0x6b, 0x29, 0xb6, 0x8b, 0xed, 0x30, 0x4c, 0x57, 0x3c, 0x07, 0xcf, 0xc0, 0x7b, 0xf0,
0x5a, 0xc8, 0x3f, 0xa4, 0x53, 0xa9, 0xcb, 0x7c, 0xe7, 0xc4, 0x39, 0xf7, 0x5c, 0x07, 0x86, 0xa8,
0xf3, 0x22, 0x63, 0xa8, 0xc9, 0x74, 0x27, 0x85, 0x16, 0xf1, 0x39, 0xa3, 0xe5, 0x8f, 0x4a, 0xb9,
0xa7, 0xa9, 0x51, 0x2f, 0x7a, 0xb9, 0x60, 0x4c, 0x70, 0x87, 0x2e, 0x7a, 0x2a, 0xdf, 0x22, 0xf3,
0xf6, 0xf1, 0xef, 0x00, 0x60, 0x85, 0x9c, 0x70, 0xfd, 0x09, 0x35, 0x89, 0x07, 0xd0, 0x58, 0xcc,
0x93, 0x60, 0x14, 0x4c, 0xc2, 0xb4, 0xb1, 0x98, 0xc7, 0x4f, 0x61, 0xc8, 0x2b, 0x96, 0x7d, 0xaf,
0x50, 0xde, 0x66, 0x5c, 0x14, 0xa8, 0x92, 0x86, 0x15, 0xfb, 0xbc, 0x62, 0x5f, 0x0d, 0xfd, 0x6c,
0x60, 0xfc, 0x1c, 0xce, 0x29, 0x57, 0x28, 0x75, 0x96, 0x6f, 0x09, 0xe7, 0x58, 0x2e, 0xe6, 0x2a,
0x09, 0x47, 0xe1, 0x24, 0x4a, 0xef, 0x39, 0xe1, 0xba, 0xe6, 0xf1, 0x33, 0x18, 0xba, 0x03, 0x6b,
0x6f, 0x72, 0x36, 0x0a, 0x26, 0x51, 0x3a, 0xb0, 0xb8, 0x76, 0x8e, 0x7f, 0x05, 0x10, 0x7d, 0x91,
0xe2, 0xe7, 0xed, 0xc9, 0x6c, 0xaf, 0xa1, 0x4d, 0x8a, 0x42, 0xa2, 0x72, 0x99, 0xba, 0x2f, 0x2f,
0xa7, 0x47, 0xb3, 0xfb, 0xa9, 0xdf, 0x3b, 0x4f, 0xfa, 0xdf, 0x6c, 0xb2, 0x4a, 0x54, 0x55, 0x79,
0x2a, 0xab, 0x13, 0x0e, 0x59, 0xc7, 0x7f, 0x03, 0x18, 0x5c, 0x8b, 0xb2, 0xc4, 0x5c, 0x53, 0xc1,
0x4f, 0xe6, 0x78, 0x03, 0x2d, 0x57, 0xa9, 0x8f, 0x71, 0x75, 0x1c, 0xc3, 0xd7, 0x7d, 0x38, 0x64,
0x69, 0x41, 0xea, 0x5f, 0x8a, 0x1f, 0x43, 0x37, 0x97, 0x48, 0x34, 0x66, 0x9a, 0x32, 0x4c, 0xc2,
0x51, 0x30, 0x39, 0x4b, 0xc1, 0xa1, 0x15, 0x65, 0x18, 0x3f, 0x02, 0x50, 0xb8, 0x61, 0xc8, 0xb5,
0x09, 0x7a, 0x36, 0x0a, 0x27, 0x61, 0x7a, 0x87, 0xc4, 0x57, 0x30, 0xd8, 0x11, 0xa9, 0xa9, 0x39,
0x3b, 0xd3, 0x64, 0xa3, 0x92, 0xa6, 0x1d, 0xa6, 0x5f, 0xd3, 0x15, 0xd9, 0xa8, 0xf1, 0x9f, 0x06,
0x74, 0x97, 0xee, 0x2d, 0x3b, 0xc6, 0x25, 0x44, 0xf5, 0x21, 0x7e, 0x9a, 0x03, 0x88, 0xc7, 0xd0,
0xcb, 0xeb, 0xc4, 0x8b, 0xb9, 0xdf, 0xfa, 0x11, 0x8b, 0x9f, 0x40, 0xff, 0xe8, 0xc3, 0x36, 0x7b,
0x94, 0xf6, 0xee, 0x7e, 0xd7, 0x98, 0x7c, 0xcd, 0x99, 0xd2, 0x44, 0x6a, 0xbb, 0xea, 0x66, 0xda,
0xf3, 0x70, 0x69, 0x98, 0xed, 0xc0, 0x9b, 0x90, 0x17, 0x49, 0xd3, 0x5a, 0xc0, 0xa3, 0x8f, 0xbc,
0x88, 0x1f, 0x40, 0x24, 0x76, 0xc8, 0x5d, 0x45, 0x2d, 0x5b, 0x51, 0xc7, 0x00, 0x5b, 0xd0, 0x43,
0x80, 0xbc, 0x14, 0xca, 0x17, 0xd8, 0xb6, 0x6a, 0x64, 0x89, 0x95, 0xef, 0x43, 0xc7, 0xdc, 0x61,
0x29, 0x6e, 0x54, 0xd2, 0xb1, 0x63, 0xb4, 0x79, 0xc5, 0x52, 0x71, 0xa3, 0x8c, 0xc4, 0x90, 0x65,
0x8a, 0xee, 0x31, 0x89, 0x9c, 0xc4, 0x90, 0x2d, 0xe9, 0x1e, 0x3f, 0xbc, 0xfb, 0xf6, 0x76, 0x43,
0xf5, 0xb6, 0x5a, 0x9b, 0x7b, 0x34, 0xdb, 0xd3, 0xb2, 0xa4, 0x7b, 0x8d, 0xf9, 0x76, 0xe6, 0x96,
0xfb, 0xa2, 0xa0, 0x4a, 0x4b, 0xba, 0xae, 0x34, 0x16, 0x33, 0xca, 0x35, 0x4a, 0x4e, 0xca, 0x99,
0xdd, 0xf8, 0xcc, 0xfc, 0x74, 0xbb, 0xf5, 0xba, 0x65, 0x9f, 0x5e, 0xfd, 0x0b, 0x00, 0x00, 0xff,
0xff, 0x59, 0xd6, 0xe1, 0xfb, 0xa3, 0x03, 0x00, 0x00,
}
......@@ -71,6 +71,33 @@ message TsoResponse {
uint32 count = 3;
}
message SegIDRequest {
uint32 count = 1;
int32 channelID = 2;
string coll_name = 3;
string partition_tag = 4;
}
message AssignSegIDRequest {
int64 peerID = 1;
PeerRole role = 2;
repeated SegIDRequest per_channel_req = 3;
}
message SegIDAssignment {
int64 segID = 1;
int32 channelID = 2;
uint32 count = 3;
string coll_name = 4;
string partition_tag = 5;
}
message AssignSegIDResponse {
common.Status status = 1;
uint64 timestamp = 2;
uint64 expire_duration = 3;
repeated SegIDAssignment per_channel_assignment = 4;
}
message CreateCollectionRequest {
MsgType msg_type = 1;
......
......@@ -91,4 +91,6 @@ service Master {
rpc AllocTimestamp(internal.TsoRequest) returns (internal.TsoResponse) {}
rpc AllocID(internal.IDRequest) returns (internal.IDResponse) {}
rpc AssignSegmentID(internal.AssignSegIDRequest) returns (internal.AssignSegIDResponse) {}
}
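For orientation, a minimal client-side sketch of the new AssignSegmentID RPC follows. It assumes the generated Go code is imported as internalpb and masterpb (the masterpb import path and the NewMasterClient constructor are standard protoc-gen-go output but are not shown in this diff), and the peer ID, channel, collection name, and count are placeholders rather than values the project mandates.

package main

import (
	"context"
	"log"

	"google.golang.org/grpc"

	internalpb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
	masterpb "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" // assumed import path for the generated Master service
)

func main() {
	ctx := context.Background()

	// Dial the master's gRPC port (53100 in the sample config).
	conn, err := grpc.DialContext(ctx, "localhost:53100", grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := masterpb.NewMasterClient(conn)

	// Ask for 1000 segment row slots on channel 0 of a hypothetical collection.
	// The Role field (a PeerRole enum from internal.proto) is left unset here
	// because its constants are defined outside this diff.
	req := &internalpb.AssignSegIDRequest{
		PeerID: 1,
		PerChannelReq: []*internalpb.SegIDRequest{
			{Count: 1000, ChannelID: 0, CollName: "collection0", PartitionTag: "default"},
		},
	}

	resp, err := client.AssignSegmentID(ctx, req)
	if err != nil {
		log.Fatal(err)
	}
	for _, a := range resp.PerChannelAssignment {
		log.Printf("segID=%d channel=%d count=%d expire=%dms",
			a.SegID, a.ChannelID, a.Count, resp.ExpireDuration)
	}
}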
......@@ -30,34 +30,36 @@ const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
func init() { proto.RegisterFile("master.proto", fileDescriptor_f9c348dec43a6705) }
var fileDescriptor_f9c348dec43a6705 = []byte{
// 432 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x94, 0x51, 0xaf, 0xd2, 0x30,
0x14, 0xc7, 0x79, 0xba, 0xc6, 0x86, 0xcb, 0xf5, 0xd6, 0x37, 0x7c, 0xf1, 0xee, 0xc9, 0x80, 0x6c,
0x46, 0xbf, 0x80, 0xc2, 0x1e, 0x20, 0xd1, 0x84, 0x00, 0x2f, 0x6a, 0x0c, 0x76, 0xa3, 0x81, 0xc6,
0x6e, 0x9d, 0x3d, 0x67, 0x98, 0xf0, 0xe1, 0xfc, 0x6c, 0x66, 0x1b, 0xdd, 0x56, 0xa1, 0x88, 0xf7,
0x8d, 0xb6, 0xff, 0xf3, 0xfb, 0x73, 0xce, 0xf9, 0x67, 0xa4, 0x9b, 0x30, 0x40, 0xae, 0xfd, 0x4c,
0x2b, 0x54, 0xf4, 0x79, 0x22, 0xe4, 0x3e, 0x87, 0xea, 0xe4, 0x57, 0x4f, 0xfd, 0x6e, 0xac, 0x92,
0x44, 0xa5, 0xd5, 0x65, 0x9f, 0x8a, 0x14, 0xb9, 0x4e, 0x99, 0x5c, 0x27, 0xb0, 0x3d, 0xde, 0xdd,
0x03, 0xd7, 0x7b, 0x11, 0xf3, 0xe6, 0xea, 0xed, 0xef, 0xa7, 0xe4, 0xe6, 0x53, 0x59, 0x4f, 0x19,
0x79, 0x36, 0xd1, 0x9c, 0x21, 0x9f, 0x28, 0x29, 0x79, 0x8c, 0x42, 0xa5, 0xd4, 0xf7, 0x2d, 0x27,
0xc3, 0xf4, 0xff, 0x16, 0x2e, 0xf8, 0xcf, 0x9c, 0x03, 0xf6, 0x5f, 0xd8, 0xfa, 0xe3, 0x3f, 0x5a,
0x22, 0xc3, 0x1c, 0xbc, 0x0e, 0xfd, 0x46, 0x7a, 0xa1, 0x56, 0x59, 0xcb, 0xe0, 0xb5, 0xc3, 0xc0,
0x96, 0x5d, 0x89, 0x8f, 0xc8, 0xed, 0x94, 0x41, 0x8b, 0x3e, 0x74, 0xd0, 0x2d, 0x95, 0x81, 0x7b,
0xb6, 0xf8, 0x38, 0x2b, 0x7f, 0xac, 0x94, 0x5c, 0x70, 0xc8, 0x54, 0x0a, 0xdc, 0xeb, 0xd0, 0x9c,
0xd0, 0x90, 0x43, 0xac, 0x45, 0xd4, 0x9e, 0xd3, 0x1b, 0x57, 0x1b, 0x27, 0x52, 0xe3, 0x36, 0x3c,
0xef, 0xd6, 0x08, 0xab, 0xd2, 0xac, 0xf8, 0xe9, 0x75, 0xe8, 0x0f, 0x72, 0xb7, 0xdc, 0xa9, 0x5f,
0xcd, 0x33, 0x38, 0x47, 0x67, 0xeb, 0x8c, 0xdf, 0xab, 0xf3, 0x7e, 0x4b, 0xd4, 0x22, 0xdd, 0x7e,
0x14, 0x80, 0xad, 0x1e, 0xd7, 0xe4, 0xae, 0x5a, 0xf0, 0x9c, 0x69, 0x14, 0x65, 0x83, 0xa3, 0x8b,
0x41, 0xa8, 0x75, 0x57, 0x2e, 0xea, 0x2b, 0xb9, 0x2d, 0x16, 0xdc, 0xe0, 0x87, 0x17, 0x62, 0xf0,
0xbf, 0xf0, 0xef, 0xa4, 0x3b, 0x65, 0xd0, 0xb0, 0x07, 0xee, 0x10, 0x9c, 0xa0, 0xaf, 0xcb, 0x80,
0x26, 0xf7, 0x66, 0xb1, 0x8d, 0x4d, 0xf0, 0x8f, 0x08, 0x9c, 0x78, 0x0d, 0xce, 0x7b, 0xd5, 0x3a,
0x3b, 0x00, 0x82, 0xf4, 0x8a, 0xc5, 0xd6, 0xaf, 0xe0, 0x9c, 0x99, 0x25, 0x7b, 0xcc, 0xfa, 0x3f,
0x93, 0xde, 0x07, 0x29, 0x55, 0xbc, 0x12, 0x09, 0x07, 0x64, 0x49, 0x46, 0x1f, 0x1c, 0x56, 0x2b,
0x50, 0x8e, 0xc9, 0xd9, 0x92, 0x1a, 0x3d, 0x27, 0x4f, 0x4a, 0xf4, 0x2c, 0xa4, 0x2f, 0x1d, 0x05,
0xb3, 0xd0, 0x20, 0x1f, 0x2e, 0x28, 0x0c, 0x71, 0x3c, 0xfe, 0xf2, 0x7e, 0x2b, 0x70, 0x97, 0x47,
0x45, 0x0e, 0x82, 0x83, 0x90, 0x52, 0x1c, 0x90, 0xc7, 0xbb, 0xa0, 0xaa, 0x1d, 0x6d, 0x04, 0xa0,
0x16, 0x51, 0x8e, 0x7c, 0x13, 0x18, 0x42, 0x50, 0x02, 0x83, 0xea, 0xbb, 0x99, 0x45, 0xd1, 0x4d,
0x79, 0x7e, 0xf7, 0x27, 0x00, 0x00, 0xff, 0xff, 0xba, 0x9e, 0x0e, 0x5d, 0x65, 0x05, 0x00, 0x00,
// 458 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x94, 0x41, 0x6f, 0xd3, 0x30,
0x14, 0xc7, 0x7b, 0x1a, 0x92, 0xd5, 0xb5, 0xcc, 0xdc, 0xca, 0x85, 0xf5, 0x04, 0x2d, 0x4b, 0x10,
0x7c, 0x01, 0xd6, 0xe5, 0xb0, 0x4a, 0x20, 0x4d, 0xeb, 0x2e, 0x80, 0xd0, 0x70, 0xb2, 0xa7, 0xf4,
0x81, 0x13, 0x07, 0xbf, 0x97, 0x21, 0xed, 0x23, 0xf1, 0x29, 0x51, 0x93, 0x26, 0xa9, 0x69, 0x5d,
0xca, 0x6e, 0xb5, 0xfd, 0xf3, 0xef, 0x5f, 0xbf, 0xf7, 0x14, 0xd1, 0xcf, 0x14, 0x31, 0xd8, 0xa0,
0xb0, 0x86, 0x8d, 0x7c, 0x96, 0xa1, 0xbe, 0x2f, 0xa9, 0x5e, 0x05, 0xf5, 0xd1, 0xa8, 0x9f, 0x98,
0x2c, 0x33, 0x79, 0xbd, 0x39, 0x92, 0x98, 0x33, 0xd8, 0x5c, 0xe9, 0xdb, 0x8c, 0xd2, 0xf5, 0xde,
0x09, 0x81, 0xbd, 0xc7, 0x04, 0xba, 0xad, 0xb7, 0xbf, 0x85, 0x38, 0xfa, 0x58, 0xdd, 0x97, 0x4a,
0x3c, 0xbd, 0xb0, 0xa0, 0x18, 0x2e, 0x8c, 0xd6, 0x90, 0x30, 0x9a, 0x5c, 0x06, 0x81, 0x93, 0xd4,
0x38, 0x83, 0xbf, 0xc1, 0x6b, 0xf8, 0x59, 0x02, 0xf1, 0xe8, 0xb9, 0xcb, 0xaf, 0xff, 0xd1, 0x82,
0x15, 0x97, 0x34, 0xee, 0xc9, 0xaf, 0x62, 0x10, 0x59, 0x53, 0x6c, 0x04, 0xbc, 0xf6, 0x04, 0xb8,
0xd8, 0x81, 0xfa, 0x58, 0x1c, 0x5f, 0x2a, 0xda, 0xb0, 0x4f, 0x3d, 0x76, 0x87, 0x6a, 0xe4, 0x63,
0x17, 0x5e, 0xd7, 0x2a, 0x98, 0x19, 0xa3, 0xaf, 0x81, 0x0a, 0x93, 0x13, 0x8c, 0x7b, 0xb2, 0x14,
0x32, 0x02, 0x4a, 0x2c, 0xc6, 0x9b, 0x75, 0x7a, 0xe3, 0x7b, 0xc6, 0x16, 0xda, 0xa4, 0x4d, 0x77,
0xa7, 0x75, 0x60, 0x7d, 0xb5, 0x58, 0xfd, 0x1c, 0xf7, 0xe4, 0x0f, 0x31, 0x5c, 0x2c, 0xcd, 0xaf,
0xee, 0x98, 0xbc, 0xa5, 0x73, 0xb9, 0x26, 0xef, 0xe5, 0xee, 0xbc, 0x05, 0x5b, 0xcc, 0xd3, 0x0f,
0x48, 0xbc, 0xf1, 0xc6, 0x5b, 0x31, 0xac, 0x1b, 0x7c, 0xa5, 0x2c, 0x63, 0xf5, 0xc0, 0xb3, 0xbd,
0x83, 0xd0, 0x72, 0x07, 0x36, 0xea, 0x8b, 0x38, 0x5e, 0x35, 0xb8, 0xd3, 0x4f, 0xf7, 0x8c, 0xc1,
0xff, 0xca, 0xbf, 0x89, 0xfe, 0xa5, 0xa2, 0xce, 0x3d, 0xf1, 0x0f, 0xc1, 0x96, 0xfa, 0xb0, 0x19,
0xb0, 0xe2, 0xa4, 0x69, 0x6c, 0x17, 0x13, 0xfe, 0x63, 0x04, 0xb6, 0xb2, 0x26, 0xbb, 0xb3, 0x5a,
0xce, 0x1d, 0x00, 0x14, 0x83, 0x55, 0x63, 0xdb, 0x53, 0xf2, 0xd6, 0xcc, 0xc1, 0x1e, 0xd3, 0xfe,
0x4f, 0x62, 0x70, 0xae, 0xb5, 0x49, 0x6e, 0x30, 0x03, 0x62, 0x95, 0x15, 0xf2, 0xd4, 0x13, 0x75,
0x43, 0xc6, 0x53, 0x39, 0x17, 0x69, 0xd5, 0x57, 0xe2, 0x49, 0xa5, 0x9e, 0x47, 0xf2, 0x85, 0xe7,
0xc2, 0x3c, 0x6a, 0x94, 0xa7, 0x7b, 0x88, 0xd6, 0xf8, 0x5d, 0x0c, 0xcf, 0x89, 0x30, 0xcd, 0x17,
0x90, 0x66, 0x90, 0xf3, 0x3c, 0x92, 0xaf, 0x3c, 0xf7, 0x5a, 0xae, 0x8b, 0x98, 0x1c, 0x82, 0x36,
0x59, 0xb3, 0xd9, 0xe7, 0xf7, 0x29, 0xf2, 0xb2, 0x8c, 0x57, 0x33, 0x17, 0x3e, 0xa0, 0xd6, 0xf8,
0xc0, 0x90, 0x2c, 0xc3, 0x5a, 0x72, 0x76, 0x87, 0xc4, 0x16, 0xe3, 0x92, 0xe1, 0x2e, 0x6c, 0x54,
0x61, 0x65, 0x0e, 0xeb, 0x6f, 0x74, 0x11, 0xc7, 0x47, 0xd5, 0xfa, 0xdd, 0x9f, 0x00, 0x00, 0x00,
0xff, 0xff, 0xa0, 0xb5, 0xeb, 0xf6, 0xd1, 0x05, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
......@@ -134,6 +136,7 @@ type MasterClient interface {
ShowPartitions(ctx context.Context, in *internalpb.ShowPartitionRequest, opts ...grpc.CallOption) (*servicepb.StringListResponse, error)
AllocTimestamp(ctx context.Context, in *internalpb.TsoRequest, opts ...grpc.CallOption) (*internalpb.TsoResponse, error)
AllocID(ctx context.Context, in *internalpb.IDRequest, opts ...grpc.CallOption) (*internalpb.IDResponse, error)
AssignSegmentID(ctx context.Context, in *internalpb.AssignSegIDRequest, opts ...grpc.CallOption) (*internalpb.AssignSegIDResponse, error)
}
type masterClient struct {
......@@ -252,6 +255,15 @@ func (c *masterClient) AllocID(ctx context.Context, in *internalpb.IDRequest, op
return out, nil
}
func (c *masterClient) AssignSegmentID(ctx context.Context, in *internalpb.AssignSegIDRequest, opts ...grpc.CallOption) (*internalpb.AssignSegIDResponse, error) {
out := new(internalpb.AssignSegIDResponse)
err := c.cc.Invoke(ctx, "/milvus.proto.master.Master/AssignSegmentID", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// MasterServer is the server API for Master service.
type MasterServer interface {
//*
......@@ -316,6 +328,7 @@ type MasterServer interface {
ShowPartitions(context.Context, *internalpb.ShowPartitionRequest) (*servicepb.StringListResponse, error)
AllocTimestamp(context.Context, *internalpb.TsoRequest) (*internalpb.TsoResponse, error)
AllocID(context.Context, *internalpb.IDRequest) (*internalpb.IDResponse, error)
AssignSegmentID(context.Context, *internalpb.AssignSegIDRequest) (*internalpb.AssignSegIDResponse, error)
}
// UnimplementedMasterServer can be embedded to have forward compatible implementations.
......@@ -358,6 +371,9 @@ func (*UnimplementedMasterServer) AllocTimestamp(ctx context.Context, req *inter
func (*UnimplementedMasterServer) AllocID(ctx context.Context, req *internalpb.IDRequest) (*internalpb.IDResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method AllocID not implemented")
}
func (*UnimplementedMasterServer) AssignSegmentID(ctx context.Context, req *internalpb.AssignSegIDRequest) (*internalpb.AssignSegIDResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method AssignSegmentID not implemented")
}
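A server can embed UnimplementedMasterServer and override only the new method. The sketch below is a toy stand-in for the segment manager this commit adds to the master: the masterpb alias, the Go field names derived from the proto, and the counter-based allocation are assumptions for illustration, not the project's actual implementation.

package main

import (
	"context"
	"log"
	"net"

	"google.golang.org/grpc"

	internalpb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
	masterpb "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" // assumed import path
)

// segmentAssigner answers AssignSegmentID with dummy assignments;
// all other Master RPCs fall back to the embedded Unimplemented stub.
type segmentAssigner struct {
	masterpb.UnimplementedMasterServer
	nextSegID int64 // not safe for concurrent calls; a sketch only
}

func (s *segmentAssigner) AssignSegmentID(ctx context.Context, req *internalpb.AssignSegIDRequest) (*internalpb.AssignSegIDResponse, error) {
	assignments := make([]*internalpb.SegIDAssignment, 0, len(req.PerChannelReq))
	for _, r := range req.PerChannelReq {
		s.nextSegID++
		assignments = append(assignments, &internalpb.SegIDAssignment{
			SegID:        s.nextSegID, // placeholder; the real master allocates from its etcd-backed state
			ChannelID:    r.ChannelID,
			Count:        r.Count,
			CollName:     r.CollName,
			PartitionTag: r.PartitionTag,
		})
	}
	return &internalpb.AssignSegIDResponse{
		ExpireDuration:       2000, // mirrors segmentExpireDuration in the sample config
		PerChannelAssignment: assignments,
	}, nil
}

func main() {
	lis, err := net.Listen("tcp", ":53100")
	if err != nil {
		log.Fatal(err)
	}
	server := grpc.NewServer()
	masterpb.RegisterMasterServer(server, &segmentAssigner{})
	log.Fatal(server.Serve(lis))
}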
func RegisterMasterServer(s *grpc.Server, srv MasterServer) {
s.RegisterService(&_Master_serviceDesc, srv)
......@@ -579,6 +595,24 @@ func _Master_AllocID_Handler(srv interface{}, ctx context.Context, dec func(inte
return interceptor(ctx, in, info, handler)
}
func _Master_AssignSegmentID_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(internalpb.AssignSegIDRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(MasterServer).AssignSegmentID(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/milvus.proto.master.Master/AssignSegmentID",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(MasterServer).AssignSegmentID(ctx, req.(*internalpb.AssignSegIDRequest))
}
return interceptor(ctx, in, info, handler)
}
var _Master_serviceDesc = grpc.ServiceDesc{
ServiceName: "milvus.proto.master.Master",
HandlerType: (*MasterServer)(nil),
......@@ -631,6 +665,10 @@ var _Master_serviceDesc = grpc.ServiceDesc{
MethodName: "AllocID",
Handler: _Master_AllocID_Handler,
},
{
MethodName: "AssignSegmentID",
Handler: _Master_AssignSegmentID_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "master.proto",
......
......@@ -66,15 +66,15 @@ func CreateProxy(ctx context.Context) (*Proxy, error) {
unmarshal := msgstream.NewUnmarshalDispatcher()
p.manipulationMsgStream = msgstream.NewPulsarMsgStream(p.proxyLoopCtx, bufSize)
p.manipulationMsgStream.SetPulsarCient(pulsarAddress)
p.manipulationMsgStream.SetPulsarClient(pulsarAddress)
p.manipulationMsgStream.CreatePulsarProducers(manipulationChannels)
p.queryMsgStream = msgstream.NewPulsarMsgStream(p.proxyLoopCtx, bufSize)
p.queryMsgStream.SetPulsarCient(pulsarAddress)
p.queryMsgStream.SetPulsarClient(pulsarAddress)
p.queryMsgStream.CreatePulsarProducers(queryChannels)
p.queryResultMsgStream = msgstream.NewPulsarMsgStream(p.proxyLoopCtx, bufSize)
p.queryResultMsgStream.SetPulsarCient(pulsarAddress)
p.queryResultMsgStream.SetPulsarClient(pulsarAddress)
p.queryResultMsgStream.CreatePulsarConsumers(queryResultChannels,
queryResultSubName,
unmarshal,
......
......@@ -50,19 +50,26 @@ func startMaster(ctx context.Context) {
metaRootPath := path.Join(rootPath, "meta")
opt := master.Option{
KVRootPath: kvRootPath,
MetaRootPath: metaRootPath,
EtcdAddr: []string{etcdAddr},
PulsarAddr: "pulsar://localhost:6650",
ProxyIDs: []typeutil.UniqueID{1, 2},
PulsarProxyChannels: []string{"proxy1", "proxy2"},
PulsarProxySubName: "proxyTopics",
SoftTTBInterval: 300,
WriteIDs: []typeutil.UniqueID{3, 4},
PulsarWriteChannels: []string{"write3", "write4"},
PulsarWriteSubName: "writeTopics",
PulsarDMChannels: []string{"dm0", "dm1"},
PulsarK2SChannels: []string{"k2s0", "k2s1"},
KVRootPath: kvRootPath,
MetaRootPath: metaRootPath,
EtcdAddr: []string{etcdAddr},
PulsarAddr: "pulsar://localhost:6650",
ProxyIDs: []typeutil.UniqueID{1, 2},
PulsarProxyChannels: []string{"proxy1", "proxy2"},
PulsarProxySubName: "proxyTopics",
SoftTTBInterval: 300,
WriteIDs: []typeutil.UniqueID{3, 4},
PulsarWriteChannels: []string{"write3", "write4"},
PulsarWriteSubName: "writeTopics",
PulsarDMChannels: []string{"dm0", "dm1"},
PulsarK2SChannels: []string{"k2s0", "k2s1"},
DefaultRecordSize: 1024,
MinimumAssignSize: 1048576,
SegmentThreshold: 536870912,
SegmentExpireDuration: 2000,
NumOfChannel: 5,
NumOfQueryNode: 3,
StatsChannels: "statistic",
}
svr, err := master.CreateServer(ctx, &opt)
......@@ -289,7 +296,7 @@ func TestProxy_Search(t *testing.T) {
bufSize := 1024
queryResultMsgStream := msgstream.NewPulsarMsgStream(ctx, int64(bufSize))
pulsarAddress := "pulsar://localhost:6650"
queryResultMsgStream.SetPulsarCient(pulsarAddress)
queryResultMsgStream.SetPulsarClient(pulsarAddress)
assert.NotEqual(t, queryResultMsgStream, nil, "query result message stream should not be nil!")
queryResultMsgStream.CreatePulsarProducers(queryResultChannels)
......
......@@ -55,7 +55,7 @@ func newTimeTick(ctx context.Context,
pulsarAddress = "pulsar://" + pulsarAddress
producerChannels := []string{"timeTick"}
t.tickMsgStream.SetPulsarCient(pulsarAddress)
t.tickMsgStream.SetPulsarClient(pulsarAddress)
t.tickMsgStream.CreatePulsarProducers(producerChannels)
return t
}
......
......@@ -37,6 +37,7 @@ func (dsService *dataSyncService) initNodes() {
// TODO: add delete pipeline support
dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx)
flowgraph.Params.Init()
var dmStreamNode Node = newDmInputNode(dsService.ctx)
var filterDmNode Node = newFilteredDmNode()
......
......@@ -160,7 +160,7 @@ func TestManipulationService_Start(t *testing.T) {
producerChannels := []string{"insert"}
insertStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)
insertStream.SetPulsarCient(pulsarURL)
insertStream.SetPulsarClient(pulsarURL)
insertStream.CreatePulsarProducers(producerChannels)
var insertMsgStream msgstream.MsgStream = insertStream
......
package reader
import "github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
type deleteNode struct {
BaseNode
deleteMsg deleteMsg
......@@ -14,8 +16,8 @@ func (dNode *deleteNode) Operate(in []*Msg) []*Msg {
}
func newDeleteNode() *deleteNode {
maxQueueLength := Params.flowGraphMaxQueueLength()
maxParallelism := Params.flowGraphMaxParallelism()
maxQueueLength := flowgraph.Params.FlowGraphMaxQueueLength()
maxParallelism := flowgraph.Params.FlowGraphMaxParallelism()
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(maxQueueLength)
......
......@@ -5,6 +5,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/msgstream"
internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
)
type filterDmNode struct {
......@@ -54,8 +55,8 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg {
}
func newFilteredDmNode() *filterDmNode {
maxQueueLength := Params.flowGraphMaxQueueLength()
maxParallelism := Params.flowGraphMaxParallelism()
maxQueueLength := flowgraph.Params.FlowGraphMaxQueueLength()
maxParallelism := flowgraph.Params.FlowGraphMaxParallelism()
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(maxQueueLength)
......
......@@ -6,6 +6,7 @@ import (
"sync"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
)
type insertNode struct {
......@@ -126,8 +127,8 @@ func (iNode *insertNode) insert(insertData *InsertData, segmentID int64, wg *syn
}
func newInsertNode(replica *collectionReplica) *insertNode {
maxQueueLength := Params.flowGraphMaxQueueLength()
maxParallelism := Params.flowGraphMaxParallelism()
maxQueueLength := flowgraph.Params.FlowGraphMaxQueueLength()
maxParallelism := flowgraph.Params.FlowGraphMaxParallelism()
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(maxQueueLength)
......
package reader
import "github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
type key2SegNode struct {
BaseNode
key2SegMsg key2SegMsg
......@@ -14,8 +16,8 @@ func (ksNode *key2SegNode) Operate(in []*Msg) []*Msg {
}
func newKey2SegNode() *key2SegNode {
maxQueueLength := Params.flowGraphMaxQueueLength()
maxParallelism := Params.flowGraphMaxParallelism()
maxQueueLength := flowgraph.Params.FlowGraphMaxQueueLength()
maxParallelism := flowgraph.Params.FlowGraphMaxParallelism()
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(maxQueueLength)
......
......@@ -9,10 +9,10 @@ import (
)
func newDmInputNode(ctx context.Context) *flowgraph.InputNode {
receiveBufSize := Params.dmReceiveBufSize()
receiveBufSize := Params.dmMsgStreamReceiveBufSize()
pulsarBufSize := Params.dmPulsarBufSize()
msgStreamURL, err := Params.pulsarAddress()
msgStreamURL, err := Params.PulsarAddress()
if err != nil {
log.Fatal(err)
}
......@@ -21,15 +21,12 @@ func newDmInputNode(ctx context.Context) *flowgraph.InputNode {
consumeSubName := "insertSub"
insertStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)
insertStream.SetPulsarCient(msgStreamURL)
insertStream.SetPulsarClient(msgStreamURL)
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
insertStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
var stream msgstream.MsgStream = insertStream
maxQueueLength := Params.flowGraphMaxQueueLength()
maxParallelism := Params.flowGraphMaxParallelism()
node := flowgraph.NewInputNode(&stream, "dmInputNode", maxQueueLength, maxParallelism)
node := flowgraph.NewInputNode(&stream, "dmInputNode")
return node
}
package reader
import "github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
type schemaUpdateNode struct {
BaseNode
schemaUpdateMsg schemaUpdateMsg
......@@ -14,8 +16,8 @@ func (suNode *schemaUpdateNode) Operate(in []*Msg) []*Msg {
}
func newSchemaUpdateNode() *schemaUpdateNode {
maxQueueLength := Params.flowGraphMaxQueueLength()
maxParallelism := Params.flowGraphMaxParallelism()
maxQueueLength := flowgraph.Params.FlowGraphMaxQueueLength()
maxParallelism := flowgraph.Params.FlowGraphMaxParallelism()
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(maxQueueLength)
......
......@@ -2,6 +2,8 @@ package reader
import (
"log"
"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
)
type serviceTimeNode struct {
......@@ -33,8 +35,8 @@ func (stNode *serviceTimeNode) Operate(in []*Msg) []*Msg {
}
func newServiceTimeNode(replica *collectionReplica) *serviceTimeNode {
maxQueueLength := Params.flowGraphMaxQueueLength()
maxParallelism := Params.flowGraphMaxParallelism()
maxQueueLength := flowgraph.Params.FlowGraphMaxQueueLength()
maxParallelism := flowgraph.Params.FlowGraphMaxParallelism()
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(maxQueueLength)
......
......@@ -126,8 +126,8 @@ func isSegmentChannelRangeInQueryNodeChannelRange(segment *etcdpb.SegmentMeta) b
}
Params.Init()
var queryNodeChannelStart = Params.topicStart()
var queryNodeChannelEnd = Params.topicEnd()
var queryNodeChannelStart = Params.TopicStart()
var queryNodeChannelEnd = Params.TopicEnd()
if segment.ChannelStart >= int32(queryNodeChannelStart) && segment.ChannelEnd <= int32(queryNodeChannelEnd) {
return true
......
......@@ -20,7 +20,7 @@ func (p *ParamTable) Init() {
}
}
func (p *ParamTable) pulsarAddress() (string, error) {
func (p *ParamTable) PulsarAddress() (string, error) {
url, err := p.Load("_PulsarAddress")
if err != nil {
panic(err)
......@@ -28,7 +28,7 @@ func (p *ParamTable) pulsarAddress() (string, error) {
return "pulsar://" + url, nil
}
func (p *ParamTable) queryNodeID() int {
func (p *ParamTable) QueryNodeID() int {
queryNodeID, err := p.Load("reader.clientid")
if err != nil {
panic(err)
......@@ -40,7 +40,7 @@ func (p *ParamTable) queryNodeID() int {
return id
}
func (p *ParamTable) topicStart() int {
func (p *ParamTable) TopicStart() int {
topicStart, err := p.Load("reader.topicstart")
if err != nil {
panic(err)
......@@ -52,7 +52,7 @@ func (p *ParamTable) topicStart() int {
return topicStartNum
}
func (p *ParamTable) topicEnd() int {
func (p *ParamTable) TopicEnd() int {
topicEnd, err := p.Load("reader.topicend")
if err != nil {
panic(err)
......@@ -64,10 +64,9 @@ func (p *ParamTable) topicEnd() int {
return topicEndNum
}
// advanced params
// stats
func (p *ParamTable) statsPublishInterval() int {
timeInterval, err := p.Load("reader.stats.publishInterval")
// private advanced params
func (p *ParamTable) statsServiceTimeInterval() int {
timeInterval, err := p.Load("service.statsServiceTimeInterval")
if err != nil {
panic(err)
}
......@@ -78,34 +77,20 @@ func (p *ParamTable) statsPublishInterval() int {
return interval
}
// dataSync:
func (p *ParamTable) flowGraphMaxQueueLength() int32 {
queueLength, err := p.Load("reader.dataSync.flowGraph.maxQueueLength")
func (p *ParamTable) statsMsgStreamReceiveBufSize() int64 {
revBufSize, err := p.Load("msgStream.receiveBufSize.statsMsgStream")
if err != nil {
panic(err)
}
length, err := strconv.Atoi(queueLength)
if err != nil {
panic(err)
}
return int32(length)
}
func (p *ParamTable) flowGraphMaxParallelism() int32 {
maxParallelism, err := p.Load("reader.dataSync.flowGraph.maxParallelism")
if err != nil {
panic(err)
}
maxPara, err := strconv.Atoi(maxParallelism)
bufSize, err := strconv.Atoi(revBufSize)
if err != nil {
panic(err)
}
return int32(maxPara)
return int64(bufSize)
}
// msgStream
func (p *ParamTable) dmReceiveBufSize() int64 {
revBufSize, err := p.Load("reader.msgStream.dm.recvBufSize")
func (p *ParamTable) dmMsgStreamReceiveBufSize() int64 {
revBufSize, err := p.Load("msgStream.receiveBufSize.dmMsgStream")
if err != nil {
panic(err)
}
......@@ -116,20 +101,20 @@ func (p *ParamTable) dmReceiveBufSize() int64 {
return int64(bufSize)
}
func (p *ParamTable) dmPulsarBufSize() int64 {
pulsarBufSize, err := p.Load("reader.msgStream.dm.pulsarBufSize")
func (p *ParamTable) searchMsgStreamReceiveBufSize() int64 {
revBufSize, err := p.Load("msgStream.receiveBufSize.searchMsgStream")
if err != nil {
panic(err)
}
bufSize, err := strconv.Atoi(pulsarBufSize)
bufSize, err := strconv.Atoi(revBufSize)
if err != nil {
panic(err)
}
return int64(bufSize)
}
func (p *ParamTable) searchReceiveBufSize() int64 {
revBufSize, err := p.Load("reader.msgStream.search.recvBufSize")
func (p *ParamTable) searchResultMsgStreamReceiveBufSize() int64 {
revBufSize, err := p.Load("msgStream.receiveBufSize.searchResultMsgStream")
if err != nil {
panic(err)
}
......@@ -141,7 +126,7 @@ func (p *ParamTable) searchReceiveBufSize() int64 {
}
func (p *ParamTable) searchPulsarBufSize() int64 {
pulsarBufSize, err := p.Load("reader.msgStream.search.pulsarBufSize")
pulsarBufSize, err := p.Load("msgStream.pulsarBufSize.search")
if err != nil {
panic(err)
}
......@@ -152,24 +137,12 @@ func (p *ParamTable) searchPulsarBufSize() int64 {
return int64(bufSize)
}
func (p *ParamTable) searchResultReceiveBufSize() int64 {
revBufSize, err := p.Load("reader.msgStream.searchResult.recvBufSize")
if err != nil {
panic(err)
}
bufSize, err := strconv.Atoi(revBufSize)
if err != nil {
panic(err)
}
return int64(bufSize)
}
func (p *ParamTable) statsReceiveBufSize() int64 {
revBufSize, err := p.Load("reader.msgStream.stats.recvBufSize")
func (p *ParamTable) dmPulsarBufSize() int64 {
pulsarBufSize, err := p.Load("msgStream.pulsarBufSize.dm")
if err != nil {
panic(err)
}
bufSize, err := strconv.Atoi(revBufSize)
bufSize, err := strconv.Atoi(pulsarBufSize)
if err != nil {
panic(err)
}
......
......@@ -12,56 +12,56 @@ func TestParamTable_Init(t *testing.T) {
func TestParamTable_PulsarAddress(t *testing.T) {
Params.Init()
address, err := Params.pulsarAddress()
address, err := Params.PulsarAddress()
assert.NoError(t, err)
assert.Equal(t, address, "pulsar://localhost:6650")
}
func TestParamTable_QueryNodeID(t *testing.T) {
Params.Init()
id := Params.queryNodeID()
id := Params.QueryNodeID()
assert.Equal(t, id, 0)
}
func TestParamTable_TopicStart(t *testing.T) {
Params.Init()
topicStart := Params.topicStart()
topicStart := Params.TopicStart()
assert.Equal(t, topicStart, 0)
}
func TestParamTable_TopicEnd(t *testing.T) {
Params.Init()
topicEnd := Params.topicEnd()
topicEnd := Params.TopicEnd()
assert.Equal(t, topicEnd, 128)
}
func TestParamTable_statsServiceTimeInterval(t *testing.T) {
Params.Init()
interval := Params.statsPublishInterval()
interval := Params.statsServiceTimeInterval()
assert.Equal(t, interval, 1000)
}
func TestParamTable_statsMsgStreamReceiveBufSize(t *testing.T) {
Params.Init()
bufSize := Params.statsReceiveBufSize()
bufSize := Params.statsMsgStreamReceiveBufSize()
assert.Equal(t, bufSize, int64(64))
}
func TestParamTable_dmMsgStreamReceiveBufSize(t *testing.T) {
Params.Init()
bufSize := Params.dmReceiveBufSize()
bufSize := Params.dmMsgStreamReceiveBufSize()
assert.Equal(t, bufSize, int64(1024))
}
func TestParamTable_searchMsgStreamReceiveBufSize(t *testing.T) {
Params.Init()
bufSize := Params.searchReceiveBufSize()
bufSize := Params.searchMsgStreamReceiveBufSize()
assert.Equal(t, bufSize, int64(512))
}
func TestParamTable_searchResultMsgStreamReceiveBufSize(t *testing.T) {
Params.Init()
bufSize := Params.searchResultReceiveBufSize()
bufSize := Params.searchResultMsgStreamReceiveBufSize()
assert.Equal(t, bufSize, int64(64))
}
......@@ -76,15 +76,3 @@ func TestParamTable_dmPulsarBufSize(t *testing.T) {
bufSize := Params.dmPulsarBufSize()
assert.Equal(t, bufSize, int64(1024))
}
func TestParamTable_flowGraphMaxQueueLength(t *testing.T) {
Params.Init()
length := Params.flowGraphMaxQueueLength()
assert.Equal(t, length, int32(1024))
}
func TestParamTable_flowGraphMaxParallelism(t *testing.T) {
Params.Init()
maxParallelism := Params.flowGraphMaxParallelism()
assert.Equal(t, maxParallelism, int32(1024))
}
......@@ -35,10 +35,10 @@ type SearchResult struct {
}
func newSearchService(ctx context.Context, replica *collectionReplica) *searchService {
receiveBufSize := Params.searchReceiveBufSize()
receiveBufSize := Params.searchMsgStreamReceiveBufSize()
pulsarBufSize := Params.searchPulsarBufSize()
msgStreamURL, err := Params.pulsarAddress()
msgStreamURL, err := Params.PulsarAddress()
if err != nil {
log.Fatal(err)
}
......@@ -46,14 +46,14 @@ func newSearchService(ctx context.Context, replica *collectionReplica) *searchSe
consumeChannels := []string{"search"}
consumeSubName := "subSearch"
searchStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)
searchStream.SetPulsarCient(msgStreamURL)
searchStream.SetPulsarClient(msgStreamURL)
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
var inputStream msgstream.MsgStream = searchStream
producerChannels := []string{"searchResult"}
searchResultStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)
searchResultStream.SetPulsarCient(msgStreamURL)
searchResultStream.SetPulsarClient(msgStreamURL)
searchResultStream.CreatePulsarProducers(producerChannels)
var outputStream msgstream.MsgStream = searchResultStream
......
......@@ -154,7 +154,7 @@ func TestSearch_Search(t *testing.T) {
insertProducerChannels := []string{"insert"}
insertStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)
insertStream.SetPulsarCient(pulsarURL)
insertStream.SetPulsarClient(pulsarURL)
insertStream.CreatePulsarProducers(insertProducerChannels)
var insertMsgStream msgstream.MsgStream = insertStream
......@@ -172,7 +172,7 @@ func TestSearch_Search(t *testing.T) {
searchProducerChannels := []string{"search"}
searchStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)
searchStream.SetPulsarCient(pulsarURL)
searchStream.SetPulsarClient(pulsarURL)
searchStream.CreatePulsarProducers(searchProducerChannels)
var searchRawData []byte
......
......@@ -626,7 +626,7 @@ func TestSegment_segmentSearch(t *testing.T) {
const receiveBufSize = 1024
searchProducerChannels := []string{"search"}
searchStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)
searchStream.SetPulsarCient(pulsarURL)
searchStream.SetPulsarClient(pulsarURL)
searchStream.CreatePulsarProducers(searchProducerChannels)
var searchRawData []byte
......
......@@ -28,18 +28,18 @@ func newStatsService(ctx context.Context, replica *collectionReplica) *statsServ
}
func (sService *statsService) start() {
sleepTimeInterval := Params.statsPublishInterval()
receiveBufSize := Params.statsReceiveBufSize()
sleepTimeInterval := Params.statsServiceTimeInterval()
receiveBufSize := Params.statsMsgStreamReceiveBufSize()
// start pulsar
msgStreamURL, err := Params.pulsarAddress()
msgStreamURL, err := Params.PulsarAddress()
if err != nil {
log.Fatal(err)
}
producerChannels := []string{"statistic"}
statsStream := msgstream.NewPulsarMsgStream(sService.ctx, receiveBufSize)
statsStream.SetPulsarCient(msgStreamURL)
statsStream.SetPulsarClient(msgStreamURL)
statsStream.CreatePulsarProducers(producerChannels)
var statsMsgStream msgstream.MsgStream = statsStream
......
......@@ -174,7 +174,7 @@ func TestSegmentManagement_SegmentStatisticService(t *testing.T) {
producerChannels := []string{"statistic"}
statsStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)
statsStream.SetPulsarCient(pulsarURL)
statsStream.SetPulsarClient(pulsarURL)
statsStream.CreatePulsarProducers(producerChannels)
var statsMsgStream msgstream.MsgStream = statsStream
......
......@@ -45,7 +45,10 @@ func (inNode *InputNode) Operate(in []*Msg) []*Msg {
return []*Msg{&msgStreamMsg}
}
func NewInputNode(inStream *msgstream.MsgStream, nodeName string, maxQueueLength int32, maxParallelism int32) *InputNode {
func NewInputNode(inStream *msgstream.MsgStream, nodeName string) *InputNode {
maxQueueLength := Params.FlowGraphMaxQueueLength()
maxParallelism := Params.FlowGraphMaxParallelism()
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(maxQueueLength)
baseNode.SetMaxParallelism(maxParallelism)
......
package flowgraph
import (
"strconv"
"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
)
type ParamTable struct {
paramtable.BaseTable
}
var Params ParamTable
func (p *ParamTable) Init() {
p.BaseTable.Init()
err := p.LoadYaml("advanced/flow_graph.yaml")
if err != nil {
panic(err)
}
}
func (p *ParamTable) FlowGraphMaxQueueLength() int32 {
queueLength, err := p.Load("flowGraph.maxQueueLength")
if err != nil {
panic(err)
}
length, err := strconv.Atoi(queueLength)
if err != nil {
panic(err)
}
return int32(length)
}
func (p *ParamTable) FlowGraphMaxParallelism() int32 {
maxParallelism, err := p.Load("flowGraph.maxParallelism")
if err != nil {
panic(err)
}
maxPara, err := strconv.Atoi(maxParallelism)
if err != nil {
panic(err)
}
return int32(maxPara)
}
package flowgraph
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestParamTable_flowGraphMaxQueueLength(t *testing.T) {
Params.Init()
length := Params.FlowGraphMaxQueueLength()
assert.Equal(t, length, int32(1024))
}
func TestParamTable_flowGraphMaxParallelism(t *testing.T) {
Params.Init()
maxParallelism := Params.FlowGraphMaxParallelism()
assert.Equal(t, maxParallelism, int32(1024))
}
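With the two flowgraph files above in place, callers such as newDmInputNode no longer pass queue sizes to NewInputNode. A minimal wiring sketch, with channel and subscription names borrowed from the reader code and a placeholder Pulsar address:

import (
	"context"

	"github.com/zilliztech/milvus-distributed/internal/msgstream"
	"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
)

// newInsertInputNode sketches how an input node is built after this change:
// the flow-graph queue length and parallelism come from flowgraph.Params,
// so only the stream and a node name are passed in.
func newInsertInputNode(ctx context.Context) *flowgraph.InputNode {
	flowgraph.Params.Init() // loads advanced/flow_graph.yaml

	insertStream := msgstream.NewPulsarMsgStream(ctx, 1024)
	insertStream.SetPulsarClient("pulsar://localhost:6650")
	unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
	insertStream.CreatePulsarConsumers([]string{"insert"}, "insertSub", unmarshalDispatcher, 1024)

	var stream msgstream.MsgStream = insertStream
	// Queue length and parallelism are now read inside NewInputNode from flowgraph.Params.
	return flowgraph.NewInputNode(&stream, "dmInputNode")
}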