From 057563d93608bd8022ab171c543998a8e403ef1b Mon Sep 17 00:00:00 2001 From: "zhenshan.cao" Date: Wed, 11 Nov 2020 17:53:54 +0800 Subject: [PATCH] Refactor master Signed-off-by: zhenshan.cao --- internal/master/id/id.go | 14 +- internal/master/id/id_test.go | 16 +- internal/master/master.go | 29 ++- internal/master/tso/global_allocator.go | 19 +- internal/master/tso/global_allocator_test.go | 15 +- internal/master/tso/tso.go | 29 ++- internal/proxy/grpc_service.go | 183 ++++++++----------- internal/proxy/proxy.go | 4 +- internal/proxy/task_scheduler.go | 120 +++++++----- internal/proxy/timetick.go | 1 - internal/proxy/timetick_test.go | 2 +- internal/util/tsoutil/tso.go | 19 ++ 12 files changed, 241 insertions(+), 210 deletions(-) diff --git a/internal/master/id/id.go b/internal/master/id/id.go index 4c4466e0a..8f584f3f2 100644 --- a/internal/master/id/id.go +++ b/internal/master/id/id.go @@ -1,13 +1,12 @@ - package id import ( "github.com/zilliztech/milvus-distributed/internal/kv" "github.com/zilliztech/milvus-distributed/internal/master/tso" + "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) - type UniqueID = typeutil.UniqueID // GlobalTSOAllocator is the global single point TSO allocator. @@ -17,13 +16,18 @@ type GlobalIdAllocator struct { var allocator *GlobalIdAllocator -func InitGlobalIdAllocator(key string, base kv.KVBase){ +func Init() { + InitGlobalIdAllocator("idTimestamp", tsoutil.NewTSOKVBase("gid")) +} + +func InitGlobalIdAllocator(key string, base kv.KVBase) { allocator = NewGlobalIdAllocator(key, base) + allocator.Initialize() } -func NewGlobalIdAllocator(key string, base kv.KVBase) * GlobalIdAllocator{ +func NewGlobalIdAllocator(key string, base kv.KVBase) *GlobalIdAllocator { return &GlobalIdAllocator{ - allocator: tso.NewGlobalTSOAllocator( key, base), + allocator: tso.NewGlobalTSOAllocator(key, base), } } diff --git a/internal/master/id/id_test.go b/internal/master/id/id_test.go index c80c75bc5..150af1d5c 100644 --- a/internal/master/id/id_test.go +++ b/internal/master/id/id_test.go @@ -1,17 +1,19 @@ package id import ( - "github.com/stretchr/testify/assert" - "github.com/zilliztech/milvus-distributed/internal/kv/mockkv" "os" - "testing" + + "github.com/stretchr/testify/assert" + "github.com/zilliztech/milvus-distributed/internal/conf" + "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" ) var GIdAllocator *GlobalIdAllocator func TestMain(m *testing.M) { - GIdAllocator = NewGlobalIdAllocator("idTimestamp", mockkv.NewEtcdKV()) + conf.LoadConfig("config.yaml") + GIdAllocator = NewGlobalIdAllocator("idTimestamp", tsoutil.NewTSOKVBase("gid")) exitCode := m.Run() os.Exit(exitCode) } @@ -30,8 +32,8 @@ func TestGlobalIdAllocator_AllocOne(t *testing.T) { } func TestGlobalIdAllocator_Alloc(t *testing.T) { - count := uint32(2<<10) + count := uint32(2 << 10) idStart, idEnd, err := GIdAllocator.Alloc(count) assert.Nil(t, err) - assert.Equal(t, count, uint32(idEnd - idStart)) -} \ No newline at end of file + assert.Equal(t, count, uint32(idEnd-idStart)) +} diff --git a/internal/master/master.go b/internal/master/master.go index e45d0a0bf..1ad0320ff 100644 --- a/internal/master/master.go +++ b/internal/master/master.go @@ -6,15 +6,16 @@ import ( "log" "math/rand" "net" - "path" "strconv" "sync" "sync/atomic" "time" + "github.com/zilliztech/milvus-distributed/internal/master/id" + "github.com/zilliztech/milvus-distributed/internal/master/tso" + "github.com/apache/pulsar-client-go/pulsar" "github.com/golang/protobuf/proto" - "github.com/zilliztech/milvus-distributed/internal/master/id" "github.com/zilliztech/milvus-distributed/internal/conf" "github.com/zilliztech/milvus-distributed/internal/kv" "github.com/zilliztech/milvus-distributed/internal/master/controller" @@ -23,7 +24,6 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" "google.golang.org/grpc" - "github.com/zilliztech/milvus-distributed/internal/master/tso" "go.etcd.io/etcd/clientv3" ) @@ -57,17 +57,6 @@ type Master struct { closeCallbacks []func() } -func newTSOKVBase(subPath string) * kv.EtcdKV{ - etcdAddr := conf.Config.Etcd.Address - etcdAddr += ":" - etcdAddr += strconv.FormatInt(int64(conf.Config.Etcd.Port), 10) - client, _ := clientv3.New(clientv3.Config{ - Endpoints: []string{etcdAddr}, - DialTimeout: 5 * time.Second, - }) - return kv.NewEtcdKV(client, path.Join(conf.Config.Etcd.Rootpath, subPath)) -} - func newKVBase() *kv.EtcdKV { etcdAddr := conf.Config.Etcd.Address etcdAddr += ":" @@ -80,11 +69,15 @@ func newKVBase() *kv.EtcdKV { return kvBase } +func Init() { + rand.Seed(time.Now().UnixNano()) + id.Init() + tso.Init() +} + // CreateServer creates the UNINITIALIZED pd server with given configuration. func CreateServer(ctx context.Context) (*Master, error) { - rand.Seed(time.Now().UnixNano()) - id.InitGlobalIdAllocator("idTimestamp", newTSOKVBase("gid")) - tso.InitGlobalTsoAllocator("timestamp", newTSOKVBase("tso")) + Init() m := &Master{ ctx: ctx, startTimestamp: time.Now().Unix(), @@ -179,7 +172,7 @@ func (s *Master) startServerLoop(ctx context.Context) { } func (s *Master) stopServerLoop() { - if s.grpcServer != nil{ + if s.grpcServer != nil { s.grpcServer.GracefulStop() } s.serverLoopCancel() diff --git a/internal/master/tso/global_allocator.go b/internal/master/tso/global_allocator.go index a8ea4f2c3..921dd94dd 100644 --- a/internal/master/tso/global_allocator.go +++ b/internal/master/tso/global_allocator.go @@ -38,10 +38,14 @@ type GlobalTSOAllocator struct { var allocator *GlobalTSOAllocator -func InitGlobalTsoAllocator(key string, base kv.KVBase){ - allocator = NewGlobalTSOAllocator(key, base) +func Init() { + InitGlobalTsoAllocator("timestamp", tsoutil.NewTSOKVBase("tso")) } +func InitGlobalTsoAllocator(key string, base kv.KVBase) { + allocator = NewGlobalTSOAllocator(key, base) + allocator.Initialize() +} // NewGlobalTSOAllocator creates a new global TSO allocator. func NewGlobalTSOAllocator(key string, kvBase kv.KVBase) *GlobalTSOAllocator { @@ -59,7 +63,7 @@ func NewGlobalTSOAllocator(key string, kvBase kv.KVBase) *GlobalTSOAllocator { // Initialize will initialize the created global TSO allocator. func (gta *GlobalTSOAllocator) Initialize() error { - return gta.tso.SyncTimestamp() + return gta.tso.InitTimestamp() } // UpdateTSO is used to update the TSO in memory and the time window in etcd. @@ -104,7 +108,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (uint64, error) { return 0, errors.New("can not get timestamp") } -func (gta *GlobalTSOAllocator) Alloc(count uint32)(typeutil.Timestamp, error) { +func (gta *GlobalTSOAllocator) Alloc(count uint32) (typeutil.Timestamp, error) { //return gta.tso.SyncTimestamp() start, err := gta.GenerateTSO(count) if err != nil { @@ -117,21 +121,20 @@ func (gta *GlobalTSOAllocator) Alloc(count uint32)(typeutil.Timestamp, error) { return start, err } -func (gta *GlobalTSOAllocator) AllocOne()(typeutil.Timestamp, error) { +func (gta *GlobalTSOAllocator) AllocOne() (typeutil.Timestamp, error) { return gta.GenerateTSO(1) } - // Reset is used to reset the TSO allocator. func (gta *GlobalTSOAllocator) Reset() { gta.tso.ResetTimestamp() } -func AllocOne()(typeutil.Timestamp, error) { +func AllocOne() (typeutil.Timestamp, error) { return allocator.AllocOne() } // Reset is used to reset the TSO allocator. -func Alloc(count uint32)(typeutil.Timestamp, error) { +func Alloc(count uint32) (typeutil.Timestamp, error) { return allocator.Alloc(count) } diff --git a/internal/master/tso/global_allocator_test.go b/internal/master/tso/global_allocator_test.go index 91d6459d7..3b4634ed1 100644 --- a/internal/master/tso/global_allocator_test.go +++ b/internal/master/tso/global_allocator_test.go @@ -1,18 +1,21 @@ package tso import ( - "github.com/stretchr/testify/assert" - "github.com/zilliztech/milvus-distributed/internal/kv/mockkv" - "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" "os" "testing" "time" + + "github.com/stretchr/testify/assert" + "github.com/zilliztech/milvus-distributed/internal/conf" + "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" ) var GTsoAllocator Allocator func TestMain(m *testing.M) { - GTsoAllocator = NewGlobalTSOAllocator("timestamp", mockkv.NewEtcdKV()) + conf.LoadConfig("config.yaml") + GTsoAllocator = NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase("tso")) + exitCode := m.Run() os.Exit(exitCode) } @@ -28,7 +31,7 @@ func TestGlobalTSOAllocator_GenerateTSO(t *testing.T) { startTs, err := GTsoAllocator.GenerateTSO(perCount) assert.Nil(t, err) lastPhysical, lastLogical := tsoutil.ParseTS(startTs) - for i:=0;i < count; i++{ + for i := 0; i < count; i++ { ts, _ := GTsoAllocator.GenerateTSO(perCount) physical, logical := tsoutil.ParseTS(ts) if lastPhysical == physical { @@ -41,7 +44,7 @@ func TestGlobalTSOAllocator_GenerateTSO(t *testing.T) { func TestGlobalTSOAllocator_SetTSO(t *testing.T) { curTime := time.Now() - nextTime := curTime.Add(2 * time.Second ) + nextTime := curTime.Add(2 * time.Second) physical := nextTime.UnixNano() / int64(time.Millisecond) logical := int64(0) err := GTsoAllocator.SetTSO(tsoutil.ComposeTS(physical, logical)) diff --git a/internal/master/tso/tso.go b/internal/master/tso/tso.go index 111e98698..f66709a02 100644 --- a/internal/master/tso/tso.go +++ b/internal/master/tso/tso.go @@ -46,8 +46,8 @@ type atomicObject struct { // timestampOracle is used to maintain the logic of tso. type timestampOracle struct { - key string - kvBase kv.KVBase + key string + kvBase kv.KVBase // TODO: remove saveInterval saveInterval time.Duration @@ -83,28 +83,27 @@ func (t *timestampOracle) saveTimestamp(ts time.Time) error { return nil } -// SyncTimestamp is used to synchronize the timestamp. -func (t *timestampOracle) SyncTimestamp() error { +func (t *timestampOracle) InitTimestamp() error { - last, err := t.loadTimestamp() - if err != nil { - return err - } + //last, err := t.loadTimestamp() + //if err != nil { + // return err + //} next := time.Now() // If the current system time minus the saved etcd timestamp is less than `updateTimestampGuard`, // the timestamp allocation will start from the saved etcd timestamp temporarily. - if typeutil.SubTimeByWallClock(next, last) < updateTimestampGuard { - next = last.Add(updateTimestampGuard) - } + //if typeutil.SubTimeByWallClock(next, last) < updateTimestampGuard { + // next = last.Add(updateTimestampGuard) + //} save := next.Add(t.saveInterval) - if err = t.saveTimestamp(save); err != nil { + if err := t.saveTimestamp(save); err != nil { return err } - log.Print("sync and save timestamp", zap.Time("last", last), zap.Time("save", save), zap.Time("next", next)) + //log.Print("sync and save timestamp", zap.Time("last", last), zap.Time("save", save), zap.Time("next", next)) current := &atomicObject{ physical: next, @@ -156,7 +155,7 @@ func (t *timestampOracle) UpdateTimestamp() error { now := time.Now() jetLag := typeutil.SubTimeByWallClock(now, prev.physical) - if jetLag > 3 * UpdateTimestampStep { + if jetLag > 3*UpdateTimestampStep { log.Print("clock offset", zap.Duration("jet-lag", jetLag), zap.Time("prev-physical", prev.physical), zap.Time("now", now)) } @@ -197,7 +196,7 @@ func (t *timestampOracle) UpdateTimestamp() error { // ResetTimestamp is used to reset the timestamp. func (t *timestampOracle) ResetTimestamp() { zero := &atomicObject{ - physical: typeutil.ZeroTime, + physical: time.Now(), } atomic.StorePointer(&t.TSO, unsafe.Pointer(zero)) } diff --git a/internal/proxy/grpc_service.go b/internal/proxy/grpc_service.go index 877136a72..08d818d0d 100644 --- a/internal/proxy/grpc_service.go +++ b/internal/proxy/grpc_service.go @@ -35,21 +35,18 @@ func (p *Proxy) Insert(ctx context.Context, in *servicepb.RowBatch) (*servicepb. defer it.cancel() - var t task = it - p.taskSch.DmQueue.Enqueue(&t) - for { - select { - case <-ctx.Done(): - log.Print("insert timeout!") - return &servicepb.IntegerRangeResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: "insert timeout!", - }, - }, errors.New("insert timeout!") - case result := <-it.resultChan: - return result, nil - } + p.taskSch.DmQueue.Enqueue(it) + select { + case <-ctx.Done(): + log.Print("insert timeout!") + return &servicepb.IntegerRangeResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: "insert timeout!", + }, + }, errors.New("insert timeout!") + case result := <-it.resultChan: + return result, nil } } @@ -69,19 +66,16 @@ func (p *Proxy) CreateCollection(ctx context.Context, req *schemapb.CollectionSc cct.ctx, cct.cancel = context.WithCancel(ctx) defer cct.cancel() - var t task = cct - p.taskSch.DdQueue.Enqueue(&t) - for { - select { - case <-ctx.Done(): - log.Print("create collection timeout!") - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: "create collection timeout!", - }, errors.New("create collection timeout!") - case result := <-cct.resultChan: - return result, nil - } + p.taskSch.DdQueue.Enqueue(cct) + select { + case <-ctx.Done(): + log.Print("create collection timeout!") + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: "create collection timeout!", + }, errors.New("create collection timeout!") + case result := <-cct.resultChan: + return result, nil } } @@ -102,21 +96,18 @@ func (p *Proxy) Search(ctx context.Context, req *servicepb.Query) (*servicepb.Qu qt.SearchRequest.Query.Value = queryBytes defer qt.cancel() - var t task = qt - p.taskSch.DqQueue.Enqueue(&t) - for { - select { - case <-ctx.Done(): - log.Print("query timeout!") - return &servicepb.QueryResult{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: "query timeout!", - }, - }, errors.New("query timeout!") - case result := <-qt.resultChan: - return result, nil - } + p.taskSch.DqQueue.Enqueue(qt) + select { + case <-ctx.Done(): + log.Print("query timeout!") + return &servicepb.QueryResult{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: "query timeout!", + }, + }, errors.New("query timeout!") + case result := <-qt.resultChan: + return result, nil } } @@ -134,19 +125,16 @@ func (p *Proxy) DropCollection(ctx context.Context, req *servicepb.CollectionNam dct.ctx, dct.cancel = context.WithCancel(ctx) defer dct.cancel() - var t task = dct - p.taskSch.DdQueue.Enqueue(&t) - for { - select { - case <-ctx.Done(): - log.Print("create collection timeout!") - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: "create collection timeout!", - }, errors.New("create collection timeout!") - case result := <-dct.resultChan: - return result, nil - } + p.taskSch.DdQueue.Enqueue(dct) + select { + case <-ctx.Done(): + log.Print("create collection timeout!") + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: "create collection timeout!", + }, errors.New("create collection timeout!") + case result := <-dct.resultChan: + return result, nil } } @@ -164,22 +152,19 @@ func (p *Proxy) HasCollection(ctx context.Context, req *servicepb.CollectionName hct.ctx, hct.cancel = context.WithCancel(ctx) defer hct.cancel() - var t task = hct - p.taskSch.DqQueue.Enqueue(&t) - for { - select { - case <-ctx.Done(): - log.Print("has collection timeout!") - return &servicepb.BoolResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: "has collection timeout!", - }, - Value: false, - }, errors.New("has collection timeout!") - case result := <-hct.resultChan: - return result, nil - } + p.taskSch.DqQueue.Enqueue(hct) + select { + case <-ctx.Done(): + log.Print("has collection timeout!") + return &servicepb.BoolResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: "has collection timeout!", + }, + Value: false, + }, errors.New("has collection timeout!") + case result := <-hct.resultChan: + return result, nil } } @@ -197,21 +182,18 @@ func (p *Proxy) DescribeCollection(ctx context.Context, req *servicepb.Collectio dct.ctx, dct.cancel = context.WithCancel(ctx) defer dct.cancel() - var t task = dct - p.taskSch.DqQueue.Enqueue(&t) - for { - select { - case <-ctx.Done(): - log.Print("has collection timeout!") - return &servicepb.CollectionDescription{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: "describe collection timeout!", - }, - }, errors.New("describe collection timeout!") - case result := <-dct.resultChan: - return result, nil - } + p.taskSch.DqQueue.Enqueue(dct) + select { + case <-ctx.Done(): + log.Print("has collection timeout!") + return &servicepb.CollectionDescription{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: "describe collection timeout!", + }, + }, errors.New("describe collection timeout!") + case result := <-dct.resultChan: + return result, nil } } @@ -228,21 +210,18 @@ func (p *Proxy) ShowCollections(ctx context.Context, req *commonpb.Empty) (*serv sct.ctx, sct.cancel = context.WithCancel(ctx) defer sct.cancel() - var t task = sct - p.taskSch.DqQueue.Enqueue(&t) - for { - select { - case <-ctx.Done(): - log.Print("show collections timeout!") - return &servicepb.StringListResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: "show collections timeout!", - }, - }, errors.New("show collections timeout!") - case result := <-sct.resultChan: - return result, nil - } + p.taskSch.DqQueue.Enqueue(sct) + select { + case <-ctx.Done(): + log.Print("show collections timeout!") + return &servicepb.StringListResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: "show collections timeout!", + }, + }, errors.New("show collections timeout!") + case result := <-sct.resultChan: + return result, nil } } diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 7ede1b8d2..9acc93a33 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -2,6 +2,7 @@ package proxy import ( "context" + "google.golang.org/grpc" "log" "math/rand" "net" @@ -14,7 +15,6 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" "github.com/zilliztech/milvus-distributed/internal/proto/servicepb" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" - "google.golang.org/grpc" ) type UniqueID = typeutil.UniqueID @@ -157,7 +157,7 @@ func (p *Proxy) queryResultLoop() { if len(queryResultBuf[reqId]) == 4 { // TODO: use the number of query node instead t := p.taskSch.getTaskByReqId(reqId) - qt := (*t).(*QueryTask) + qt := t.(*QueryTask) qt.resultBuf <- queryResultBuf[reqId] delete(queryResultBuf, reqId) } diff --git a/internal/proxy/task_scheduler.go b/internal/proxy/task_scheduler.go index 361733468..dcf1335ad 100644 --- a/internal/proxy/task_scheduler.go +++ b/internal/proxy/task_scheduler.go @@ -11,7 +11,7 @@ import ( type BaseTaskQueue struct { unissuedTasks *list.List - activeTasks map[Timestamp]*task + activeTasks map[Timestamp]task utLock sync.Mutex atLock sync.Mutex } @@ -24,23 +24,23 @@ func (queue *BaseTaskQueue) Empty() bool { return queue.unissuedTasks.Len() <= 0 && len(queue.activeTasks) <= 0 } -func (queue *BaseTaskQueue) AddUnissuedTask(t *task) { +func (queue *BaseTaskQueue) AddUnissuedTask(t task) { queue.utLock.Lock() defer queue.utLock.Unlock() queue.unissuedTasks.PushBack(t) } -func (queue *BaseTaskQueue) FrontUnissuedTask() *task { +func (queue *BaseTaskQueue) FrontUnissuedTask() task { queue.utLock.Lock() defer queue.utLock.Unlock() if queue.unissuedTasks.Len() <= 0 { log.Fatal("sorry, but the unissued task list is empty!") return nil } - return queue.unissuedTasks.Front().Value.(*task) + return queue.unissuedTasks.Front().Value.(task) } -func (queue *BaseTaskQueue) PopUnissuedTask() *task { +func (queue *BaseTaskQueue) PopUnissuedTask() task { queue.utLock.Lock() defer queue.utLock.Unlock() if queue.unissuedTasks.Len() <= 0 { @@ -48,13 +48,13 @@ func (queue *BaseTaskQueue) PopUnissuedTask() *task { return nil } ft := queue.unissuedTasks.Front() - return queue.unissuedTasks.Remove(ft).(*task) + return queue.unissuedTasks.Remove(ft).(task) } -func (queue *BaseTaskQueue) AddActiveTask(t *task) { +func (queue *BaseTaskQueue) AddActiveTask(t task) { queue.atLock.Lock() defer queue.atLock.Lock() - ts := (*t).EndTs() + ts := t.EndTs() _, ok := queue.activeTasks[ts] if ok { log.Fatalf("task with timestamp %v already in active task list!", ts) @@ -62,7 +62,7 @@ func (queue *BaseTaskQueue) AddActiveTask(t *task) { queue.activeTasks[ts] = t } -func (queue *BaseTaskQueue) PopActiveTask(ts Timestamp) *task { +func (queue *BaseTaskQueue) PopActiveTask(ts Timestamp) task { queue.atLock.Lock() defer queue.atLock.Lock() t, ok := queue.activeTasks[ts] @@ -74,19 +74,19 @@ func (queue *BaseTaskQueue) PopActiveTask(ts Timestamp) *task { return nil } -func (queue *BaseTaskQueue) getTaskByReqId(reqId UniqueID) *task { +func (queue *BaseTaskQueue) getTaskByReqId(reqId UniqueID) task { queue.utLock.Lock() defer queue.utLock.Lock() for e := queue.unissuedTasks.Front(); e != nil; e = e.Next() { - if (*(e.Value.(*task))).Id() == reqId { - return e.Value.(*task) + if e.Value.(task).Id() == reqId { + return e.Value.(task) } } queue.atLock.Lock() defer queue.atLock.Unlock() for ats := range queue.activeTasks { - if (*(queue.activeTasks[ats])).Id() == reqId { + if queue.activeTasks[ats].Id() == reqId { return queue.activeTasks[ats] } } @@ -98,7 +98,7 @@ func (queue *BaseTaskQueue) TaskDoneTest(ts Timestamp) bool { queue.utLock.Lock() defer queue.utLock.Unlock() for e := queue.unissuedTasks.Front(); e != nil; e = e.Next() { - if (*(e.Value.(*task))).EndTs() >= ts { + if e.Value.(task).EndTs() >= ts { return false } } @@ -114,20 +114,20 @@ func (queue *BaseTaskQueue) TaskDoneTest(ts Timestamp) bool { return true } -type ddTaskQueue struct { +type DdTaskQueue struct { BaseTaskQueue lock sync.Mutex } -type dmTaskQueue struct { +type DmTaskQueue struct { BaseTaskQueue } -type dqTaskQueue struct { +type DqTaskQueue struct { BaseTaskQueue } -func (queue *ddTaskQueue) Enqueue(t *task) error { +func (queue *DdTaskQueue) Enqueue(t task) error { queue.lock.Lock() defer queue.lock.Unlock() // TODO: set Ts, ReqId, ProxyId @@ -135,22 +135,49 @@ func (queue *ddTaskQueue) Enqueue(t *task) error { return nil } -func (queue *dmTaskQueue) Enqueue(t *task) error { +func (queue *DmTaskQueue) Enqueue(t task) error { // TODO: set Ts, ReqId, ProxyId queue.AddUnissuedTask(t) return nil } -func (queue *dqTaskQueue) Enqueue(t *task) error { +func (queue *DqTaskQueue) Enqueue(t task) error { // TODO: set Ts, ReqId, ProxyId queue.AddUnissuedTask(t) return nil } +func NewDdTaskQueue() *DdTaskQueue { + return &DdTaskQueue{ + BaseTaskQueue: BaseTaskQueue{ + unissuedTasks: list.New(), + activeTasks: make(map[Timestamp]task), + }, + } +} + +func NewDmTaskQueue() *DmTaskQueue { + return &DmTaskQueue{ + BaseTaskQueue: BaseTaskQueue{ + unissuedTasks: list.New(), + activeTasks: make(map[Timestamp]task), + }, + } +} + +func NewDqTaskQueue() *DqTaskQueue { + return &DqTaskQueue{ + BaseTaskQueue: BaseTaskQueue{ + unissuedTasks: list.New(), + activeTasks: make(map[Timestamp]task), + }, + } +} + type TaskScheduler struct { - DdQueue *ddTaskQueue - DmQueue *dmTaskQueue - DqQueue *dqTaskQueue + DdQueue *DdTaskQueue + DmQueue *DmTaskQueue + DqQueue *DqTaskQueue idAllocator *allocator.IdAllocator tsoAllocator *allocator.TimestampAllocator @@ -165,6 +192,9 @@ func NewTaskScheduler(ctx context.Context, tsoAllocator *allocator.TimestampAllocator) (*TaskScheduler, error) { ctx1, cancel := context.WithCancel(ctx) s := &TaskScheduler{ + DdQueue: NewDdTaskQueue(), + DmQueue: NewDmTaskQueue(), + DqQueue: NewDqTaskQueue(), idAllocator: idAllocator, tsoAllocator: tsoAllocator, ctx: ctx1, @@ -174,19 +204,19 @@ func NewTaskScheduler(ctx context.Context, return s, nil } -func (sched *TaskScheduler) scheduleDdTask() *task { +func (sched *TaskScheduler) scheduleDdTask() task { return sched.DdQueue.PopUnissuedTask() } -func (sched *TaskScheduler) scheduleDmTask() *task { +func (sched *TaskScheduler) scheduleDmTask() task { return sched.DmQueue.PopUnissuedTask() } -func (sched *TaskScheduler) scheduleDqTask() *task { +func (sched *TaskScheduler) scheduleDqTask() task { return sched.DqQueue.PopUnissuedTask() } -func (sched *TaskScheduler) getTaskByReqId(reqId UniqueID) *task { +func (sched *TaskScheduler) getTaskByReqId(reqId UniqueID) task { if t := sched.DdQueue.getTaskByReqId(reqId); t != nil { return t } @@ -211,22 +241,22 @@ func (sched *TaskScheduler) definitionLoop() { //sched.DdQueue.atLock.Lock() t := sched.scheduleDdTask() - err := (*t).PreExecute() + err := t.PreExecute() if err != nil { return } - err = (*t).Execute() + err = t.Execute() if err != nil { log.Printf("execute definition task failed, error = %v", err) } - (*t).Notify(err) + t.Notify(err) sched.DdQueue.AddActiveTask(t) - (*t).WaitToFinish() - (*t).PostExecute() + t.WaitToFinish() + t.PostExecute() - sched.DdQueue.PopActiveTask((*t).EndTs()) + sched.DdQueue.PopActiveTask(t.EndTs()) } } @@ -242,27 +272,27 @@ func (sched *TaskScheduler) manipulationLoop() { sched.DmQueue.atLock.Lock() t := sched.scheduleDmTask() - if err := (*t).PreExecute(); err != nil { + if err := t.PreExecute(); err != nil { return } go func() { - err := (*t).Execute() + err := t.Execute() if err != nil { log.Printf("execute manipulation task failed, error = %v", err) } - (*t).Notify(err) + t.Notify(err) }() sched.DmQueue.AddActiveTask(t) sched.DmQueue.atLock.Unlock() go func() { - (*t).WaitToFinish() - (*t).PostExecute() + t.WaitToFinish() + t.PostExecute() // remove from active list - sched.DmQueue.PopActiveTask((*t).EndTs()) + sched.DmQueue.PopActiveTask(t.EndTs()) }() } } @@ -279,27 +309,27 @@ func (sched *TaskScheduler) queryLoop() { sched.DqQueue.atLock.Lock() t := sched.scheduleDqTask() - if err := (*t).PreExecute(); err != nil { + if err := t.PreExecute(); err != nil { return } go func() { - err := (*t).Execute() + err := t.Execute() if err != nil { log.Printf("execute query task failed, error = %v", err) } - (*t).Notify(err) + t.Notify(err) }() sched.DqQueue.AddActiveTask(t) sched.DqQueue.atLock.Unlock() go func() { - (*t).WaitToFinish() - (*t).PostExecute() + t.WaitToFinish() + t.PostExecute() // remove from active list - sched.DqQueue.PopActiveTask((*t).EndTs()) + sched.DqQueue.PopActiveTask(t.EndTs()) }() } } diff --git a/internal/proxy/timetick.go b/internal/proxy/timetick.go index 6269940b2..3778c2505 100644 --- a/internal/proxy/timetick.go +++ b/internal/proxy/timetick.go @@ -51,7 +51,6 @@ func newTimeTick(ctx context.Context, tsoAllocator *allocator.TimestampAllocator return t } - func (tt *timeTick) tick() error { if tt.lastTick == tt.currentTick { diff --git a/internal/proxy/timetick_test.go b/internal/proxy/timetick_test.go index edaa4bd5b..e159188c7 100644 --- a/internal/proxy/timetick_test.go +++ b/internal/proxy/timetick_test.go @@ -33,7 +33,7 @@ func TestTimeTick(t *testing.T) { tt := timeTick{ interval: 200, pulsarProducer: producer, - peerID: 1, + peerID: 1, ctx: ctx, areRequestsDelivered: func(ts Timestamp) bool { return true }, } diff --git a/internal/util/tsoutil/tso.go b/internal/util/tsoutil/tso.go index 625c16635..c1e3b3491 100644 --- a/internal/util/tsoutil/tso.go +++ b/internal/util/tsoutil/tso.go @@ -1,7 +1,14 @@ package tsoutil import ( + "fmt" + "path" + "strconv" "time" + + "github.com/zilliztech/milvus-distributed/internal/conf" + "github.com/zilliztech/milvus-distributed/internal/kv" + "go.etcd.io/etcd/clientv3" ) const ( @@ -20,3 +27,15 @@ func ParseTS(ts uint64) (time.Time, uint64) { physicalTime := time.Unix(int64(physical/1000), int64(physical)%1000*time.Millisecond.Nanoseconds()) return physicalTime, logical } + +func NewTSOKVBase(subPath string) *kv.EtcdKV { + etcdAddr := conf.Config.Etcd.Address + etcdAddr += ":" + etcdAddr += strconv.FormatInt(int64(conf.Config.Etcd.Port), 10) + fmt.Println("etcdAddr ::: ", etcdAddr) + client, _ := clientv3.New(clientv3.Config{ + Endpoints: []string{etcdAddr}, + DialTimeout: 5 * time.Second, + }) + return kv.NewEtcdKV(client, path.Join(conf.Config.Etcd.Rootpath, subPath)) +} -- GitLab