Commit e36b565e authored by dragondriver, committed by yefu.chen

Normalize the naming of variable and function in Proxy

Signed-off-by: dragondriver <jiquan.long@zilliz.com>
Parent 9f35dee2
......@@ -34,7 +34,7 @@ func main() {
cancel()
}()
if err := svr.Run(); err != nil {
if err := svr.Start(); err != nil {
log.Fatal("run server failed", zap.Error(err))
}
......
......@@ -187,7 +187,7 @@ func TestMaster_CollectionTask(t *testing.T) {
assert.NotEqual(t, commonpb.ErrorCode_SUCCESS, listResp.Status.ErrorCode)
// CreateCollection Test
collMeta, err := svr.mt.GetCollectionByName(sch.Name)
collMeta, err := svr.metaTable.GetCollectionByName(sch.Name)
assert.Nil(t, err)
t.Logf("collection id = %d", collMeta.ID)
assert.Equal(t, collMeta.CreateTime, uint64(11))
......@@ -298,7 +298,7 @@ func TestMaster_CollectionTask(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS)
collMeta, err = svr.mt.GetCollectionByName(sch.Name)
collMeta, err = svr.metaTable.GetCollectionByName(sch.Name)
assert.NotNil(t, err)
// HasCollection "col1" is false
......
......@@ -16,7 +16,7 @@ func (s *Master) CreateCollection(ctx context.Context, in *internalpb.CreateColl
req: in,
baseTask: baseTask{
sch: s.scheduler,
mt: s.mt,
mt: s.metaTable,
cv: make(chan error),
},
}
......@@ -46,7 +46,7 @@ func (s *Master) DropCollection(ctx context.Context, in *internalpb.DropCollecti
req: in,
baseTask: baseTask{
sch: s.scheduler,
mt: s.mt,
mt: s.metaTable,
cv: make(chan error),
},
}
......@@ -76,7 +76,7 @@ func (s *Master) HasCollection(ctx context.Context, in *internalpb.HasCollection
req: in,
baseTask: baseTask{
sch: s.scheduler,
mt: s.mt,
mt: s.metaTable,
cv: make(chan error),
},
hasCollection: false,
......@@ -113,7 +113,7 @@ func (s *Master) DescribeCollection(ctx context.Context, in *internalpb.Describe
req: in,
baseTask: baseTask{
sch: s.scheduler,
mt: s.mt,
mt: s.metaTable,
cv: make(chan error),
},
description: nil,
......@@ -149,7 +149,7 @@ func (s *Master) ShowCollections(ctx context.Context, in *internalpb.ShowCollect
req: in,
baseTask: baseTask{
sch: s.scheduler,
mt: s.mt,
mt: s.metaTable,
cv: make(chan error),
},
stringListResponse: nil,
......@@ -187,7 +187,7 @@ func (s *Master) CreatePartition(ctx context.Context, in *internalpb.CreateParti
req: in,
baseTask: baseTask{
sch: s.scheduler,
mt: s.mt,
mt: s.metaTable,
cv: make(chan error),
},
}
......@@ -218,7 +218,7 @@ func (s *Master) DropPartition(ctx context.Context, in *internalpb.DropPartition
req: in,
baseTask: baseTask{
sch: s.scheduler,
mt: s.mt,
mt: s.metaTable,
cv: make(chan error),
},
}
......@@ -249,7 +249,7 @@ func (s *Master) HasPartition(ctx context.Context, in *internalpb.HasPartitionRe
req: in,
baseTask: baseTask{
sch: s.scheduler,
mt: s.mt,
mt: s.metaTable,
cv: make(chan error),
},
hasPartition: false,
......@@ -290,7 +290,7 @@ func (s *Master) DescribePartition(ctx context.Context, in *internalpb.DescribeP
req: in,
baseTask: baseTask{
sch: s.scheduler,
mt: s.mt,
mt: s.metaTable,
cv: make(chan error),
},
description: nil,
......@@ -328,7 +328,7 @@ func (s *Master) ShowPartitions(ctx context.Context, in *internalpb.ShowPartitio
req: in,
baseTask: baseTask{
sch: s.scheduler,
mt: s.mt,
mt: s.metaTable,
cv: make(chan error),
},
stringListResponse: nil,
......
......@@ -131,7 +131,7 @@ func TestMaster_CreateCollection(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS)
collMeta, err := svr.mt.GetCollectionByName(sch.Name)
collMeta, err := svr.metaTable.GetCollectionByName(sch.Name)
assert.Nil(t, err)
t.Logf("collection id = %d", collMeta.ID)
assert.Equal(t, collMeta.CreateTime, uint64(11))
......
......@@ -13,7 +13,6 @@ import (
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/kv"
ms "github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
......@@ -72,26 +71,22 @@ type Master struct {
//grpc server
grpcServer *grpc.Server
grpcErr chan error
// chans
ssChan chan internalpb.SegmentStats
grpcErr chan error
kvBase *kv.EtcdKV
scheduler *ddRequestScheduler
mt *metaTable
tsmp *timeSyncMsgProducer
kvBase *kv.EtcdKV
scheduler *ddRequestScheduler
metaTable *metaTable
timesSyncMsgProducer *timeSyncMsgProducer
// tso ticker
tsTicker *time.Ticker
tsoTicker *time.Ticker
// Add callback functions at different stages
startCallbacks []func()
closeCallbacks []func()
segmentMgr *SegmentManager
statsMs ms.MsgStream
segmentMgr *SegmentManager
segmentStatusMsg ms.MsgStream
//id allocator
idAllocator *GlobalIDAllocator
......@@ -128,7 +123,7 @@ func CreateServer(ctx context.Context, opt *Option) (*Master, error) {
}
//timeSyncMsgProducer
tsmp, err := NewTimeSyncMsgProducer(ctx)
tsMsgProducer, err := NewTimeSyncMsgProducer(ctx)
if err != nil {
return nil, err
}
......@@ -138,7 +133,7 @@ func CreateServer(ctx context.Context, opt *Option) (*Master, error) {
pulsarProxyStream.Start()
var proxyStream ms.MsgStream = pulsarProxyStream
proxyTimeTickBarrier := newSoftTimeTickBarrier(ctx, &proxyStream, opt.ProxyIDs, opt.SoftTTBInterval)
tsmp.SetProxyTtBarrier(proxyTimeTickBarrier)
tsMsgProducer.SetProxyTtBarrier(proxyTimeTickBarrier)
pulsarWriteStream := ms.NewPulsarMsgStream(ctx, 1024) //output stream
pulsarWriteStream.SetPulsarClient(opt.PulsarAddr)
......@@ -146,17 +141,17 @@ func CreateServer(ctx context.Context, opt *Option) (*Master, error) {
pulsarWriteStream.Start()
var writeStream ms.MsgStream = pulsarWriteStream
writeTimeTickBarrier := newHardTimeTickBarrier(ctx, &writeStream, opt.WriteIDs)
tsmp.SetWriteNodeTtBarrier(writeTimeTickBarrier)
tsMsgProducer.SetWriteNodeTtBarrier(writeTimeTickBarrier)
pulsarDMStream := ms.NewPulsarMsgStream(ctx, 1024) //input stream
pulsarDMStream.SetPulsarClient(opt.PulsarAddr)
pulsarDMStream.CreatePulsarProducers(opt.PulsarDMChannels)
tsmp.SetDMSyncStream(pulsarDMStream)
tsMsgProducer.SetDMSyncStream(pulsarDMStream)
pulsarK2SStream := ms.NewPulsarMsgStream(ctx, 1024) //input stream
pulsarK2SStream.SetPulsarClient(opt.PulsarAddr)
pulsarK2SStream.CreatePulsarProducers(opt.PulsarK2SChannels)
tsmp.SetK2sSyncStream(pulsarK2SStream)
tsMsgProducer.SetK2sSyncStream(pulsarK2SStream)
// stats msg stream
statsMs := ms.NewPulsarMsgStream(ctx, 1024)
......@@ -165,14 +160,13 @@ func CreateServer(ctx context.Context, opt *Option) (*Master, error) {
statsMs.Start()
m := &Master{
ctx: ctx,
startTimestamp: time.Now().Unix(),
kvBase: newKVBase(opt.KVRootPath, opt.EtcdAddr),
mt: metakv,
tsmp: tsmp,
ssChan: make(chan internalpb.SegmentStats, 10),
grpcErr: make(chan error),
statsMs: statsMs,
ctx: ctx,
startTimestamp: time.Now().Unix(),
kvBase: newKVBase(opt.KVRootPath, opt.EtcdAddr),
metaTable: metakv,
timesSyncMsgProducer: tsMsgProducer,
grpcErr: make(chan error),
segmentStatusMsg: statsMs,
}
//init idAllocator
......@@ -270,7 +264,7 @@ func (s *Master) startServerLoop(ctx context.Context, grpcPort int64) error {
//go s.Se
s.serverLoopWg.Add(1)
if err := s.tsmp.Start(); err != nil {
if err := s.timesSyncMsgProducer.Start(); err != nil {
return err
}
......@@ -294,7 +288,7 @@ func (s *Master) startServerLoop(ctx context.Context, grpcPort int64) error {
}
func (s *Master) stopServerLoop() {
s.tsmp.Close()
s.timesSyncMsgProducer.Close()
s.serverLoopWg.Done()
if s.grpcServer != nil {
......@@ -340,13 +334,13 @@ func (s *Master) grpcLoop(grpcPort int64) {
func (s *Master) tsLoop() {
defer s.serverLoopWg.Done()
s.tsTicker = time.NewTicker(UpdateTimestampStep)
defer s.tsTicker.Stop()
s.tsoTicker = time.NewTicker(UpdateTimestampStep)
defer s.tsoTicker.Stop()
ctx, cancel := context.WithCancel(s.serverLoopCtx)
defer cancel()
for {
select {
case <-s.tsTicker.C:
case <-s.tsoTicker.C:
if err := s.tsoAllocator.UpdateTSO(); err != nil {
log.Println("failed to update timestamp", err)
return
......@@ -392,13 +386,13 @@ func (s *Master) tasksExecutionLoop() {
func (s *Master) segmentStatisticsLoop() {
defer s.serverLoopWg.Done()
defer s.statsMs.Close()
defer s.segmentStatusMsg.Close()
ctx, cancel := context.WithCancel(s.serverLoopCtx)
defer cancel()
for {
select {
case msg := <-s.statsMs.Chan():
case msg := <-s.segmentStatusMsg.Chan():
err := s.segmentMgr.HandleQueryNodeMsgPack(msg)
if err != nil {
log.Println(err)
......
......@@ -105,7 +105,7 @@ func (mt *metaTable) reloadFromKV() error {
return nil
}
// mt.ddLock.Lock() before call this function
// metaTable.ddLock.Lock() before call this function
func (mt *metaTable) saveCollectionMeta(coll *pb.CollectionMeta) error {
collBytes, err := proto.Marshal(coll)
if err != nil {
......@@ -116,7 +116,7 @@ func (mt *metaTable) saveCollectionMeta(coll *pb.CollectionMeta) error {
return mt.client.Save("/collection/"+strconv.FormatInt(coll.ID, 10), string(collBytes))
}
// mt.ddLock.Lock() before call this function
// metaTable.ddLock.Lock() before call this function
func (mt *metaTable) saveSegmentMeta(seg *pb.SegmentMeta) error {
segBytes, err := proto.Marshal(seg)
if err != nil {
......@@ -128,7 +128,7 @@ func (mt *metaTable) saveSegmentMeta(seg *pb.SegmentMeta) error {
return mt.client.Save("/segment/"+strconv.FormatInt(seg.SegmentID, 10), string(segBytes))
}
// mt.ddLock.Lock() before call this function
// metaTable.ddLock.Lock() before call this function
func (mt *metaTable) saveCollectionAndDeleteSegmentsMeta(coll *pb.CollectionMeta, segIDs []UniqueID) error {
segIDStrs := make([]string, 0, len(segIDs))
for _, segID := range segIDs {
......@@ -156,7 +156,7 @@ func (mt *metaTable) saveCollectionAndDeleteSegmentsMeta(coll *pb.CollectionMeta
return mt.client.MultiSaveAndRemove(kvs, segIDStrs)
}
// mt.ddLock.Lock() before call this function
// metaTable.ddLock.Lock() before call this function
func (mt *metaTable) saveCollectionsAndSegmentsMeta(coll *pb.CollectionMeta, seg *pb.SegmentMeta) error {
kvs := make(map[string]string)
collBytes, err := proto.Marshal(coll)
......@@ -179,7 +179,7 @@ func (mt *metaTable) saveCollectionsAndSegmentsMeta(coll *pb.CollectionMeta, seg
return mt.client.MultiSave(kvs)
}
// mt.ddLock.Lock() before call this function
// metaTable.ddLock.Lock() before call this function
func (mt *metaTable) deleteCollectionsAndSegmentsMeta(collID UniqueID, segIDs []UniqueID) error {
collIDStr := "/collection/" + strconv.FormatInt(collID, 10)
......
......@@ -168,7 +168,7 @@ func TestMaster_Partition(t *testing.T) {
assert.NotNil(t, st)
assert.Equal(t, commonpb.ErrorCode_SUCCESS, st.ErrorCode)
collMeta, err := svr.mt.GetCollectionByName(sch.Name)
collMeta, err := svr.metaTable.GetCollectionByName(sch.Name)
assert.Nil(t, err)
t.Logf("collection id = %d", collMeta.ID)
assert.Equal(t, collMeta.CreateTime, uint64(1))
......
......@@ -48,7 +48,7 @@ func (p *Proxy) Insert(ctx context.Context, in *servicepb.RowBatch) (*servicepb.
case <-ctx.Done():
return errors.New("insert timeout")
default:
return p.taskSch.DmQueue.Enqueue(it)
return p.sched.DmQueue.Enqueue(it)
}
}
err := fn()
......@@ -96,7 +96,7 @@ func (p *Proxy) CreateCollection(ctx context.Context, req *schemapb.CollectionSc
case <-ctx.Done():
return errors.New("create collection timeout")
default:
return p.taskSch.DdQueue.Enqueue(cct)
return p.sched.DdQueue.Enqueue(cct)
}
}
err := fn()
......@@ -144,7 +144,7 @@ func (p *Proxy) Search(ctx context.Context, req *servicepb.Query) (*servicepb.Qu
case <-ctx.Done():
return errors.New("create collection timeout")
default:
return p.taskSch.DqQueue.Enqueue(qt)
return p.sched.DqQueue.Enqueue(qt)
}
}
err := fn()
......@@ -189,7 +189,7 @@ func (p *Proxy) DropCollection(ctx context.Context, req *servicepb.CollectionNam
case <-ctx.Done():
return errors.New("create collection timeout")
default:
return p.taskSch.DdQueue.Enqueue(dct)
return p.sched.DdQueue.Enqueue(dct)
}
}
err := fn()
......@@ -230,7 +230,7 @@ func (p *Proxy) HasCollection(ctx context.Context, req *servicepb.CollectionName
case <-ctx.Done():
return errors.New("create collection timeout")
default:
return p.taskSch.DdQueue.Enqueue(hct)
return p.sched.DdQueue.Enqueue(hct)
}
}
err := fn()
......@@ -275,7 +275,7 @@ func (p *Proxy) DescribeCollection(ctx context.Context, req *servicepb.Collectio
case <-ctx.Done():
return errors.New("create collection timeout")
default:
return p.taskSch.DdQueue.Enqueue(dct)
return p.sched.DdQueue.Enqueue(dct)
}
}
err := fn()
......@@ -319,7 +319,7 @@ func (p *Proxy) ShowCollections(ctx context.Context, req *commonpb.Empty) (*serv
case <-ctx.Done():
return errors.New("create collection timeout")
default:
return p.taskSch.DdQueue.Enqueue(sct)
return p.sched.DdQueue.Enqueue(sct)
}
}
err := fn()
......@@ -369,7 +369,7 @@ func (p *Proxy) CreatePartition(ctx context.Context, in *servicepb.PartitionName
case <-ctx.Done():
return errors.New("create partition timeout")
default:
return p.taskSch.DdQueue.Enqueue(cpt)
return p.sched.DdQueue.Enqueue(cpt)
}
}()
......@@ -415,7 +415,7 @@ func (p *Proxy) DropPartition(ctx context.Context, in *servicepb.PartitionName)
case <-ctx.Done():
return errors.New("drop partition timeout")
default:
return p.taskSch.DdQueue.Enqueue(dpt)
return p.sched.DdQueue.Enqueue(dpt)
}
}()
......@@ -461,7 +461,7 @@ func (p *Proxy) HasPartition(ctx context.Context, in *servicepb.PartitionName) (
case <-ctx.Done():
return errors.New("has partition timeout")
default:
return p.taskSch.DdQueue.Enqueue(hpt)
return p.sched.DdQueue.Enqueue(hpt)
}
}()
......@@ -513,7 +513,7 @@ func (p *Proxy) DescribePartition(ctx context.Context, in *servicepb.PartitionNa
case <-ctx.Done():
return errors.New("describe partion timeout")
default:
return p.taskSch.DdQueue.Enqueue(dpt)
return p.sched.DdQueue.Enqueue(dpt)
}
}()
......@@ -566,7 +566,7 @@ func (p *Proxy) ShowPartitions(ctx context.Context, req *servicepb.CollectionNam
case <-ctx.Done():
return errors.New("show partition timeout")
default:
return p.taskSch.DdQueue.Enqueue(spt)
return p.sched.DdQueue.Enqueue(spt)
}
}()
......
......@@ -12,7 +12,6 @@ import (
"github.com/zilliztech/milvus-distributed/internal/allocator"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
......@@ -29,7 +28,7 @@ type Proxy struct {
grpcServer *grpc.Server
masterConn *grpc.ClientConn
masterClient masterpb.MasterClient
taskSch *TaskScheduler
sched *TaskScheduler
tick *timeTick
idAllocator *allocator.IDAllocator
......@@ -38,7 +37,6 @@ type Proxy struct {
manipulationMsgStream *msgstream.PulsarMsgStream
queryMsgStream *msgstream.PulsarMsgStream
queryResultMsgStream *msgstream.PulsarMsgStream
// Add callback functions at different stages
startCallbacks []func()
......@@ -62,9 +60,6 @@ func CreateProxy(ctx context.Context) (*Proxy, error) {
bufSize := int64(1000)
manipulationChannels := []string{"manipulation"}
queryChannels := []string{"query"}
queryResultChannels := []string{"QueryResult"}
queryResultSubName := "QueryResultSubject"
unmarshal := msgstream.NewUnmarshalDispatcher()
p.manipulationMsgStream = msgstream.NewPulsarMsgStream(p.proxyLoopCtx, bufSize)
p.manipulationMsgStream.SetPulsarClient(pulsarAddress)
......@@ -74,13 +69,6 @@ func CreateProxy(ctx context.Context) (*Proxy, error) {
p.queryMsgStream.SetPulsarClient(pulsarAddress)
p.queryMsgStream.CreatePulsarProducers(queryChannels)
p.queryResultMsgStream = msgstream.NewPulsarMsgStream(p.proxyLoopCtx, bufSize)
p.queryResultMsgStream.SetPulsarClient(pulsarAddress)
p.queryResultMsgStream.CreatePulsarConsumers(queryResultChannels,
queryResultSubName,
unmarshal,
bufSize)
masterAddr := Params.MasterAddress()
idAllocator, err := allocator.NewIDAllocator(p.proxyLoopCtx, masterAddr)
......@@ -101,7 +89,7 @@ func CreateProxy(ctx context.Context) (*Proxy, error) {
}
p.segAssigner = segAssigner
p.taskSch, err = NewTaskScheduler(p.proxyLoopCtx, p.idAllocator, p.tsoAllocator)
p.sched, err = NewTaskScheduler(p.proxyLoopCtx, p.idAllocator, p.tsoAllocator)
if err != nil {
return nil, err
}
......@@ -122,17 +110,19 @@ func (p *Proxy) startProxy() error {
initGlobalMetaCache(p.proxyLoopCtx, p.masterClient, p.idAllocator, p.tsoAllocator)
p.manipulationMsgStream.Start()
p.queryMsgStream.Start()
p.queryResultMsgStream.Start()
p.taskSch.Start()
p.sched.Start()
p.idAllocator.Start()
p.tsoAllocator.Start()
p.segAssigner.Start()
// Run callbacks
// Start callbacks
for _, cb := range p.startCallbacks {
cb()
}
p.proxyLoopWg.Add(1)
go p.grpcLoop()
return nil
}
......@@ -173,65 +163,8 @@ func (p *Proxy) connectMaster() error {
return nil
}
func (p *Proxy) queryResultLoop() {
defer p.proxyLoopWg.Done()
defer p.proxyLoopCancel()
queryResultBuf := make(map[UniqueID][]*internalpb.SearchResult)
for {
select {
case msgPack, ok := <-p.queryResultMsgStream.Chan():
if !ok {
log.Print("buf chan closed")
return
}
if msgPack == nil {
continue
}
for _, tsMsg := range msgPack.Msgs {
searchResultMsg, _ := tsMsg.(*msgstream.SearchResultMsg)
reqID := searchResultMsg.GetReqID()
_, ok = queryResultBuf[reqID]
if !ok {
queryResultBuf[reqID] = make([]*internalpb.SearchResult, 0)
}
queryResultBuf[reqID] = append(queryResultBuf[reqID], &searchResultMsg.SearchResult)
if len(queryResultBuf[reqID]) == 4 {
// TODO: use the number of query node instead
t := p.taskSch.getTaskByReqID(reqID)
if t != nil {
qt, ok := t.(*QueryTask)
if ok {
log.Printf("address of query task: %p", qt)
qt.resultBuf <- queryResultBuf[reqID]
delete(queryResultBuf, reqID)
}
} else {
log.Printf("task with reqID %v is nil", reqID)
}
}
}
case <-p.proxyLoopCtx.Done():
log.Print("proxy server is closed ...")
return
}
}
}
func (p *Proxy) startProxyLoop() {
p.proxyLoopWg.Add(2)
go p.grpcLoop()
go p.queryResultLoop()
}
func (p *Proxy) Run() error {
if err := p.startProxy(); err != nil {
return err
}
p.startProxyLoop()
return nil
func (p *Proxy) Start() error {
return p.startProxy()
}
func (p *Proxy) stopProxyLoop() {
......@@ -246,14 +179,12 @@ func (p *Proxy) stopProxyLoop() {
p.segAssigner.Close()
p.taskSch.Close()
p.sched.Close()
p.manipulationMsgStream.Close()
p.queryMsgStream.Close()
p.queryResultMsgStream.Close()
p.proxyLoopWg.Wait()
}
......
......@@ -97,7 +97,7 @@ func startProxy(ctx context.Context) {
}
// TODO: change to wait until master is ready
if err := svr.Run(); err != nil {
if err := svr.Start(); err != nil {
log.Fatal("run proxy failed", zap.Error(err))
}
}
......
......@@ -8,6 +8,8 @@ import (
"sync"
"github.com/zilliztech/milvus-distributed/internal/allocator"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
)
type TaskQueue interface {
......@@ -140,7 +142,7 @@ func (queue *BaseTaskQueue) TaskDoneTest(ts Timestamp) bool {
queue.utLock.Lock()
defer queue.utLock.Unlock()
for e := queue.unissuedTasks.Front(); e != nil; e = e.Next() {
if e.Value.(task).EndTs() >= ts {
if e.Value.(task).EndTs() < ts {
return false
}
}
......@@ -148,7 +150,7 @@ func (queue *BaseTaskQueue) TaskDoneTest(ts Timestamp) bool {
queue.atLock.Lock()
defer queue.atLock.Unlock()
for ats := range queue.activeTasks {
if ats >= ts {
if ats < ts {
return false
}
}
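
Note that this hunk changes behavior, not just naming: after the change, TaskDoneTest(ts) returns false whenever an unissued or active task still carries a timestamp earlier than ts, i.e. it now answers "has every task stamped before ts completed?". A minimal standalone sketch of that intended check, using hypothetical slices in place of the real BaseTaskQueue internals:

package main

import "fmt"

type Timestamp = uint64

// taskDoneBefore mirrors the corrected TaskDoneTest comparison: it reports
// whether no pending or active task carries a timestamp earlier than ts.
func taskDoneBefore(ts Timestamp, unissued, active []Timestamp) bool {
	for _, t := range unissued {
		if t < ts { // a task older than ts is still waiting to be issued
			return false
		}
	}
	for _, t := range active {
		if t < ts { // a task older than ts is still executing
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(taskDoneBefore(10, []Timestamp{12}, []Timestamp{15})) // true: nothing older than 10 is outstanding
	fmt.Println(taskDoneBefore(10, []Timestamp{8}, nil))              // false: an unissued task predates 10
}
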
......@@ -357,6 +359,68 @@ func (sched *TaskScheduler) queryLoop() {
}
}
func (sched *TaskScheduler) queryResultLoop() {
defer sched.wg.Done()
// TODO: use config instead
pulsarAddress := "pulsar://localhost:6650"
bufSize := int64(1000)
queryResultChannels := []string{"QueryResult"}
queryResultSubName := "QueryResultSubject"
unmarshal := msgstream.NewUnmarshalDispatcher()
queryResultMsgStream := msgstream.NewPulsarMsgStream(sched.ctx, bufSize)
queryResultMsgStream.SetPulsarClient(pulsarAddress)
queryResultMsgStream.CreatePulsarConsumers(queryResultChannels,
queryResultSubName,
unmarshal,
bufSize)
queryResultMsgStream.Start()
defer queryResultMsgStream.Close()
queryResultBuf := make(map[UniqueID][]*internalpb.SearchResult)
for {
select {
case msgPack, ok := <-queryResultMsgStream.Chan():
if !ok {
log.Print("buf chan closed")
return
}
if msgPack == nil {
continue
}
for _, tsMsg := range msgPack.Msgs {
searchResultMsg, _ := tsMsg.(*msgstream.SearchResultMsg)
reqID := searchResultMsg.GetReqID()
_, ok = queryResultBuf[reqID]
if !ok {
queryResultBuf[reqID] = make([]*internalpb.SearchResult, 0)
}
queryResultBuf[reqID] = append(queryResultBuf[reqID], &searchResultMsg.SearchResult)
if len(queryResultBuf[reqID]) == 4 {
// TODO: use the number of query node instead
t := sched.getTaskByReqID(reqID)
if t != nil {
qt, ok := t.(*QueryTask)
if ok {
log.Printf("address of query task: %p", qt)
qt.resultBuf <- queryResultBuf[reqID]
delete(queryResultBuf, reqID)
}
} else {
log.Printf("task with reqID %v is nil", reqID)
}
}
}
case <-sched.ctx.Done():
log.Print("proxy server is closed ...")
return
}
}
}
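
The loop above, now owned by the scheduler rather than the Proxy, buffers partial search results per request ID and hands the slice to the waiting QueryTask once the expected number of replies (hard-coded to 4 pending the query-node-count TODO) has arrived. A reduced sketch of that buffering pattern, with a hypothetical searchResult type standing in for the real msgstream and task machinery:

package main

import "fmt"

type searchResult struct {
	ReqID int64
	Hits  string
}

// collectResults buffers incoming partial results per request ID and invokes
// deliver once `expected` replies for that request have been gathered.
func collectResults(in <-chan searchResult, expected int, deliver func(reqID int64, rs []searchResult)) {
	buf := make(map[int64][]searchResult)
	for r := range in {
		buf[r.ReqID] = append(buf[r.ReqID], r)
		if len(buf[r.ReqID]) == expected {
			deliver(r.ReqID, buf[r.ReqID])
			delete(buf, r.ReqID) // drop the slot so the map does not grow across requests
		}
	}
}

func main() {
	in := make(chan searchResult, 4)
	for i := 0; i < 4; i++ {
		in <- searchResult{ReqID: 7, Hits: fmt.Sprintf("partial-%d", i)}
	}
	close(in)
	collectResults(in, 4, func(reqID int64, rs []searchResult) {
		fmt.Printf("reqID %d complete with %d partial results\n", reqID, len(rs))
	})
}
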
func (sched *TaskScheduler) Start() error {
sched.wg.Add(1)
go sched.definitionLoop()
......@@ -367,6 +431,9 @@ func (sched *TaskScheduler) Start() error {
sched.wg.Add(1)
go sched.queryLoop()
sched.wg.Add(1)
go sched.queryResultLoop()
return nil
}
......