Commit 61540012 authored by xige-16, committed by yefu.chen

Set default config in config.yaml

Signed-off-by: xige-16 <xi.ge@zilliz.com>
Parent: efd3eaad
......@@ -17,6 +17,9 @@ type MasterConfig struct {
PulsarMoniterInterval int32
PulsarTopic string
SegmentThreshole float32
ProxyIdList []int64
QueryNodeNum int
WriteNodeNum int
}
type EtcdConfig struct {
......@@ -42,7 +45,6 @@ type PulsarConfig struct {
Address string
Port int32
TopicNum int
NodeNum int
}
//type ProxyConfig struct {
......@@ -52,13 +54,13 @@ type PulsarConfig struct {
//}
type Reader struct {
ClientId int
StopFlag int64
ReaderQueueSize int
SearchChanSize int
Key2SegChanSize int
InsertTopicStart int
InsertTopicEnd int
ClientId int
StopFlag int64
ReaderQueueSize int
SearchChanSize int
Key2SegChanSize int
TopicStart int
TopicEnd int
}
type Writer struct {
......@@ -66,8 +68,8 @@ type Writer struct {
StopFlag int64
ReaderQueueSize int
SearchByIdChanSize int
InsertTopicStart int
InsertTopicEnd int
TopicStart int
TopicEnd int
}
type ServerConfig struct {
......
......@@ -15,6 +15,9 @@ master:
pulsarmoniterinterval: 1
pulsartopic: "monitor-topic"
segmentthreshole: 10000
proxyidlist: [0]
querynodenum: 1
writenodenum: 1
etcd:
address: localhost
......@@ -28,36 +31,35 @@ timesync:
storage:
driver: TIKV
address: localhost
port: 0
accesskey: ab
secretkey: dd
port: 2379
accesskey:
secretkey:
pulsar:
address: localhost
port: 6650
topicnum: 128
nodenum: 1
reader:
clientid: 1
clientid: 0
stopflag: -1
readerqueuesize: 1024
readerqueuesize: 10000
searchchansize: 10000
key2segchansize: 10000
inserttopicstart: 0
inserttopicend: 128
topicstart: 0
topicend: 128
writer:
clientid: 1
clientid: 0
stopflag: -2
readerqueuesize: 1024
searchbyidchansize: 10000
insertopicstart: 0
inserttopicend: 128
topicstart: 0
topicend: 128
proxy:
timezone: UTC+8
query_node_num: 1
proxy_id: 0
network:
address: 0.0.0.0
......
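The YAML keys above are all lowercase with no separators (`proxyidlist`, `querynodenum`, `writenodenum`), which lines up with how `gopkg.in/yaml.v2` matches exported struct fields by their lowercased names when no tags are given. A minimal sketch of how the new `MasterConfig` fields could be populated from this file; the yaml.v2 dependency and the standalone `main` are assumptions for illustration, not the project's actual loader:

```go
package main

import (
	"fmt"
	"log"

	"gopkg.in/yaml.v2" // assumed library; the repo may use a different loader
)

// Mirrors the fields added to MasterConfig in this commit.
// yaml.v2 matches "proxyidlist" etc. against the lowercased field names.
type MasterConfig struct {
	ProxyIdList  []int64
	QueryNodeNum int
	WriteNodeNum int
}

type ServerConfig struct {
	Master MasterConfig
}

func main() {
	raw := []byte(`
master:
  proxyidlist: [0]
  querynodenum: 1
  writenodenum: 1
`)
	var cfg ServerConfig
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		log.Fatal(err)
	}
	fmt.Println(cfg.Master.ProxyIdList, cfg.Master.QueryNodeNum, cfg.Master.WriteNodeNum)
	// Output: [0] 1 1
}
```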
......@@ -615,9 +615,9 @@ SegmentNaive::GetMemoryUsageInBytes() {
total_bytes += vec_ptr->IndexSize();
}
}
int64_t ins_n = (record_.reserved + DefaultElementPerChunk - 1) & (DefaultElementPerChunk - 1);
int64_t ins_n = (record_.reserved + DefaultElementPerChunk - 1) & ~(DefaultElementPerChunk - 1);
total_bytes += ins_n * (schema_->get_total_sizeof() + 16 + 1);
int64_t del_n = (deleted_record_.reserved + DefaultElementPerChunk - 1) & (DefaultElementPerChunk - 1);
int64_t del_n = (deleted_record_.reserved + DefaultElementPerChunk - 1) & ~(DefaultElementPerChunk - 1);
total_bytes += del_n * (16 * 2);
return total_bytes;
}
......
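The fix in this hunk is the added `~`: with `DefaultElementPerChunk` a power of two, `(n + chunk - 1) & ~(chunk - 1)` rounds `n` up to the next multiple of the chunk size, whereas the old `& (chunk - 1)` kept only the remainder and under-counted the reserved rows. A quick sketch of the two expressions (Go is used for consistency with the other examples; the chunk size is illustrative, and Go's `&^` plays the role of C++'s `& ~`):

```go
package main

import "fmt"

const defaultElementPerChunk = 32 * 1024 // illustrative; assumes a power-of-two chunk size

func main() {
	n := int64(100_000)
	wrong := (n + defaultElementPerChunk - 1) & (defaultElementPerChunk - 1)  // remainder only
	right := (n + defaultElementPerChunk - 1) &^ (defaultElementPerChunk - 1) // round up to a chunk multiple
	fmt.Println(wrong, right) // 1695 131072
}
```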
......@@ -289,6 +289,9 @@ TEST(CApiTest, GetMemoryUsageInBytesTest) {
auto partition = NewPartition(collection, partition_name);
auto segment = NewSegment(partition, 0);
auto old_memory_usage_size = GetMemoryUsageInBytes(segment);
std::cout << "old_memory_usage_size = " << old_memory_usage_size << std::endl;
std::vector<char> raw_data;
std::vector<uint64_t> timestamps;
std::vector<int64_t> uids;
......@@ -317,6 +320,8 @@ TEST(CApiTest, GetMemoryUsageInBytesTest) {
auto memory_usage_size = GetMemoryUsageInBytes(segment);
std::cout << "new_memory_usage_size = " << memory_usage_size << std::endl;
assert(memory_usage_size == 1898459);
DeleteCollection(collection);
......
......@@ -70,11 +70,8 @@ ConfigMgr::ConfigMgr() {
config_list_ = {
/* general */
{"timezone",
CreateStringConfig("timezone", false, &config.timezone.value, "UTC+8", nullptr, nullptr)},
{"query_node_num", CreateIntegerConfig("query_node_num", false, 0, 1023, &config.query_node_num.value,
1, nullptr, nullptr)},
{"timezone", CreateStringConfig("timezone", false, &config.timezone.value, "UTC+8", nullptr, nullptr)},
{"proxy_id", CreateIntegerConfig("proxy_id", false, 0, 1024, &config.proxy_id.value, 0, nullptr, nullptr)},
/* network */
{"network.address", CreateStringConfig("network.address", false, &config.network.address.value,
......@@ -90,13 +87,13 @@ ConfigMgr::ConfigMgr() {
6650, nullptr, nullptr)},
{"pulsar.topicnum", CreateIntegerConfig("pulsar.topicnum", false, 0, 1024, &config.pulsar.topicnum.value,
1024, nullptr, nullptr)},
{"pulsar.nodenum", CreateIntegerConfig("pulsar.nodenum", false, 0, 1024, &config.pulsar.nodenum.value,
1, nullptr, nullptr)},
/* master */
{"master.address", CreateStringConfig("master.address", false, &config.master.address.value,
"localhost", nullptr, nullptr)},
{"master.port", CreateIntegerConfig("master.port", false, 0, 65535, &config.master.port.value,
53100, nullptr, nullptr)},
{"master.querynodenum", CreateIntegerConfig("master.querynodenum", false, 0, 1023, &config.master.query_node_num.value,
2, nullptr, nullptr)},
/* etcd */
{"etcd.address", CreateStringConfig("etcd.address", false, &config.etcd.address.value, "localhost", nullptr,
......
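Judging from the call sites, `CreateIntegerConfig` appears to take (name, modifiable, min, max, target, default, callbacks), so `master.querynodenum` is registered with range [0, 1023] and default 2, and `proxy_id` with range [0, 1024] and default 0. A minimal Go analogue of that bounded-integer-with-default pattern, under that reading of the arguments; the names and structure here are illustrative, not the proxy's actual config framework:

```go
package main

import "fmt"

// IntConfig is a sketch of a registered integer setting with bounds and a default.
type IntConfig struct {
	Name     string
	Min, Max int64
	Target   *int64
	Default  int64
}

// Init applies the default; Set validates the range before writing through to Target.
func (c *IntConfig) Init() { *c.Target = c.Default }

func (c *IntConfig) Set(v int64) error {
	if v < c.Min || v > c.Max {
		return fmt.Errorf("%s: value %d out of range [%d, %d]", c.Name, v, c.Min, c.Max)
	}
	*c.Target = v
	return nil
}

func main() {
	var queryNodeNum int64
	cfg := IntConfig{Name: "master.querynodenum", Min: 0, Max: 1023, Target: &queryNodeNum, Default: 2}
	cfg.Init()
	fmt.Println(queryNodeNum)  // 2
	fmt.Println(cfg.Set(5000)) // error: out of range
	fmt.Println(queryNodeNum)  // still 2
}
```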
......@@ -65,7 +65,8 @@ struct ServerConfig {
using Floating = ConfigValue<double>;
String timezone{"unknown"};
Integer query_node_num{1};
// Integer query_node_num{1};
Integer proxy_id{0};
struct Network {
String address{"unknown"};
......@@ -76,12 +77,12 @@ struct ServerConfig {
String address{"localhost"};
Integer port{6650};
Integer topicnum{1024};
Integer nodenum{1};
}pulsar;
struct Master{
String address{"localhost"};
Integer port{53100};
Integer port{53100};
Integer query_node_num{2};
}master;
struct Etcd{
......
......@@ -16,7 +16,8 @@ MsgClientV2::MsgClientV2(int64_t client_id,
const std::string &service_url,
const uint32_t mut_parallelism,
const pulsar::ClientConfiguration &config)
: client_id_(client_id), service_url_(service_url), mut_parallelism_(mut_parallelism) {}
: client_id_(client_id), service_url_(service_url), mut_parallelism_(mut_parallelism) {
}
Status MsgClientV2::Init(const std::string &insert_delete,
const std::string &search,
......@@ -35,16 +36,17 @@ Status MsgClientV2::Init(const std::string &insert_delete,
time_sync_producer_ = std::make_shared<MsgProducer>(pulsar_client, time_sync);
for (auto i = 0; i < mut_parallelism_; i++) {
// std::string topic = insert_delete + "-" + std::to_string(i);
std::string insert_or_delete_topic = insert_delete + "-" + std::to_string(i);
paralle_mut_producers_.emplace_back(std::make_shared<MsgProducer>(pulsar_client,
insert_delete,
insert_or_delete_topic,
producerConfiguration));
}
//create pulsar consumer
std::string subscribe_name = std::to_string(CommonUtil::RandomUINT64());
consumer_ = std::make_shared<MsgConsumer>(pulsar_client, search_result + subscribe_name);
auto result = consumer_->subscribe(search_result);
std::string search_topic = search_result + "-" + std::to_string(config.proxy_id());
auto result = consumer_->subscribe(search_topic);
if (result != pulsar::Result::ResultOk) {
return Status(SERVER_UNEXPECTED_ERROR,
"Pulsar message client init occur error, " + std::string(pulsar::strResult(result)));
......@@ -53,7 +55,7 @@ Status MsgClientV2::Init(const std::string &insert_delete,
}
int64_t GetQueryNodeNum() {
return config.query_node_num();
return config.master.query_node_num();
}
Status
......@@ -161,10 +163,11 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request,
uint64_t timestamp)> &segment_id) {
// may have retry policy?
auto row_count = request.rows_data_size();
auto stats = std::vector<Status>(ParallelNum);
std::atomic_uint64_t msg_sended = 0;
auto topic_num = config.pulsar.topicnum();
#pragma omp parallel for default(none), shared(row_count, request, timestamp, stats, segment_id, msg_sended, topic_num), num_threads(ParallelNum)
auto stats = std::vector<Status>(topic_num);
std::atomic_uint64_t msg_sended = 0;
#pragma omp parallel for default(none), shared(row_count, request, timestamp, stats, segment_id, msg_sended, topic_num), num_threads(topic_num)
for (auto i = 0; i < row_count; i++) {
milvus::grpc::InsertOrDeleteMsg mut_msg;
int this_thread = omp_get_thread_num();
......@@ -211,10 +214,11 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::DeleteByIDParam &request,
uint64_t channel_id,
uint64_t timestamp)> &segment_id) {
auto row_count = request.id_array_size();
auto stats = std::vector<Status>(ParallelNum);
auto topicnum = config.pulsar.topicnum();
auto stats = std::vector<Status>(topicnum);
std::atomic_uint64_t msg_sended = 0;
#pragma omp parallel for default(none), shared( request, timestamp, stats, segment_id, msg_sended, row_count), num_threads(ParallelNum)
#pragma omp parallel for default(none), shared( request, timestamp, stats, segment_id, msg_sended, row_count, topicnum), num_threads(topicnum)
for (auto i = 0; i < row_count; i++) {
milvus::grpc::InsertOrDeleteMsg mut_msg;
mut_msg.set_op(milvus::grpc::OpType::DELETE);
......@@ -223,7 +227,7 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::DeleteByIDParam &request,
mut_msg.set_collection_name(request.collection_name());
mut_msg.set_timestamp(timestamp);
uint64_t uid = request.id_array(i);
auto channel_id = makeHash(&uid, sizeof(uint64_t)) % 1024;
auto channel_id = makeHash(&uid, sizeof(uint64_t)) % topicnum;
mut_msg.set_segment_id(segment_id(request.collection_name(), channel_id, timestamp));
int this_thread = omp_get_thread_num();
......@@ -253,7 +257,7 @@ Status MsgClientV2::SendQueryMessage(const milvus::grpc::SearchParam &request, u
search_msg.set_collection_name(request.collection_name());
search_msg.set_uid(query_id);
//TODO: get client id from master
search_msg.set_client_id(1);
search_msg.set_client_id(client_id_);
search_msg.set_timestamp(timestamp);
search_msg.set_dsl(request.dsl());
......
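Two routing conventions show up in this hunk: insert/delete messages are spread across `topicnum` partitioned topics by hashing the row's primary key (`makeHash(&uid, sizeof(uint64_t)) % topicnum`), and search results come back on a per-proxy topic named `SearchResult-<proxy_id>`. A small Go sketch of the same topic naming and hash routing, using FNV as a stand-in for `makeHash` (the hash function and names are illustrative):

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/fnv"
)

// channelOf maps a primary key onto one of topicNum insert/delete channels,
// mirroring makeHash(&uid, sizeof(uint64_t)) % topicnum in the C++ client.
func channelOf(uid uint64, topicNum uint64) uint64 {
	h := fnv.New64a() // stand-in for the client's makeHash
	var buf [8]byte
	binary.LittleEndian.PutUint64(buf[:], uid)
	h.Write(buf[:])
	return h.Sum64() % topicNum
}

func main() {
	topicNum := uint64(128)
	proxyID := int64(0)

	uid := uint64(42)
	insertTopic := fmt.Sprintf("InsertOrDelete-%d", channelOf(uid, topicNum))
	searchResultTopic := fmt.Sprintf("SearchResult-%d", proxyID)

	fmt.Println(insertTopic, searchResultTopic)
}
```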
#pragma once
#include "config/ServerConfig.h"
#include "utils/Status.h"
#include "Producer.h"
#include "Consumer.h"
#include "grpc/message.pb.h"
namespace milvus::message_client {
constexpr uint32_t ParallelNum = 12 * 20;
//constexpr uint32_t ParallelNum = 12 * 20;
class MsgClientV2 {
public:
MsgClientV2(int64_t client_id,
const std::string &service_url,
const uint32_t mut_parallelism = ParallelNum,
const uint32_t mut_parallelism,
const pulsar::ClientConfiguration &config = pulsar::ClientConfiguration());
~MsgClientV2();
......
......@@ -12,8 +12,8 @@ MessageWrapper &MessageWrapper::GetInstance() {
Status MessageWrapper::Init() {
std::string pulsar_server_addr
(std::string{"pulsar://"} + config.pulsar.address() + ":" + std::to_string(config.pulsar.port()));
int64_t client_id = 0;
msg_client_ = std::make_shared<message_client::MsgClientV2>(client_id, pulsar_server_addr);
int client_id = config.proxy_id();
msg_client_ = std::make_shared<message_client::MsgClientV2>(client_id, pulsar_server_addr, config.pulsar.topicnum());
auto status = msg_client_->Init("InsertOrDelete", "Search", "TimeSync", "SearchById", "SearchResult");
return status;
}
......
......@@ -10,6 +10,7 @@
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include <chrono>
#include "config/ServerConfig.h"
#include "TimeSync.h"
#include "message_client/Producer.h"
......@@ -30,6 +31,7 @@ TimeSync::TimeSync(int64_t id,
for (;;) {
if (this->stop_) break;
this->sync_msg_.set_peer_id(config.proxy_id());
this->sync_msg_.set_timestamp(this->timestamp_());
this->sync_msg_.set_sync_type(milvus::grpc::READ);
auto rst = producer.send(sync_msg_.SerializeAsString());
......
#include <gtest/gtest.h>
#include "src/message_client/ClientV2.h"
#include "config/ServerConfig.h"
TEST(CLIENT_CPP, GetResult) {
// auto client= std::make_shared<milvus::message_client::MsgClient>("pulsar://localhost:6650");
......@@ -73,7 +74,7 @@ TEST(CLIENT_CPP, GetResult) {
// producer.send(fake_message2.SerializeAsString());
int64_t query_id = 10;
milvus::message_client::MsgClientV2 client_v2(1, "pulsar://localhost:6650");
milvus::message_client::MsgClientV2 client_v2(1, "pulsar://localhost:6650", milvus::config.pulsar.topicnum());
auto init_status = client_v2.Init("insert_delete", "search", "time_sync", "result", "result");
// client_v2.SendQueryMessage();
......
......@@ -23,7 +23,8 @@ type MessageClient struct {
// pulsar
client pulsar.Client
searchResultProducer pulsar.Producer
//searchResultProducer pulsar.Producer
searchResultProducers map[int64]pulsar.Producer
segmentsStatisticProducer pulsar.Producer
searchConsumer pulsar.Consumer
key2segConsumer pulsar.Consumer
......@@ -56,9 +57,9 @@ func (mc *MessageClient) TimeSyncEnd() uint64 {
return mc.timestampBatchEnd
}
func (mc *MessageClient) SendResult(ctx context.Context, msg msgpb.QueryResult) {
func (mc *MessageClient) SendResult(ctx context.Context, msg msgpb.QueryResult, producerKey int64) {
var msgBuffer, _ = proto.Marshal(&msg)
if _, err := mc.searchResultProducer.Send(ctx, &pulsar.ProducerMessage{
if _, err := mc.searchResultProducers[producerKey].Send(ctx, &pulsar.ProducerMessage{
Payload: msgBuffer,
}); err != nil {
log.Fatal(err)
......@@ -158,7 +159,14 @@ func (mc *MessageClient) InitClient(url string) {
mc.MessageClientID = conf.Config.Reader.ClientId
//create producer
mc.searchResultProducer = mc.creatProducer("SearchResult")
mc.searchResultProducers = make(map[int64]pulsar.Producer)
proxyIdList := conf.Config.Master.ProxyIdList
for _, key := range proxyIdList{
topic := "SearchResult-"
topic = topic + strconv.Itoa(int(key))
mc.searchResultProducers[key] = mc.creatProducer(topic)
}
//mc.searchResultProducer = mc.creatProducer("SearchResult")
mc.segmentsStatisticProducer = mc.creatProducer("SegmentsStatistic")
//create consumer
......@@ -176,15 +184,13 @@ func (mc *MessageClient) InitClient(url string) {
timeSyncTopic := "TimeSync"
timeSyncSubName := "reader" + strconv.Itoa(mc.MessageClientID)
readTopics := make([]string, 0)
for i := conf.Config.Reader.InsertTopicStart; i < conf.Config.Reader.InsertTopicEnd; i++ {
str := "InsertOrDelete-partition-"
for i := conf.Config.Reader.TopicStart; i < conf.Config.Reader.TopicEnd; i++ {
str := "InsertOrDelete-"
str = str + strconv.Itoa(i)
readTopics = append(readTopics, str)
}
readSubName := "reader" + strconv.Itoa(mc.MessageClientID)
// TODO::read proxy conf from config.yaml
proxyIdList := []int64{0}
readerQueueSize := timesync.WithReaderQueueSize(conf.Config.Reader.ReaderQueueSize)
timeSync, err := timesync.NewReaderTimeSync(timeSyncTopic,
timeSyncSubName,
......@@ -205,7 +211,9 @@ func (mc *MessageClient) InitClient(url string) {
func (mc *MessageClient) Close() {
mc.client.Close()
mc.searchResultProducer.Close()
for key, _ := range mc.searchResultProducers {
mc.searchResultProducers[key].Close()
}
mc.segmentsStatisticProducer.Close()
mc.searchConsumer.Close()
mc.key2segConsumer.Close()
......
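With `ProxyIdList` now read from the config, the reader keeps one `SearchResult-<proxy_id>` producer per proxy and `SendResult` picks the producer by the query's `ClientId`, so each result lands back on the topic the issuing proxy subscribes to. A compact sketch of that keyed-producer pattern with the Pulsar connection replaced by a stub (the stub type is an assumption; the real code uses `pulsar.Producer`):

```go
package main

import "fmt"

// stubProducer stands in for pulsar.Producer so the routing logic is runnable on its own.
type stubProducer struct{ topic string }

func (p *stubProducer) Send(payload []byte) { fmt.Printf("-> %s (%d bytes)\n", p.topic, len(payload)) }

func main() {
	proxyIdList := []int64{0} // conf.Config.Master.ProxyIdList in the real client
	producers := make(map[int64]*stubProducer)
	for _, id := range proxyIdList {
		producers[id] = &stubProducer{topic: fmt.Sprintf("SearchResult-%d", id)}
	}

	// SendResult(ctx, msg, producerKey): the key is the ClientId carried in the query result.
	clientId := int64(0)
	producers[clientId].Send([]byte("serialized QueryResult"))
}
```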
......@@ -104,6 +104,7 @@ func (node *QueryNode) processSegmentCreate(id string, value string) {
}
printSegmentStruct(segment)
// TODO: fix this after channel range config finished
//if !isSegmentChannelRangeInQueryNodeChannelRange(segment) {
// return
//}
......@@ -117,7 +118,6 @@ func (node *QueryNode) processSegmentCreate(id string, value string) {
newSegment := partition.NewSegment(newSegmentID)
newSegment.SegmentStatus = SegmentOpened
newSegment.SegmentCloseTime = segment.CloseTimeStamp
partition.OpenedSegments = append(partition.OpenedSegments, newSegment)
node.SegmentsMap[newSegmentID] = newSegment
}
}
......@@ -147,6 +147,7 @@ func (node *QueryNode) processSegmentModify(id string, value string) {
}
printSegmentStruct(segment)
// TODO: fix this after channel range config finished
//if !isSegmentChannelRangeInQueryNodeChannelRange(segment) {
// return
//}
......
......@@ -16,8 +16,7 @@ import "C"
type Partition struct {
PartitionPtr C.CPartition
PartitionName string
OpenedSegments []*Segment
ClosedSegments []*Segment
Segments []*Segment
}
func (p *Partition) NewSegment(segmentId int64) *Segment {
......@@ -28,7 +27,7 @@ func (p *Partition) NewSegment(segmentId int64) *Segment {
segmentPtr := C.NewSegment(p.PartitionPtr, C.ulong(segmentId))
var newSegment = &Segment{SegmentPtr: segmentPtr, SegmentId: segmentId}
p.OpenedSegments = append(p.OpenedSegments, newSegment)
p.Segments = append(p.Segments, newSegment)
return newSegment
}
......
......@@ -294,6 +294,7 @@ func (node *QueryNode) RunInsertDelete(wg *sync.WaitGroup) {
assert.NotEqual(nil, 0, timeRange.timestampMax)
if msgLen[0] == 0 && len(node.buffer.InsertDeleteBuffer) <= 0 {
node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange)
continue
}
......
......@@ -3,6 +3,7 @@ package reader
import (
"context"
"github.com/czs007/suvlim/reader/message_client"
"log"
"sync"
)
......@@ -15,11 +16,17 @@ func StartQueryNode(pulsarURL string) {
ctx := context.Background()
// Segments Services
//go qn.SegmentManagementService()
go qn.SegmentManagementService()
go qn.SegmentStatisticService()
wg := sync.WaitGroup{}
qn.InitFromMeta()
err := qn.InitFromMeta()
if err != nil {
log.Printf("Init query node from meta failed")
return
}
wg.Add(3)
go qn.RunMetaService(ctx, &wg)
go qn.RunInsertDelete(&wg)
......
......@@ -16,7 +16,7 @@ type SearchResult struct {
func (node *QueryNode) PublishSearchResult(results *msgPb.QueryResult) msgPb.Status {
var ctx = context.Background()
node.messageClient.SendResult(ctx, *results)
node.messageClient.SendResult(ctx, *results, results.ClientId)
return msgPb.Status{ErrorCode: msgPb.ErrorCode_SUCCESS}
}
......@@ -31,7 +31,7 @@ func (node *QueryNode) PublishFailedSearchResult() msgPb.Status {
var ctx = context.Background()
node.messageClient.SendResult(ctx, results)
node.messageClient.SendResult(ctx, results, results.ClientId)
return msgPb.Status{ErrorCode: msgPb.ErrorCode_SUCCESS}
}
......
......@@ -73,6 +73,8 @@ func (s *Segment) CloseSegment(collection* Collection) error {
int
Close(CSegmentBase c_segment);
*/
fmt.Println("Closing segment :", s.SegmentId)
var status = C.Close(s.SegmentPtr)
s.SegmentStatus = SegmentClosed
......@@ -82,11 +84,13 @@ func (s *Segment) CloseSegment(collection* Collection) error {
// Build index after closing segment
s.SegmentStatus = SegmentIndexing
fmt.Println("Building index...")
s.buildIndex(collection)
// TODO: remove redundant segment indexed status
// Change segment status to indexed
s.SegmentStatus = SegmentIndexed
fmt.Println("Segment closed and indexed")
return nil
}
......
......@@ -13,20 +13,19 @@ func (node *QueryNode) SegmentsManagement() {
//node.queryNodeTimeSync.UpdateTSOTimeSync()
//var timeNow = node.queryNodeTimeSync.TSOTimeSync
timeNow := node.messageClient.GetTimeNow()
timeNow := node.messageClient.GetTimeNow() >> 18
for _, collection := range node.Collections {
for _, partition := range collection.Partitions {
for _, oldSegment := range partition.OpenedSegments {
// TODO: check segment status
if timeNow >= oldSegment.SegmentCloseTime {
// close old segment and move it into partition.ClosedSegments
if oldSegment.SegmentStatus != SegmentOpened {
log.Println("Never reach here, Opened segment cannot be closed")
continue
}
go oldSegment.CloseSegment(collection)
partition.ClosedSegments = append(partition.ClosedSegments, oldSegment)
for _, segment := range partition.Segments {
if segment.SegmentStatus != SegmentOpened {
log.Println("Segment have been closed")
continue
}
fmt.Println("timeNow = ", timeNow, "SegmentCloseTime = ", segment.SegmentCloseTime)
if timeNow >= segment.SegmentCloseTime {
go segment.CloseSegment(collection)
}
}
}
......@@ -34,7 +33,7 @@ func (node *QueryNode) SegmentsManagement() {
}
func (node *QueryNode) SegmentManagementService() {
sleepMillisecondTime := 1000
sleepMillisecondTime := 3000
fmt.Println("do segments management in ", strconv.Itoa(sleepMillisecondTime), "ms")
for {
time.Sleep(time.Duration(sleepMillisecondTime) * time.Millisecond)
......@@ -81,6 +80,8 @@ func (node *QueryNode) SegmentStatistic(sleepMillisecondTime int) {
statisticData = append(statisticData, stat)
}
fmt.Println("Publish segment statistic")
fmt.Println(statisticData)
var status = node.PublicStatistic(&statisticData)
if status.ErrorCode != msgPb.ErrorCode_SUCCESS {
log.Printf("Publish segments statistic failed")
......@@ -88,7 +89,7 @@ func (node *QueryNode) SegmentStatistic(sleepMillisecondTime int) {
}
func (node *QueryNode) SegmentStatisticService() {
sleepMillisecondTime := 1000
sleepMillisecondTime := 3000
fmt.Println("do segments statistic in ", strconv.Itoa(sleepMillisecondTime), "ms")
for {
time.Sleep(time.Duration(sleepMillisecondTime) * time.Millisecond)
......
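The `>> 18` when reading the current time suggests the message client hands back a hybrid timestamp whose low 18 bits are a logical counter, so shifting recovers the physical part that `SegmentCloseTime` is compared against. A short sketch of that split, assuming the 18-bit logical suffix (the bit width is inferred from this diff, not confirmed elsewhere in the section):

```go
package main

import "fmt"

const logicalBits = 18 // assumed width of the logical counter in the hybrid timestamp

// physicalOf drops the logical counter, mirroring `GetTimeNow() >> 18` in SegmentsManagement.
func physicalOf(hybrid uint64) uint64 { return hybrid >> logicalBits }

func main() {
	physical := uint64(1_700_000_000_000) // physical part, e.g. milliseconds
	logical := uint64(7)
	hybrid := physical<<logicalBits | logical

	fmt.Println(physicalOf(hybrid) == physical) // true
	// A segment is closed once physicalOf(now) >= segment.SegmentCloseTime.
}
```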
......@@ -21,7 +21,7 @@
#include "interface/ConnectionImpl.h"
#include "utils/TimeRecorder.h"
const int N = 100;
const int N = 200000;
const int DIM = 16;
const int LOOP = 10;
......
......@@ -3,6 +3,7 @@ package tikv_driver
import (
"context"
"errors"
"github.com/czs007/suvlim/conf"
. "github.com/czs007/suvlim/storage/internal/tikv/codec"
. "github.com/czs007/suvlim/storage/pkg/types"
"github.com/tikv/client-go/config"
......@@ -86,7 +87,8 @@ type TikvStore struct {
}
func NewTikvStore(ctx context.Context) (*TikvStore, error) {
pdAddrs := []string{"127.0.0.1:2379"}
var pdAddress0 = conf.Config.Storage.Address + ":" + strconv.FormatInt(int64(conf.Config.Storage.Port), 10)
pdAddrs := []string{pdAddress0}
conf := config.Default()
client, err := rawkv.NewClient(ctx, pdAddrs, conf)
if err != nil {
......
......@@ -130,14 +130,13 @@ func (mc *MessageClient) InitClient(url string) {
timeSyncTopic := "TimeSync"
timeSyncSubName := "writer" + strconv.Itoa(mc.MessageClientID)
readTopics := make([]string, 0)
for i := conf.Config.Writer.InsertTopicStart; i < conf.Config.Writer.InsertTopicEnd; i++ {
str := "InsertOrDelete-partition-"
for i := conf.Config.Writer.TopicStart; i < conf.Config.Writer.TopicEnd; i++ {
str := "InsertOrDelete-"
str = str + strconv.Itoa(i)
readTopics = append(readTopics, str)
}
readSubName := "writer" + strconv.Itoa(mc.MessageClientID)
// TODO::read proxy conf from config.yaml
proxyIdList := []int64{0}
proxyIdList := conf.Config.Master.ProxyIdList
readerQueueSize := timesync.WithReaderQueueSize(conf.Config.Reader.ReaderQueueSize)
timeSync, err := timesync.NewReaderTimeSync(timeSyncTopic,
timeSyncSubName,
......