提交 fe5f0afe 编写于 作者: B bigsheeper 提交者: yefu.chen

Fix query node service

Signed-off-by: Nbigsheeper <yihao.dai@zilliz.com>
上级 d8b1a7cf
......@@ -28,7 +28,7 @@ timesync:
storage:
driver: TIKV
address: localhost
port: 0
port: 2379
accesskey: ab
secretkey: dd
......@@ -41,7 +41,7 @@ pulsar:
reader:
clientid: 1
stopflag: -1
readerqueuesize: 1024
readerqueuesize: 10240
searchchansize: 10000
key2segchansize: 10000
inserttopicstart: 0
......
......@@ -289,6 +289,9 @@ TEST(CApiTest, GetMemoryUsageInBytesTest) {
auto partition = NewPartition(collection, partition_name);
auto segment = NewSegment(partition, 0);
auto old_memory_usage_size = GetMemoryUsageInBytes(segment);
std::cout << "old_memory_usage_size = " << old_memory_usage_size << std::endl;
std::vector<char> raw_data;
std::vector<uint64_t> timestamps;
std::vector<int64_t> uids;
......@@ -317,6 +320,8 @@ TEST(CApiTest, GetMemoryUsageInBytesTest) {
auto memory_usage_size = GetMemoryUsageInBytes(segment);
std::cout << "new_memory_usage_size = " << memory_usage_size << std::endl;
assert(memory_usage_size == 1898459);
DeleteCollection(collection);
......
......@@ -178,7 +178,6 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request,
auto channel_id = makeHash(&uid, sizeof(uint64_t)) % topic_num;
try {
mut_msg.set_segment_id(segment_id(request.collection_name(), channel_id, timestamp));
printf("%ld \n", mut_msg.segment_id());
mut_msg.mutable_rows_data()->CopyFrom(request.rows_data(i));
mut_msg.mutable_extra_params()->CopyFrom(request.extra_params());
......
......@@ -104,6 +104,7 @@ func (node *QueryNode) processSegmentCreate(id string, value string) {
}
printSegmentStruct(segment)
// TODO: fix this after channel range config finished
//if !isSegmentChannelRangeInQueryNodeChannelRange(segment) {
// return
//}
......@@ -117,7 +118,6 @@ func (node *QueryNode) processSegmentCreate(id string, value string) {
newSegment := partition.NewSegment(newSegmentID)
newSegment.SegmentStatus = SegmentOpened
newSegment.SegmentCloseTime = segment.CloseTimeStamp
partition.OpenedSegments = append(partition.OpenedSegments, newSegment)
node.SegmentsMap[newSegmentID] = newSegment
}
}
......@@ -147,6 +147,7 @@ func (node *QueryNode) processSegmentModify(id string, value string) {
}
printSegmentStruct(segment)
// TODO: fix this after channel range config finished
//if !isSegmentChannelRangeInQueryNodeChannelRange(segment) {
// return
//}
......
......@@ -16,8 +16,7 @@ import "C"
type Partition struct {
PartitionPtr C.CPartition
PartitionName string
OpenedSegments []*Segment
ClosedSegments []*Segment
Segments []*Segment
}
func (p *Partition) NewSegment(segmentId int64) *Segment {
......@@ -28,7 +27,7 @@ func (p *Partition) NewSegment(segmentId int64) *Segment {
segmentPtr := C.NewSegment(p.PartitionPtr, C.ulong(segmentId))
var newSegment = &Segment{SegmentPtr: segmentPtr, SegmentId: segmentId}
p.OpenedSegments = append(p.OpenedSegments, newSegment)
p.Segments = append(p.Segments, newSegment)
return newSegment
}
......
......@@ -21,6 +21,7 @@ import (
"sort"
"sync"
"sync/atomic"
"time"
msgPb "github.com/czs007/suvlim/pkg/master/grpc/message"
"github.com/czs007/suvlim/pkg/master/kv"
......@@ -65,6 +66,12 @@ type QueryInfo struct {
FieldName string `json:"field_name"`
}
type MsgCounter struct {
InsertCounter int64
DeleteCounter int64
SearchCounter int64
}
type QueryNode struct {
QueryNodeId uint64
Collections []*Collection
......@@ -77,6 +84,7 @@ type QueryNode struct {
deleteData DeleteData
insertData InsertData
kvBase *kv.EtcdKVBase
msgCounter *MsgCounter
}
func NewQueryNode(queryNodeId uint64, timeSync uint64) *QueryNode {
......@@ -99,6 +107,12 @@ func NewQueryNode(queryNodeId uint64, timeSync uint64) *QueryNode {
validSearchBuffer: make([]bool, 0),
}
msgCounter := MsgCounter{
InsertCounter: 0,
DeleteCounter: 0,
SearchCounter: 0,
}
return &QueryNode{
QueryNodeId: queryNodeId,
Collections: nil,
......@@ -106,6 +120,7 @@ func NewQueryNode(queryNodeId uint64, timeSync uint64) *QueryNode {
messageClient: &mc,
queryNodeTimeSync: queryNodeTimeSync,
buffer: buffer,
msgCounter: &msgCounter,
}
}
......@@ -132,6 +147,12 @@ func CreateQueryNode(queryNodeId uint64, timeSync uint64, mc *message_client.Mes
validSearchBuffer: make([]bool, 0),
}
msgCounter := MsgCounter{
InsertCounter: 0,
DeleteCounter: 0,
SearchCounter: 0,
}
return &QueryNode{
QueryNodeId: queryNodeId,
Collections: nil,
......@@ -139,6 +160,7 @@ func CreateQueryNode(queryNodeId uint64, timeSync uint64, mc *message_client.Mes
messageClient: mc,
queryNodeTimeSync: queryNodeTimeSync,
buffer: buffer,
msgCounter: &msgCounter,
}
}
......@@ -168,9 +190,9 @@ func (node *QueryNode) QueryNodeDataInit() {
func (node *QueryNode) NewCollection(collectionID uint64, collectionName string, schemaConfig string) *Collection {
/*
void
UpdateIndexes(CCollection c_collection, const char *index_string);
*/
void
UpdateIndexes(CCollection c_collection, const char *index_string);
*/
cName := C.CString(collectionName)
cSchema := C.CString(schemaConfig)
collection := C.NewCollection(cName, cSchema)
......@@ -183,9 +205,9 @@ func (node *QueryNode) NewCollection(collectionID uint64, collectionName string,
func (node *QueryNode) DeleteCollection(collection *Collection) {
/*
void
DeleteCollection(CCollection collection);
*/
void
DeleteCollection(CCollection collection);
*/
cPtr := collection.CollectionPtr
C.DeleteCollection(cPtr)
......@@ -194,8 +216,8 @@ func (node *QueryNode) DeleteCollection(collection *Collection) {
func (node *QueryNode) UpdateIndexes(collection *Collection, indexConfig *string) {
/*
void
UpdateIndexes(CCollection c_collection, const char *index_string);
void
UpdateIndexes(CCollection c_collection, const char *index_string);
*/
cCollectionPtr := collection.CollectionPtr
cIndexConfig := C.CString(*indexConfig)
......@@ -222,8 +244,50 @@ func (node *QueryNode) InitQueryNodeCollection() {
////////////////////////////////////////////////////////////////////////////////////////////////////
func (node *QueryNode) RunInsertDelete(wg *sync.WaitGroup) {
const Debug = true
const CountMsgNum = 1000 * 1000
if Debug {
var printFlag = true
var startTime = true
var start time.Time
for {
var msgLen = node.PrepareBatchMsg()
var timeRange = TimeRange{node.messageClient.TimeSyncStart(), node.messageClient.TimeSyncEnd()}
assert.NotEqual(nil, 0, timeRange.timestampMin)
assert.NotEqual(nil, 0, timeRange.timestampMax)
if msgLen[0] == 0 && len(node.buffer.InsertDeleteBuffer) <= 0 {
continue
}
if startTime {
fmt.Println("============> Start Test <============")
startTime = false
start = time.Now()
}
node.QueryNodeDataInit()
node.MessagesPreprocess(node.messageClient.InsertOrDeleteMsg, timeRange)
//fmt.Println("MessagesPreprocess Done")
node.WriterDelete()
node.PreInsertAndDelete()
//fmt.Println("PreInsertAndDelete Done")
node.DoInsertAndDelete()
//fmt.Println("DoInsertAndDelete Done")
node.queryNodeTimeSync.UpdateSearchTimeSync(timeRange)
// Test insert time
if printFlag && node.msgCounter.InsertCounter >= CountMsgNum {
printFlag = false
timeSince := time.Since(start)
fmt.Println("============> Do", node.msgCounter.InsertCounter, "Insert in", timeSince, "<============")
}
}
}
for {
// TODO: get timeRange from message client
var msgLen = node.PrepareBatchMsg()
var timeRange = TimeRange{node.messageClient.TimeSyncStart(), node.messageClient.TimeSyncEnd()}
assert.NotEqual(nil, 0, timeRange.timestampMin)
......@@ -444,6 +508,7 @@ func (node *QueryNode) DoInsert(segmentID int64, records *[][]byte, wg *sync.Wai
timestamps := node.insertData.insertTimestamps[segmentID]
offsets := node.insertData.insertOffset[segmentID]
node.msgCounter.InsertCounter += int64(len(ids))
err = targetSegment.SegmentInsert(offsets, &ids, &timestamps, records)
if err != nil {
fmt.Println(err.Error())
......@@ -463,6 +528,7 @@ func (node *QueryNode) DoDelete(segmentID int64, deleteIDs *[]int64, deleteTimes
offset := node.deleteData.deleteOffset[segmentID]
node.msgCounter.DeleteCounter += int64(len(*deleteIDs))
err = segment.SegmentDelete(offset, deleteIDs, deleteTimestamps)
if err != nil {
fmt.Println(err.Error())
......@@ -487,17 +553,18 @@ func (node *QueryNode) QueryJson2Info(queryJson *string) *QueryInfo {
}
func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
// TODO: use client id to publish results to different clients
// var clientId = (*(searchMessages[0])).ClientId
type SearchResultTmp struct {
ResultId int64
ResultDistance float32
}
node.msgCounter.SearchCounter += int64(len(searchMessages))
// Traverse all messages in the current messageClient.
// TODO: Do not receive batched search requests
for _, msg := range searchMessages {
var clientId = msg.ClientId
var resultsTmp = make([]SearchResultTmp, 0)
var timestamp = msg.Timestamp
......@@ -522,7 +589,7 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
fmt.Println(err.Error())
return msgPb.Status{ErrorCode: 1}
}
fmt.Println(res.ResultIds)
for i := 0; i < len(res.ResultIds); i++ {
resultsTmp = append(resultsTmp, SearchResultTmp{ResultId: res.ResultIds[i], ResultDistance: res.ResultDistances[i]})
}
......@@ -543,6 +610,7 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
Entities: &entities,
Distances: make([]float32, 0),
QueryId: msg.Uid,
ClientId: clientId,
}
for _, res := range resultsTmp {
results.Entities.Ids = append(results.Entities.Ids, res.ResultId)
......
......@@ -3,6 +3,7 @@ package reader
import (
"context"
"github.com/czs007/suvlim/reader/message_client"
"log"
"sync"
)
......@@ -15,11 +16,17 @@ func StartQueryNode(pulsarURL string) {
ctx := context.Background()
// Segments Services
//go qn.SegmentManagementService()
go qn.SegmentManagementService()
go qn.SegmentStatisticService()
wg := sync.WaitGroup{}
qn.InitFromMeta()
err := qn.InitFromMeta()
if err != nil {
log.Printf("Init query node from meta failed")
return
}
wg.Add(3)
go qn.RunMetaService(ctx, &wg)
go qn.RunInsertDelete(&wg)
......
......@@ -73,6 +73,8 @@ func (s *Segment) CloseSegment(collection* Collection) error {
int
Close(CSegmentBase c_segment);
*/
fmt.Println("Closing segment :", s.SegmentId)
var status = C.Close(s.SegmentPtr)
s.SegmentStatus = SegmentClosed
......@@ -82,11 +84,13 @@ func (s *Segment) CloseSegment(collection* Collection) error {
// Build index after closing segment
s.SegmentStatus = SegmentIndexing
fmt.Println("Building index...")
s.buildIndex(collection)
// TODO: remove redundant segment indexed status
// Change segment status to indexed
s.SegmentStatus = SegmentIndexed
fmt.Println("Segment closed and indexed")
return nil
}
......
......@@ -13,20 +13,19 @@ func (node *QueryNode) SegmentsManagement() {
//node.queryNodeTimeSync.UpdateTSOTimeSync()
//var timeNow = node.queryNodeTimeSync.TSOTimeSync
timeNow := node.messageClient.GetTimeNow()
timeNow := node.messageClient.GetTimeNow() >> 18
for _, collection := range node.Collections {
for _, partition := range collection.Partitions {
for _, oldSegment := range partition.OpenedSegments {
// TODO: check segment status
if timeNow >= oldSegment.SegmentCloseTime {
// close old segment and move it into partition.ClosedSegments
if oldSegment.SegmentStatus != SegmentOpened {
log.Println("Never reach here, Opened segment cannot be closed")
continue
}
go oldSegment.CloseSegment(collection)
partition.ClosedSegments = append(partition.ClosedSegments, oldSegment)
for _, segment := range partition.Segments {
if segment.SegmentStatus != SegmentOpened {
log.Println("Segment have been closed")
continue
}
fmt.Println("timeNow = ", timeNow, "SegmentCloseTime = ", segment.SegmentCloseTime)
if timeNow >= segment.SegmentCloseTime {
go segment.CloseSegment(collection)
}
}
}
......@@ -34,7 +33,7 @@ func (node *QueryNode) SegmentsManagement() {
}
func (node *QueryNode) SegmentManagementService() {
sleepMillisecondTime := 1000
sleepMillisecondTime := 3000
fmt.Println("do segments management in ", strconv.Itoa(sleepMillisecondTime), "ms")
for {
time.Sleep(time.Duration(sleepMillisecondTime) * time.Millisecond)
......@@ -81,6 +80,8 @@ func (node *QueryNode) SegmentStatistic(sleepMillisecondTime int) {
statisticData = append(statisticData, stat)
}
fmt.Println("Publish segment statistic")
fmt.Println(statisticData)
var status = node.PublicStatistic(&statisticData)
if status.ErrorCode != msgPb.ErrorCode_SUCCESS {
log.Printf("Publish segments statistic failed")
......@@ -88,7 +89,7 @@ func (node *QueryNode) SegmentStatistic(sleepMillisecondTime int) {
}
func (node *QueryNode) SegmentStatisticService() {
sleepMillisecondTime := 1000
sleepMillisecondTime := 3000
fmt.Println("do segments statistic in ", strconv.Itoa(sleepMillisecondTime), "ms")
for {
time.Sleep(time.Duration(sleepMillisecondTime) * time.Millisecond)
......
......@@ -21,7 +21,7 @@
#include "interface/ConnectionImpl.h"
#include "utils/TimeRecorder.h"
const int N = 100;
const int N = 200000;
const int DIM = 16;
const int LOOP = 10;
......
......@@ -3,6 +3,7 @@ package tikv_driver
import (
"context"
"errors"
"github.com/czs007/suvlim/conf"
. "github.com/czs007/suvlim/storage/internal/tikv/codec"
. "github.com/czs007/suvlim/storage/pkg/types"
"github.com/tikv/client-go/config"
......@@ -86,7 +87,8 @@ type TikvStore struct {
}
func NewTikvStore(ctx context.Context) (*TikvStore, error) {
pdAddrs := []string{"127.0.0.1:2379"}
var pdAddress0 = conf.Config.Storage.Address + ":" + strconv.FormatInt(int64(conf.Config.Storage.Port), 10)
pdAddrs := []string{pdAddress0}
conf := config.Default()
client, err := rawkv.NewClient(ctx, pdAddrs, conf)
if err != nil {
......
......@@ -10,6 +10,7 @@ import (
"log"
"strconv"
"sync"
"time"
)
func main() {
......@@ -31,10 +32,49 @@ func main() {
log.Fatal(err)
}
msgCounter := write_node.MsgCounter{
InsertCounter: 0,
DeleteCounter: 0,
}
wn := write_node.WriteNode{
KvStore: &kv,
MessageClient: &mc,
TimeSync: 100,
MsgCounter: &msgCounter,
}
const Debug = true
const CountMsgNum = 1000 * 1000
if Debug {
var printFlag = true
var startTime = true
var start time.Time
for {
if ctx.Err() != nil {
break
}
msgLength := wn.MessageClient.PrepareBatchMsg()
if msgLength > 0 {
if startTime {
fmt.Println("============> Start Test <============")
startTime = false
start = time.Now()
}
wn.DoWriteNode(ctx, &wg)
fmt.Println("write node do a batch message, storage len: ", msgLength)
}
// Test insert time
if printFlag && wn.MsgCounter.InsertCounter >= CountMsgNum {
printFlag = false
timeSince := time.Since(start)
fmt.Println("============> Do", wn.MsgCounter.InsertCounter, "Insert in", timeSince, "<============")
}
}
}
//TODO:: start a gorouter for searchById
......
......@@ -17,11 +17,16 @@ type SegmentIdInfo struct {
SegmentIds []string
}
type MsgCounter struct {
InsertCounter int64
DeleteCounter int64
}
type WriteNode struct {
KvStore *types.Store
MessageClient *message_client.MessageClient
TimeSync uint64
MsgCounter *MsgCounter
}
func (wn *WriteNode) Close() {
......@@ -34,10 +39,17 @@ func NewWriteNode(ctx context.Context,
timeSync uint64) (*WriteNode, error) {
kv, err := storage.NewStore(context.Background(), types.MinIODriver)
mc := message_client.MessageClient{}
msgCounter := MsgCounter{
InsertCounter: 0,
DeleteCounter: 0,
}
return &WriteNode{
KvStore: &kv,
MessageClient: &mc,
TimeSync: timeSync,
MsgCounter: &msgCounter,
}, err
}
......@@ -58,6 +70,8 @@ func (wn *WriteNode) InsertBatchData(ctx context.Context, data []*msgpb.InsertOr
timeStamp = append(timeStamp, uint64(data[i].Timestamp))
}
wn.MsgCounter.InsertCounter += int64(len(timeStamp))
error := (*wn.KvStore).PutRows(ctx, prefixKeys, binaryData, suffixKeys, timeStamp)
if error != nil {
fmt.Println("Can't insert data!")
......@@ -89,13 +103,15 @@ func (wn *WriteNode) DeleteBatchData(ctx context.Context, data []*msgpb.InsertOr
}
segmentInfo := msgpb.Key2SegMsg{
Uid: data[i].Uid,
Uid: data[i].Uid,
SegmentId: segmentIds,
Timestamp: data[i].Timestamp,
}
wn.MessageClient.Send(ctx, segmentInfo)
}
wn.MsgCounter.DeleteCounter += int64(len(timeStamps))
err := (*wn.KvStore).DeleteRows(ctx, prefixKeys, timeStamps)
if err != nil {
fmt.Println("Can't delete data")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册