diff --git a/pkg/master/informer/pulsar.go b/pkg/master/informer/pulsar.go
index e6eaee7c1ccec657036f8a505c8c57ab041009c6..e22c48ee0288dd614109243ad84abd89ab020c75 100644
--- a/pkg/master/informer/pulsar.go
+++ b/pkg/master/informer/pulsar.go
@@ -49,7 +49,10 @@ func (pc PulsarClient) Listener(ssChan chan mock.SegmentStats) error {
 		if err != nil {
 			log.Fatal(err)
 		}
-		m, _ := mock.SegmentUnMarshal(msg.Payload())
+		m, err := mock.SegmentUnMarshal(msg.Payload())
+		if err != nil {
+			log.Println("SegmentUnMarshal Failed")
+		}
 		fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
 			msg.ID(), m.SegementID)
 		ssChan <- m
diff --git a/pkg/master/mock/segment.go b/pkg/master/mock/segment.go
index de317fe0a712e3de082ed098640fef1396cbdd34..b101f8a2cf2953db4f40a93447a9b4e5b2c301e5 100644
--- a/pkg/master/mock/segment.go
+++ b/pkg/master/mock/segment.go
@@ -3,6 +3,7 @@ package mock
 import (
 	"bytes"
 	"encoding/gob"
+	"github.com/golang/protobuf/proto"
 	"time"

 	masterpb "github.com/czs007/suvlim/pkg/master/grpc/master"
@@ -16,6 +17,9 @@ type SegmentStats struct {
 	Rows       int64
 }

+// map[SegmentID]SegmentCloseTime
+type SegmentCloseLog map[uint64]uint64
+
 func SegmentMarshal(s SegmentStats) ([]byte, error) {
 	var nb bytes.Buffer
 	enc := gob.NewEncoder(&nb)
@@ -27,12 +31,18 @@ func SegmentMarshal(s SegmentStats) ([]byte, error) {
 }

 func SegmentUnMarshal(data []byte) (SegmentStats, error) {
-	var ss SegmentStats
-	dec := gob.NewDecoder(bytes.NewBuffer(data))
-	err := dec.Decode(&ss)
+	var pbSS masterpb.SegmentStat
+	err := proto.Unmarshal(data, &pbSS)
 	if err != nil {
 		return SegmentStats{}, err
 	}
+	var ss = SegmentStats{
+		SegementID: pbSS.SegmentId,
+		MemorySize: pbSS.MemorySize,
+		MemoryRate: float64(pbSS.MemoryRate),
+		Status:     pbSS.Status,
+		Rows:       pbSS.Rows,
+	}
 	return ss, nil
}
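The gob codec is swapped for protobuf here: producers now publish `masterpb.SegmentStat` messages, and `SegmentUnMarshal` decodes them into the master's `SegmentStats` struct. Below is a minimal round-trip sketch of that path, assuming the generated `masterpb.SegmentStat` message and the field names/types implied by the patch (`MemoryRate` narrowed from `float32`); it is an illustration, not the repository's code.

```go
package main

import (
	"log"

	"github.com/golang/protobuf/proto"

	masterpb "github.com/czs007/suvlim/pkg/master/grpc/master"
)

func main() {
	// Writer side: encode the stat with protobuf rather than gob,
	// so both ends of the Pulsar topic agree on the wire format.
	stat := &masterpb.SegmentStat{
		SegmentId:  1,
		MemorySize: 1 << 20,
		MemoryRate: 0.5,
	}
	payload, err := proto.Marshal(stat)
	if err != nil {
		log.Fatal(err)
	}

	// Reader side: this is what SegmentUnMarshal now does before
	// copying the fields into mock.SegmentStats.
	var decoded masterpb.SegmentStat
	if err := proto.Unmarshal(payload, &decoded); err != nil {
		log.Fatal(err)
	}
	log.Println("segment:", decoded.SegmentId, "memory:", decoded.MemorySize)
}
```

Note that the listener above keeps consuming after a failed decode; logging and skipping the bad payload keeps one malformed message from stopping the statistics stream.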
diff --git a/pkg/master/server.go b/pkg/master/server.go
index 0910df24cd0cc752765e95bc93284b1814905672..9b709b1ef4e4012b74f58f0b41777f541011ffc0 100644
--- a/pkg/master/server.go
+++ b/pkg/master/server.go
@@ -21,7 +21,7 @@ import (
 )

 func Run() {
-	go mock.FakePulsarProducer()
+	// go mock.FakePulsarProducer()
 	go SegmentStatsController()
 	collectionChan := make(chan *messagepb.Mapping)
 	defer close(collectionChan)
@@ -45,27 +45,39 @@ func SegmentStatsController() {
 	ssChan := make(chan mock.SegmentStats, 10)
 	defer close(ssChan)
 	ssClient := informer.NewPulsarClient()
+
+	segmentCloseLog := make(map[uint64]uint64, 0)
+
 	go ssClient.Listener(ssChan)
 	for {
 		select {
 		case ss := <-ssChan:
-			ComputeCloseTime(ss, kvbase)
+			ComputeCloseTime(&segmentCloseLog, ss, kvbase)
 			UpdateSegmentStatus(ss, kvbase)
-		case <-time.After(5 * time.Second):
-			fmt.Println("timeout")
-			return
+			//case <-time.After(5 * time.Second):
+			//	fmt.Println("timeout")
+			//	return
 		}
 	}
 }

-func ComputeCloseTime(ss mock.SegmentStats, kvbase kv.Base) error {
+func ComputeCloseTime(segmentCloseLog *map[uint64]uint64, ss mock.SegmentStats, kvbase kv.Base) error {
+	segmentID := ss.SegementID
+	if _, ok := (*segmentCloseLog)[segmentID]; ok {
+		// This segment has been closed
+		log.Println("Segment", segmentID, "has been closed")
+		return nil
+	}
+
 	if int(ss.MemorySize) > int(conf.Config.Master.SegmentThreshole*0.8) {
 		currentTime := time.Now()
 		memRate := int(ss.MemoryRate)
 		if memRate == 0 {
-			memRate = 1
+			//memRate = 1
+			log.Println("memRate = 0")
+			return nil
 		}
-		sec := int(conf.Config.Master.SegmentThreshole*0.2) / memRate
+		sec := float64(conf.Config.Master.SegmentThreshole*0.2) / float64(memRate)
 		data, err := kvbase.Load("segment/" + strconv.Itoa(int(ss.SegementID)))
 		if err != nil {
 			return err
@@ -74,13 +86,15 @@ func ComputeCloseTime(ss mock.SegmentStats, kvbase kv.Base) error {
 		if err != nil {
 			return err
 		}
-		seg.CloseTimeStamp = uint64(currentTime.Add(time.Duration(sec) * time.Second).Unix())
+		segmentLogicTime := seg.CloseTimeStamp << 46 >> 46
+		seg.CloseTimeStamp = uint64(currentTime.Add(time.Duration(sec) * time.Second).Unix()) << 18 + segmentLogicTime
 		fmt.Println("memRate = ", memRate, ",sec = ", sec, ",Close time = ", seg.CloseTimeStamp)
 		updateData, err := mock.Segment2JSON(*seg)
 		if err != nil {
 			return err
 		}
 		kvbase.Save("segment/"+strconv.Itoa(int(ss.SegementID)), updateData)
+		(*segmentCloseLog)[segmentID] = seg.CloseTimeStamp
 		//create new segment
 		newSegID := id.New().Uint64()
 		newSeg := mock.NewSegment(newSegID, seg.CollectionID, seg.CollectionName, "default", seg.ChannelStart, seg.ChannelEnd, currentTime, time.Unix(1<<36-1, 0))
diff --git a/proxy/src/message_client/ClientV2.cpp b/proxy/src/message_client/ClientV2.cpp
index d8769bd3dcee3c5f3acb70edde3e2c2509d8167a..b17cde0c066e2ae2fdc6d3285ac6ccabea515455 100644
--- a/proxy/src/message_client/ClientV2.cpp
+++ b/proxy/src/message_client/ClientV2.cpp
@@ -146,9 +146,14 @@ Status MsgClientV2::GetQueryResult(int64_t query_id, milvus::grpc::QueryResult *
         auto status = search_res_msg.ParseFromString(msg.getDataAsString());
         if (status) {
             auto message = std::make_shared<grpc::SearchResultMsg>(search_res_msg);
+            if (message->status().error_code() != grpc::ErrorCode::SUCCESS) {
+                consumer_->acknowledge(msg);
+                return Status(DB_ERROR, "Search Failed");
+            }
             total_results[message->query_id()].push_back(message);
             consumer_->acknowledge(msg);
         } else {
+            consumer_->acknowledge(msg);
             return Status(DB_ERROR, "can't parse message which from pulsar!");
         }
     }
diff --git a/proxy/src/server/delivery/ReqScheduler.cpp b/proxy/src/server/delivery/ReqScheduler.cpp
index 84fca9100baecc24522499a267aedd7c7664639d..8b41ad38d1be2b8d0fae9f700ea0eb7359781ce5 100644
--- a/proxy/src/server/delivery/ReqScheduler.cpp
+++ b/proxy/src/server/delivery/ReqScheduler.cpp
@@ -121,11 +121,14 @@ ReqScheduler::TakeToExecute(ReqQueuePtr req_queue) {
         }

         try {
-            if (req->type() == ReqType::kInsert || req->type() == ReqType::kDeleteEntityByID || req->type() == ReqType::kSearch) {
+            if (req->type() == ReqType::kInsert || req->type() == ReqType::kDeleteEntityByID) {
                 std::lock_guard<std::mutex> lock(time_syc_mtx_);
                 sending_ = true;
                 req->SetTimestamp(TSOracle::GetInstance().GetTimeStamp());
             }
+            if (req->type() == ReqType::kSearch) {
+                req->SetTimestamp(TSOracle::GetInstance().GetTimeStamp());
+            }
             auto status = req->Execute();
             if (!status.ok()) {
                 LOG_SERVER_ERROR_ << "Req failed with code: " << status.ToString();
diff --git a/reader/message_client/message_client.go b/reader/message_client/message_client.go
index 270e8635e0b17d841c2f8024512ef2f44b7c2e79..71e8bcf261ab6a7e7791f4891429b67dd5651a6f 100644
--- a/reader/message_client/message_client.go
+++ b/reader/message_client/message_client.go
@@ -167,7 +167,8 @@ func (mc *MessageClient) InitClient(url string) {
 		mc.searchResultProducers[key] = mc.creatProducer(topic)
 	}
 	//mc.searchResultProducer = mc.creatProducer("SearchResult")
-	mc.segmentsStatisticProducer = mc.creatProducer("SegmentsStatistic")
+	SegmentsStatisticTopicName := conf.Config.Master.PulsarTopic
+	mc.segmentsStatisticProducer = mc.creatProducer(SegmentsStatisticTopicName)

 	//create consumer
 	mc.searchConsumer = mc.createConsumer("Search")
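`ComputeCloseTime` above, and the search path below, treat a timestamp as a hybrid value: the upper 46 bits hold physical time and the low 18 bits hold a logical counter, which is what the `<< 46 >> 46` and `<< 18` shifts implement. A small self-contained sketch of that layout (the helper names are mine, not from the patch):

```go
package main

import "fmt"

// Hybrid timestamp layout implied by the shifts in the patch:
// upper 46 bits = physical time, low 18 bits = logical clock.
const logicalBits = 18

// splitTS pulls a hybrid timestamp apart.
// `ts << 46 >> 46` is simply a mask of the low 18 bits.
func splitTS(ts uint64) (physical, logical uint64) {
	return ts >> logicalBits, ts << (64 - logicalBits) >> (64 - logicalBits)
}

// composeTS rebuilds the timestamp, mirroring
// `physical << 18 + segmentLogicTime` in ComputeCloseTime.
func composeTS(physical, logical uint64) uint64 {
	return (physical << logicalBits) + logical
}

func main() {
	ts := composeTS(1600000000, 42)
	physical, logical := splitTS(ts)
	fmt.Println(physical, logical) // 1600000000 42
}
```

Keeping the logical bits intact when `ComputeCloseTime` rewrites `CloseTimeStamp` is the point of `segmentLogicTime`: only the physical part moves to the computed close time.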
diff --git a/reader/read_node/query_node.go b/reader/read_node/query_node.go
index 3ca1ad8c209cf60e35e2fefe36e76a167e1c9980..fd6f9a8ff612ee4a93c8768f370671e7354d964b 100644
--- a/reader/read_node/query_node.go
+++ b/reader/read_node/query_node.go
@@ -16,6 +16,7 @@ import "C"
 import (
 	"encoding/json"
 	"fmt"
+	"github.com/czs007/suvlim/conf"
 	"github.com/stretchr/testify/assert"
 	"log"
 	"sort"
@@ -94,7 +95,7 @@ func NewQueryNode(queryNodeId uint64, timeSync uint64) *QueryNode {
 		ReadTimeSyncMin: timeSync,
 		ReadTimeSyncMax: timeSync,
 		WriteTimeSync:   timeSync,
-		SearchTimeSync:  timeSync,
+		ServiceTimeSync: timeSync,
 		TSOTimeSync:     timeSync,
 	}

@@ -134,7 +135,7 @@ func CreateQueryNode(queryNodeId uint64, timeSync uint64, mc *message_client.Mes
 		ReadTimeSyncMin: timeSync,
 		ReadTimeSyncMax: timeSync,
 		WriteTimeSync:   timeSync,
-		SearchTimeSync:  timeSync,
+		ServiceTimeSync: timeSync,
 		TSOTimeSync:     timeSync,
 	}

@@ -327,8 +328,8 @@ func (node *QueryNode) TestInsertDelete(timeRange TimeRange) {

 func (node *QueryNode) RunSearch(wg *sync.WaitGroup) {
 	for {
-		msg, ok := <-node.messageClient.GetSearchChan()
-		if ok {
+		select {
+		case msg := <-node.messageClient.GetSearchChan():
 			node.messageClient.SearchMsg = node.messageClient.SearchMsg[:0]
 			node.messageClient.SearchMsg = append(node.messageClient.SearchMsg, msg)
 			fmt.Println("Do Search...")
@@ -337,6 +338,8 @@
 				fmt.Println("Search Failed")
 				node.PublishFailedSearchResult()
 			}
+		default:
+		}
 	}
 	wg.Done()
@@ -569,15 +572,25 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {
 		var clientId = msg.ClientId
 		var resultsTmp = make([]SearchResultTmp, 0)
-		var timestamp = msg.Timestamp
+		var searchTimestamp = msg.Timestamp
+
+		// ServiceTimeSync is updated by readerTimeSync, which is fetched from the proxy.
+		// The proxy sends that timestamp every `conf.Config.Timesync.Interval` milliseconds,
+		// while the timestamp of a search request (searchTimestamp) is a precise time,
+		// so ServiceTimeSync always lags behind searchTimestamp.
+		// To compensate, subtract `conf.Config.Timesync.Interval` milliseconds from the
+		// physical part of searchTimestamp: `searchTimestamp.physicalTime -= Interval`.
+		var logicTimestamp = searchTimestamp << 46 >> 46
+		searchTimestamp = (searchTimestamp >> 18 - uint64(conf.Config.Timesync.Interval)) << 18 + logicTimestamp
+
 		var vector = msg.Records
 		// For now, only the first Json is valid.
 		var queryJson = msg.Json[0]

 		// 1. Timestamp check
 		// TODO: return or wait? Or adding graceful time
-		if timestamp > node.queryNodeTimeSync.SearchTimeSync {
-			fmt.Println("Invalid query time, timestamp = ", timestamp, ", SearchTimeSync = ", node.queryNodeTimeSync.SearchTimeSync)
+		if searchTimestamp > node.queryNodeTimeSync.ServiceTimeSync {
+			fmt.Println("Invalid query time, timestamp = ", searchTimestamp, ", ServiceTimeSync = ", node.queryNodeTimeSync.ServiceTimeSync)
 			return msgPb.Status{ErrorCode: 1}
 		}

@@ -586,7 +599,7 @@ func (node *QueryNode) Search(searchMessages []*msgPb.SearchMsg) msgPb.Status {

 		// 3. Do search in all segments
 		for _, segment := range node.SegmentsMap {
-			var res, err = segment.SegmentSearch(query, timestamp, vector)
+			var res, err = segment.SegmentSearch(query, searchTimestamp, vector)
 			if err != nil {
 				fmt.Println(err.Error())
 				return msgPb.Status{ErrorCode: 1}
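`RunSearch` now polls its channel with a `select` that has an empty `default`, so an empty channel no longer blocks or ends the loop; the cost is that the goroutine spins while idle. A sketch of the pattern with a placeholder channel type, plus a brief backoff that the patch itself does not add:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	searchChan := make(chan string, 1)
	searchChan <- "query-1"

	for i := 0; i < 3; i++ {
		select {
		case msg := <-searchChan:
			// A message is pending: handle it, as RunSearch does.
			fmt.Println("Do Search...", msg)
		default:
			// Nothing pending: fall through and loop again. An empty
			// default busy-waits; sleeping briefly (not in the patch)
			// keeps the loop from burning a core.
			time.Sleep(time.Millisecond)
		}
	}
}
```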
diff --git a/reader/read_node/query_node_time.go b/reader/read_node/query_node_time.go
index 629729500f3d1bccbd7ebe6c73fe8bd1c571a261..336a146fa9d15a28648185b9bdc35c3856d2802e 100644
--- a/reader/read_node/query_node_time.go
+++ b/reader/read_node/query_node_time.go
@@ -4,7 +4,7 @@ type QueryNodeTime struct {
 	ReadTimeSyncMin uint64
 	ReadTimeSyncMax uint64
 	WriteTimeSync   uint64
-	SearchTimeSync  uint64
+	ServiceTimeSync uint64
 	TSOTimeSync     uint64
 }

@@ -25,7 +25,7 @@ func (t *QueryNodeTime) UpdateWriteTimeSync() {
 }

 func (t *QueryNodeTime) UpdateSearchTimeSync(timeRange TimeRange) {
-	t.SearchTimeSync = timeRange.timestampMax
+	t.ServiceTimeSync = timeRange.timestampMax
 }

 func (t *QueryNodeTime) UpdateTSOTimeSync() {
diff --git a/reader/read_node/segment_service.go b/reader/read_node/segment_service.go
index f4e8e8efaceb35198b13450375b6dc39b1c1d545..75380be6aec458f7d148e5e098d8629c240e7b1a 100644
--- a/reader/read_node/segment_service.go
+++ b/reader/read_node/segment_service.go
@@ -80,8 +80,8 @@ func (node *QueryNode) SegmentStatistic(sleepMillisecondTime int) {
 		statisticData = append(statisticData, stat)
 	}

-	fmt.Println("Publish segment statistic")
-	fmt.Println(statisticData)
+	// fmt.Println("Publish segment statistic")
+	// fmt.Println(statisticData)
 	var status = node.PublicStatistic(&statisticData)
 	if status.ErrorCode != msgPb.ErrorCode_SUCCESS {
 		log.Printf("Publish segments statistic failed")
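With `SearchTimeSync` renamed to `ServiceTimeSync`, the gate in `QueryNode.Search` reads: serve a request only once the node's service time has caught up to the (interval-adjusted) search timestamp. A reduced sketch of that gate, keeping only the two values involved (`canServe` is my name for the inline check):

```go
package main

import "fmt"

type QueryNodeTime struct {
	ServiceTimeSync uint64
}

// UpdateSearchTimeSync advances the service time to the newest synced
// timestamp, as query_node_time.go does with timeRange.timestampMax.
func (t *QueryNodeTime) UpdateSearchTimeSync(timestampMax uint64) {
	t.ServiceTimeSync = timestampMax
}

// canServe mirrors the check in QueryNode.Search: a request stamped
// after the service time cannot be answered consistently yet.
func (t *QueryNodeTime) canServe(searchTimestamp uint64) bool {
	return searchTimestamp <= t.ServiceTimeSync
}

func main() {
	t := &QueryNodeTime{}
	t.UpdateSearchTimeSync(100 << 18)
	fmt.Println(t.canServe(99 << 18))  // true: covered by the last time sync
	fmt.Println(t.canServe(101 << 18)) // false: Search returns ErrorCode 1
}
```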