提交 057563d9 编写于 作者: Z zhenshan.cao 提交者: yefu.chen

Refactor master

Signed-off-by: Nzhenshan.cao <zhenshan.cao@zilliz.com>
上级 a2289d3c
package id package id
import ( import (
"github.com/zilliztech/milvus-distributed/internal/kv" "github.com/zilliztech/milvus-distributed/internal/kv"
"github.com/zilliztech/milvus-distributed/internal/master/tso" "github.com/zilliztech/milvus-distributed/internal/master/tso"
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil" "github.com/zilliztech/milvus-distributed/internal/util/typeutil"
) )
type UniqueID = typeutil.UniqueID type UniqueID = typeutil.UniqueID
// GlobalTSOAllocator is the global single point TSO allocator. // GlobalTSOAllocator is the global single point TSO allocator.
...@@ -17,13 +16,18 @@ type GlobalIdAllocator struct { ...@@ -17,13 +16,18 @@ type GlobalIdAllocator struct {
var allocator *GlobalIdAllocator var allocator *GlobalIdAllocator
func InitGlobalIdAllocator(key string, base kv.KVBase){ func Init() {
InitGlobalIdAllocator("idTimestamp", tsoutil.NewTSOKVBase("gid"))
}
func InitGlobalIdAllocator(key string, base kv.KVBase) {
allocator = NewGlobalIdAllocator(key, base) allocator = NewGlobalIdAllocator(key, base)
allocator.Initialize()
} }
func NewGlobalIdAllocator(key string, base kv.KVBase) * GlobalIdAllocator{ func NewGlobalIdAllocator(key string, base kv.KVBase) *GlobalIdAllocator {
return &GlobalIdAllocator{ return &GlobalIdAllocator{
allocator: tso.NewGlobalTSOAllocator( key, base), allocator: tso.NewGlobalTSOAllocator(key, base),
} }
} }
......
package id package id
import ( import (
"github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/kv/mockkv"
"os" "os"
"testing" "testing"
"github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/conf"
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
) )
var GIdAllocator *GlobalIdAllocator var GIdAllocator *GlobalIdAllocator
func TestMain(m *testing.M) { func TestMain(m *testing.M) {
GIdAllocator = NewGlobalIdAllocator("idTimestamp", mockkv.NewEtcdKV()) conf.LoadConfig("config.yaml")
GIdAllocator = NewGlobalIdAllocator("idTimestamp", tsoutil.NewTSOKVBase("gid"))
exitCode := m.Run() exitCode := m.Run()
os.Exit(exitCode) os.Exit(exitCode)
} }
...@@ -30,8 +32,8 @@ func TestGlobalIdAllocator_AllocOne(t *testing.T) { ...@@ -30,8 +32,8 @@ func TestGlobalIdAllocator_AllocOne(t *testing.T) {
} }
func TestGlobalIdAllocator_Alloc(t *testing.T) { func TestGlobalIdAllocator_Alloc(t *testing.T) {
count := uint32(2<<10) count := uint32(2 << 10)
idStart, idEnd, err := GIdAllocator.Alloc(count) idStart, idEnd, err := GIdAllocator.Alloc(count)
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, count, uint32(idEnd - idStart)) assert.Equal(t, count, uint32(idEnd-idStart))
} }
\ No newline at end of file
...@@ -6,15 +6,16 @@ import ( ...@@ -6,15 +6,16 @@ import (
"log" "log"
"math/rand" "math/rand"
"net" "net"
"path"
"strconv" "strconv"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/zilliztech/milvus-distributed/internal/master/id"
"github.com/zilliztech/milvus-distributed/internal/master/tso"
"github.com/apache/pulsar-client-go/pulsar" "github.com/apache/pulsar-client-go/pulsar"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/master/id"
"github.com/zilliztech/milvus-distributed/internal/conf" "github.com/zilliztech/milvus-distributed/internal/conf"
"github.com/zilliztech/milvus-distributed/internal/kv" "github.com/zilliztech/milvus-distributed/internal/kv"
"github.com/zilliztech/milvus-distributed/internal/master/controller" "github.com/zilliztech/milvus-distributed/internal/master/controller"
...@@ -23,7 +24,6 @@ import ( ...@@ -23,7 +24,6 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
"google.golang.org/grpc" "google.golang.org/grpc"
"github.com/zilliztech/milvus-distributed/internal/master/tso"
"go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/clientv3"
) )
...@@ -57,17 +57,6 @@ type Master struct { ...@@ -57,17 +57,6 @@ type Master struct {
closeCallbacks []func() closeCallbacks []func()
} }
func newTSOKVBase(subPath string) * kv.EtcdKV{
etcdAddr := conf.Config.Etcd.Address
etcdAddr += ":"
etcdAddr += strconv.FormatInt(int64(conf.Config.Etcd.Port), 10)
client, _ := clientv3.New(clientv3.Config{
Endpoints: []string{etcdAddr},
DialTimeout: 5 * time.Second,
})
return kv.NewEtcdKV(client, path.Join(conf.Config.Etcd.Rootpath, subPath))
}
func newKVBase() *kv.EtcdKV { func newKVBase() *kv.EtcdKV {
etcdAddr := conf.Config.Etcd.Address etcdAddr := conf.Config.Etcd.Address
etcdAddr += ":" etcdAddr += ":"
...@@ -80,11 +69,15 @@ func newKVBase() *kv.EtcdKV { ...@@ -80,11 +69,15 @@ func newKVBase() *kv.EtcdKV {
return kvBase return kvBase
} }
func Init() {
rand.Seed(time.Now().UnixNano())
id.Init()
tso.Init()
}
// CreateServer creates the UNINITIALIZED pd server with given configuration. // CreateServer creates the UNINITIALIZED pd server with given configuration.
func CreateServer(ctx context.Context) (*Master, error) { func CreateServer(ctx context.Context) (*Master, error) {
rand.Seed(time.Now().UnixNano()) Init()
id.InitGlobalIdAllocator("idTimestamp", newTSOKVBase("gid"))
tso.InitGlobalTsoAllocator("timestamp", newTSOKVBase("tso"))
m := &Master{ m := &Master{
ctx: ctx, ctx: ctx,
startTimestamp: time.Now().Unix(), startTimestamp: time.Now().Unix(),
...@@ -179,7 +172,7 @@ func (s *Master) startServerLoop(ctx context.Context) { ...@@ -179,7 +172,7 @@ func (s *Master) startServerLoop(ctx context.Context) {
} }
func (s *Master) stopServerLoop() { func (s *Master) stopServerLoop() {
if s.grpcServer != nil{ if s.grpcServer != nil {
s.grpcServer.GracefulStop() s.grpcServer.GracefulStop()
} }
s.serverLoopCancel() s.serverLoopCancel()
......
...@@ -38,10 +38,14 @@ type GlobalTSOAllocator struct { ...@@ -38,10 +38,14 @@ type GlobalTSOAllocator struct {
var allocator *GlobalTSOAllocator var allocator *GlobalTSOAllocator
func InitGlobalTsoAllocator(key string, base kv.KVBase){ func Init() {
allocator = NewGlobalTSOAllocator(key, base) InitGlobalTsoAllocator("timestamp", tsoutil.NewTSOKVBase("tso"))
} }
func InitGlobalTsoAllocator(key string, base kv.KVBase) {
allocator = NewGlobalTSOAllocator(key, base)
allocator.Initialize()
}
// NewGlobalTSOAllocator creates a new global TSO allocator. // NewGlobalTSOAllocator creates a new global TSO allocator.
func NewGlobalTSOAllocator(key string, kvBase kv.KVBase) *GlobalTSOAllocator { func NewGlobalTSOAllocator(key string, kvBase kv.KVBase) *GlobalTSOAllocator {
...@@ -59,7 +63,7 @@ func NewGlobalTSOAllocator(key string, kvBase kv.KVBase) *GlobalTSOAllocator { ...@@ -59,7 +63,7 @@ func NewGlobalTSOAllocator(key string, kvBase kv.KVBase) *GlobalTSOAllocator {
// Initialize will initialize the created global TSO allocator. // Initialize will initialize the created global TSO allocator.
func (gta *GlobalTSOAllocator) Initialize() error { func (gta *GlobalTSOAllocator) Initialize() error {
return gta.tso.SyncTimestamp() return gta.tso.InitTimestamp()
} }
// UpdateTSO is used to update the TSO in memory and the time window in etcd. // UpdateTSO is used to update the TSO in memory and the time window in etcd.
...@@ -104,7 +108,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (uint64, error) { ...@@ -104,7 +108,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (uint64, error) {
return 0, errors.New("can not get timestamp") return 0, errors.New("can not get timestamp")
} }
func (gta *GlobalTSOAllocator) Alloc(count uint32)(typeutil.Timestamp, error) { func (gta *GlobalTSOAllocator) Alloc(count uint32) (typeutil.Timestamp, error) {
//return gta.tso.SyncTimestamp() //return gta.tso.SyncTimestamp()
start, err := gta.GenerateTSO(count) start, err := gta.GenerateTSO(count)
if err != nil { if err != nil {
...@@ -117,21 +121,20 @@ func (gta *GlobalTSOAllocator) Alloc(count uint32)(typeutil.Timestamp, error) { ...@@ -117,21 +121,20 @@ func (gta *GlobalTSOAllocator) Alloc(count uint32)(typeutil.Timestamp, error) {
return start, err return start, err
} }
func (gta *GlobalTSOAllocator) AllocOne()(typeutil.Timestamp, error) { func (gta *GlobalTSOAllocator) AllocOne() (typeutil.Timestamp, error) {
return gta.GenerateTSO(1) return gta.GenerateTSO(1)
} }
// Reset is used to reset the TSO allocator. // Reset is used to reset the TSO allocator.
func (gta *GlobalTSOAllocator) Reset() { func (gta *GlobalTSOAllocator) Reset() {
gta.tso.ResetTimestamp() gta.tso.ResetTimestamp()
} }
func AllocOne()(typeutil.Timestamp, error) { func AllocOne() (typeutil.Timestamp, error) {
return allocator.AllocOne() return allocator.AllocOne()
} }
// Reset is used to reset the TSO allocator. // Reset is used to reset the TSO allocator.
func Alloc(count uint32)(typeutil.Timestamp, error) { func Alloc(count uint32) (typeutil.Timestamp, error) {
return allocator.Alloc(count) return allocator.Alloc(count)
} }
package tso package tso
import ( import (
"github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/kv/mockkv"
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
"os" "os"
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/conf"
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
) )
var GTsoAllocator Allocator var GTsoAllocator Allocator
func TestMain(m *testing.M) { func TestMain(m *testing.M) {
GTsoAllocator = NewGlobalTSOAllocator("timestamp", mockkv.NewEtcdKV()) conf.LoadConfig("config.yaml")
GTsoAllocator = NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase("tso"))
exitCode := m.Run() exitCode := m.Run()
os.Exit(exitCode) os.Exit(exitCode)
} }
...@@ -28,7 +31,7 @@ func TestGlobalTSOAllocator_GenerateTSO(t *testing.T) { ...@@ -28,7 +31,7 @@ func TestGlobalTSOAllocator_GenerateTSO(t *testing.T) {
startTs, err := GTsoAllocator.GenerateTSO(perCount) startTs, err := GTsoAllocator.GenerateTSO(perCount)
assert.Nil(t, err) assert.Nil(t, err)
lastPhysical, lastLogical := tsoutil.ParseTS(startTs) lastPhysical, lastLogical := tsoutil.ParseTS(startTs)
for i:=0;i < count; i++{ for i := 0; i < count; i++ {
ts, _ := GTsoAllocator.GenerateTSO(perCount) ts, _ := GTsoAllocator.GenerateTSO(perCount)
physical, logical := tsoutil.ParseTS(ts) physical, logical := tsoutil.ParseTS(ts)
if lastPhysical == physical { if lastPhysical == physical {
...@@ -41,7 +44,7 @@ func TestGlobalTSOAllocator_GenerateTSO(t *testing.T) { ...@@ -41,7 +44,7 @@ func TestGlobalTSOAllocator_GenerateTSO(t *testing.T) {
func TestGlobalTSOAllocator_SetTSO(t *testing.T) { func TestGlobalTSOAllocator_SetTSO(t *testing.T) {
curTime := time.Now() curTime := time.Now()
nextTime := curTime.Add(2 * time.Second ) nextTime := curTime.Add(2 * time.Second)
physical := nextTime.UnixNano() / int64(time.Millisecond) physical := nextTime.UnixNano() / int64(time.Millisecond)
logical := int64(0) logical := int64(0)
err := GTsoAllocator.SetTSO(tsoutil.ComposeTS(physical, logical)) err := GTsoAllocator.SetTSO(tsoutil.ComposeTS(physical, logical))
......
...@@ -46,8 +46,8 @@ type atomicObject struct { ...@@ -46,8 +46,8 @@ type atomicObject struct {
// timestampOracle is used to maintain the logic of tso. // timestampOracle is used to maintain the logic of tso.
type timestampOracle struct { type timestampOracle struct {
key string key string
kvBase kv.KVBase kvBase kv.KVBase
// TODO: remove saveInterval // TODO: remove saveInterval
saveInterval time.Duration saveInterval time.Duration
...@@ -83,28 +83,27 @@ func (t *timestampOracle) saveTimestamp(ts time.Time) error { ...@@ -83,28 +83,27 @@ func (t *timestampOracle) saveTimestamp(ts time.Time) error {
return nil return nil
} }
// SyncTimestamp is used to synchronize the timestamp. func (t *timestampOracle) InitTimestamp() error {
func (t *timestampOracle) SyncTimestamp() error {
last, err := t.loadTimestamp() //last, err := t.loadTimestamp()
if err != nil { //if err != nil {
return err // return err
} //}
next := time.Now() next := time.Now()
// If the current system time minus the saved etcd timestamp is less than `updateTimestampGuard`, // If the current system time minus the saved etcd timestamp is less than `updateTimestampGuard`,
// the timestamp allocation will start from the saved etcd timestamp temporarily. // the timestamp allocation will start from the saved etcd timestamp temporarily.
if typeutil.SubTimeByWallClock(next, last) < updateTimestampGuard { //if typeutil.SubTimeByWallClock(next, last) < updateTimestampGuard {
next = last.Add(updateTimestampGuard) // next = last.Add(updateTimestampGuard)
} //}
save := next.Add(t.saveInterval) save := next.Add(t.saveInterval)
if err = t.saveTimestamp(save); err != nil { if err := t.saveTimestamp(save); err != nil {
return err return err
} }
log.Print("sync and save timestamp", zap.Time("last", last), zap.Time("save", save), zap.Time("next", next)) //log.Print("sync and save timestamp", zap.Time("last", last), zap.Time("save", save), zap.Time("next", next))
current := &atomicObject{ current := &atomicObject{
physical: next, physical: next,
...@@ -156,7 +155,7 @@ func (t *timestampOracle) UpdateTimestamp() error { ...@@ -156,7 +155,7 @@ func (t *timestampOracle) UpdateTimestamp() error {
now := time.Now() now := time.Now()
jetLag := typeutil.SubTimeByWallClock(now, prev.physical) jetLag := typeutil.SubTimeByWallClock(now, prev.physical)
if jetLag > 3 * UpdateTimestampStep { if jetLag > 3*UpdateTimestampStep {
log.Print("clock offset", zap.Duration("jet-lag", jetLag), zap.Time("prev-physical", prev.physical), zap.Time("now", now)) log.Print("clock offset", zap.Duration("jet-lag", jetLag), zap.Time("prev-physical", prev.physical), zap.Time("now", now))
} }
...@@ -197,7 +196,7 @@ func (t *timestampOracle) UpdateTimestamp() error { ...@@ -197,7 +196,7 @@ func (t *timestampOracle) UpdateTimestamp() error {
// ResetTimestamp is used to reset the timestamp. // ResetTimestamp is used to reset the timestamp.
func (t *timestampOracle) ResetTimestamp() { func (t *timestampOracle) ResetTimestamp() {
zero := &atomicObject{ zero := &atomicObject{
physical: typeutil.ZeroTime, physical: time.Now(),
} }
atomic.StorePointer(&t.TSO, unsafe.Pointer(zero)) atomic.StorePointer(&t.TSO, unsafe.Pointer(zero))
} }
...@@ -35,21 +35,18 @@ func (p *Proxy) Insert(ctx context.Context, in *servicepb.RowBatch) (*servicepb. ...@@ -35,21 +35,18 @@ func (p *Proxy) Insert(ctx context.Context, in *servicepb.RowBatch) (*servicepb.
defer it.cancel() defer it.cancel()
var t task = it p.taskSch.DmQueue.Enqueue(it)
p.taskSch.DmQueue.Enqueue(&t) select {
for { case <-ctx.Done():
select { log.Print("insert timeout!")
case <-ctx.Done(): return &servicepb.IntegerRangeResponse{
log.Print("insert timeout!") Status: &commonpb.Status{
return &servicepb.IntegerRangeResponse{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Status: &commonpb.Status{ Reason: "insert timeout!",
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, },
Reason: "insert timeout!", }, errors.New("insert timeout!")
}, case result := <-it.resultChan:
}, errors.New("insert timeout!") return result, nil
case result := <-it.resultChan:
return result, nil
}
} }
} }
...@@ -69,19 +66,16 @@ func (p *Proxy) CreateCollection(ctx context.Context, req *schemapb.CollectionSc ...@@ -69,19 +66,16 @@ func (p *Proxy) CreateCollection(ctx context.Context, req *schemapb.CollectionSc
cct.ctx, cct.cancel = context.WithCancel(ctx) cct.ctx, cct.cancel = context.WithCancel(ctx)
defer cct.cancel() defer cct.cancel()
var t task = cct p.taskSch.DdQueue.Enqueue(cct)
p.taskSch.DdQueue.Enqueue(&t) select {
for { case <-ctx.Done():
select { log.Print("create collection timeout!")
case <-ctx.Done(): return &commonpb.Status{
log.Print("create collection timeout!") ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
return &commonpb.Status{ Reason: "create collection timeout!",
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, }, errors.New("create collection timeout!")
Reason: "create collection timeout!", case result := <-cct.resultChan:
}, errors.New("create collection timeout!") return result, nil
case result := <-cct.resultChan:
return result, nil
}
} }
} }
...@@ -102,21 +96,18 @@ func (p *Proxy) Search(ctx context.Context, req *servicepb.Query) (*servicepb.Qu ...@@ -102,21 +96,18 @@ func (p *Proxy) Search(ctx context.Context, req *servicepb.Query) (*servicepb.Qu
qt.SearchRequest.Query.Value = queryBytes qt.SearchRequest.Query.Value = queryBytes
defer qt.cancel() defer qt.cancel()
var t task = qt p.taskSch.DqQueue.Enqueue(qt)
p.taskSch.DqQueue.Enqueue(&t) select {
for { case <-ctx.Done():
select { log.Print("query timeout!")
case <-ctx.Done(): return &servicepb.QueryResult{
log.Print("query timeout!") Status: &commonpb.Status{
return &servicepb.QueryResult{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Status: &commonpb.Status{ Reason: "query timeout!",
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, },
Reason: "query timeout!", }, errors.New("query timeout!")
}, case result := <-qt.resultChan:
}, errors.New("query timeout!") return result, nil
case result := <-qt.resultChan:
return result, nil
}
} }
} }
...@@ -134,19 +125,16 @@ func (p *Proxy) DropCollection(ctx context.Context, req *servicepb.CollectionNam ...@@ -134,19 +125,16 @@ func (p *Proxy) DropCollection(ctx context.Context, req *servicepb.CollectionNam
dct.ctx, dct.cancel = context.WithCancel(ctx) dct.ctx, dct.cancel = context.WithCancel(ctx)
defer dct.cancel() defer dct.cancel()
var t task = dct p.taskSch.DdQueue.Enqueue(dct)
p.taskSch.DdQueue.Enqueue(&t) select {
for { case <-ctx.Done():
select { log.Print("create collection timeout!")
case <-ctx.Done(): return &commonpb.Status{
log.Print("create collection timeout!") ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
return &commonpb.Status{ Reason: "create collection timeout!",
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, }, errors.New("create collection timeout!")
Reason: "create collection timeout!", case result := <-dct.resultChan:
}, errors.New("create collection timeout!") return result, nil
case result := <-dct.resultChan:
return result, nil
}
} }
} }
...@@ -164,22 +152,19 @@ func (p *Proxy) HasCollection(ctx context.Context, req *servicepb.CollectionName ...@@ -164,22 +152,19 @@ func (p *Proxy) HasCollection(ctx context.Context, req *servicepb.CollectionName
hct.ctx, hct.cancel = context.WithCancel(ctx) hct.ctx, hct.cancel = context.WithCancel(ctx)
defer hct.cancel() defer hct.cancel()
var t task = hct p.taskSch.DqQueue.Enqueue(hct)
p.taskSch.DqQueue.Enqueue(&t) select {
for { case <-ctx.Done():
select { log.Print("has collection timeout!")
case <-ctx.Done(): return &servicepb.BoolResponse{
log.Print("has collection timeout!") Status: &commonpb.Status{
return &servicepb.BoolResponse{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Status: &commonpb.Status{ Reason: "has collection timeout!",
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, },
Reason: "has collection timeout!", Value: false,
}, }, errors.New("has collection timeout!")
Value: false, case result := <-hct.resultChan:
}, errors.New("has collection timeout!") return result, nil
case result := <-hct.resultChan:
return result, nil
}
} }
} }
...@@ -197,21 +182,18 @@ func (p *Proxy) DescribeCollection(ctx context.Context, req *servicepb.Collectio ...@@ -197,21 +182,18 @@ func (p *Proxy) DescribeCollection(ctx context.Context, req *servicepb.Collectio
dct.ctx, dct.cancel = context.WithCancel(ctx) dct.ctx, dct.cancel = context.WithCancel(ctx)
defer dct.cancel() defer dct.cancel()
var t task = dct p.taskSch.DqQueue.Enqueue(dct)
p.taskSch.DqQueue.Enqueue(&t) select {
for { case <-ctx.Done():
select { log.Print("has collection timeout!")
case <-ctx.Done(): return &servicepb.CollectionDescription{
log.Print("has collection timeout!") Status: &commonpb.Status{
return &servicepb.CollectionDescription{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Status: &commonpb.Status{ Reason: "describe collection timeout!",
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, },
Reason: "describe collection timeout!", }, errors.New("describe collection timeout!")
}, case result := <-dct.resultChan:
}, errors.New("describe collection timeout!") return result, nil
case result := <-dct.resultChan:
return result, nil
}
} }
} }
...@@ -228,21 +210,18 @@ func (p *Proxy) ShowCollections(ctx context.Context, req *commonpb.Empty) (*serv ...@@ -228,21 +210,18 @@ func (p *Proxy) ShowCollections(ctx context.Context, req *commonpb.Empty) (*serv
sct.ctx, sct.cancel = context.WithCancel(ctx) sct.ctx, sct.cancel = context.WithCancel(ctx)
defer sct.cancel() defer sct.cancel()
var t task = sct p.taskSch.DqQueue.Enqueue(sct)
p.taskSch.DqQueue.Enqueue(&t) select {
for { case <-ctx.Done():
select { log.Print("show collections timeout!")
case <-ctx.Done(): return &servicepb.StringListResponse{
log.Print("show collections timeout!") Status: &commonpb.Status{
return &servicepb.StringListResponse{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Status: &commonpb.Status{ Reason: "show collections timeout!",
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, },
Reason: "show collections timeout!", }, errors.New("show collections timeout!")
}, case result := <-sct.resultChan:
}, errors.New("show collections timeout!") return result, nil
case result := <-sct.resultChan:
return result, nil
}
} }
} }
......
...@@ -2,6 +2,7 @@ package proxy ...@@ -2,6 +2,7 @@ package proxy
import ( import (
"context" "context"
"google.golang.org/grpc"
"log" "log"
"math/rand" "math/rand"
"net" "net"
...@@ -14,7 +15,6 @@ import ( ...@@ -14,7 +15,6 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
"github.com/zilliztech/milvus-distributed/internal/proto/servicepb" "github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil" "github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"google.golang.org/grpc"
) )
type UniqueID = typeutil.UniqueID type UniqueID = typeutil.UniqueID
...@@ -157,7 +157,7 @@ func (p *Proxy) queryResultLoop() { ...@@ -157,7 +157,7 @@ func (p *Proxy) queryResultLoop() {
if len(queryResultBuf[reqId]) == 4 { if len(queryResultBuf[reqId]) == 4 {
// TODO: use the number of query node instead // TODO: use the number of query node instead
t := p.taskSch.getTaskByReqId(reqId) t := p.taskSch.getTaskByReqId(reqId)
qt := (*t).(*QueryTask) qt := t.(*QueryTask)
qt.resultBuf <- queryResultBuf[reqId] qt.resultBuf <- queryResultBuf[reqId]
delete(queryResultBuf, reqId) delete(queryResultBuf, reqId)
} }
......
...@@ -11,7 +11,7 @@ import ( ...@@ -11,7 +11,7 @@ import (
type BaseTaskQueue struct { type BaseTaskQueue struct {
unissuedTasks *list.List unissuedTasks *list.List
activeTasks map[Timestamp]*task activeTasks map[Timestamp]task
utLock sync.Mutex utLock sync.Mutex
atLock sync.Mutex atLock sync.Mutex
} }
...@@ -24,23 +24,23 @@ func (queue *BaseTaskQueue) Empty() bool { ...@@ -24,23 +24,23 @@ func (queue *BaseTaskQueue) Empty() bool {
return queue.unissuedTasks.Len() <= 0 && len(queue.activeTasks) <= 0 return queue.unissuedTasks.Len() <= 0 && len(queue.activeTasks) <= 0
} }
func (queue *BaseTaskQueue) AddUnissuedTask(t *task) { func (queue *BaseTaskQueue) AddUnissuedTask(t task) {
queue.utLock.Lock() queue.utLock.Lock()
defer queue.utLock.Unlock() defer queue.utLock.Unlock()
queue.unissuedTasks.PushBack(t) queue.unissuedTasks.PushBack(t)
} }
func (queue *BaseTaskQueue) FrontUnissuedTask() *task { func (queue *BaseTaskQueue) FrontUnissuedTask() task {
queue.utLock.Lock() queue.utLock.Lock()
defer queue.utLock.Unlock() defer queue.utLock.Unlock()
if queue.unissuedTasks.Len() <= 0 { if queue.unissuedTasks.Len() <= 0 {
log.Fatal("sorry, but the unissued task list is empty!") log.Fatal("sorry, but the unissued task list is empty!")
return nil return nil
} }
return queue.unissuedTasks.Front().Value.(*task) return queue.unissuedTasks.Front().Value.(task)
} }
func (queue *BaseTaskQueue) PopUnissuedTask() *task { func (queue *BaseTaskQueue) PopUnissuedTask() task {
queue.utLock.Lock() queue.utLock.Lock()
defer queue.utLock.Unlock() defer queue.utLock.Unlock()
if queue.unissuedTasks.Len() <= 0 { if queue.unissuedTasks.Len() <= 0 {
...@@ -48,13 +48,13 @@ func (queue *BaseTaskQueue) PopUnissuedTask() *task { ...@@ -48,13 +48,13 @@ func (queue *BaseTaskQueue) PopUnissuedTask() *task {
return nil return nil
} }
ft := queue.unissuedTasks.Front() ft := queue.unissuedTasks.Front()
return queue.unissuedTasks.Remove(ft).(*task) return queue.unissuedTasks.Remove(ft).(task)
} }
func (queue *BaseTaskQueue) AddActiveTask(t *task) { func (queue *BaseTaskQueue) AddActiveTask(t task) {
queue.atLock.Lock() queue.atLock.Lock()
defer queue.atLock.Lock() defer queue.atLock.Lock()
ts := (*t).EndTs() ts := t.EndTs()
_, ok := queue.activeTasks[ts] _, ok := queue.activeTasks[ts]
if ok { if ok {
log.Fatalf("task with timestamp %v already in active task list!", ts) log.Fatalf("task with timestamp %v already in active task list!", ts)
...@@ -62,7 +62,7 @@ func (queue *BaseTaskQueue) AddActiveTask(t *task) { ...@@ -62,7 +62,7 @@ func (queue *BaseTaskQueue) AddActiveTask(t *task) {
queue.activeTasks[ts] = t queue.activeTasks[ts] = t
} }
func (queue *BaseTaskQueue) PopActiveTask(ts Timestamp) *task { func (queue *BaseTaskQueue) PopActiveTask(ts Timestamp) task {
queue.atLock.Lock() queue.atLock.Lock()
defer queue.atLock.Lock() defer queue.atLock.Lock()
t, ok := queue.activeTasks[ts] t, ok := queue.activeTasks[ts]
...@@ -74,19 +74,19 @@ func (queue *BaseTaskQueue) PopActiveTask(ts Timestamp) *task { ...@@ -74,19 +74,19 @@ func (queue *BaseTaskQueue) PopActiveTask(ts Timestamp) *task {
return nil return nil
} }
func (queue *BaseTaskQueue) getTaskByReqId(reqId UniqueID) *task { func (queue *BaseTaskQueue) getTaskByReqId(reqId UniqueID) task {
queue.utLock.Lock() queue.utLock.Lock()
defer queue.utLock.Lock() defer queue.utLock.Lock()
for e := queue.unissuedTasks.Front(); e != nil; e = e.Next() { for e := queue.unissuedTasks.Front(); e != nil; e = e.Next() {
if (*(e.Value.(*task))).Id() == reqId { if e.Value.(task).Id() == reqId {
return e.Value.(*task) return e.Value.(task)
} }
} }
queue.atLock.Lock() queue.atLock.Lock()
defer queue.atLock.Unlock() defer queue.atLock.Unlock()
for ats := range queue.activeTasks { for ats := range queue.activeTasks {
if (*(queue.activeTasks[ats])).Id() == reqId { if queue.activeTasks[ats].Id() == reqId {
return queue.activeTasks[ats] return queue.activeTasks[ats]
} }
} }
...@@ -98,7 +98,7 @@ func (queue *BaseTaskQueue) TaskDoneTest(ts Timestamp) bool { ...@@ -98,7 +98,7 @@ func (queue *BaseTaskQueue) TaskDoneTest(ts Timestamp) bool {
queue.utLock.Lock() queue.utLock.Lock()
defer queue.utLock.Unlock() defer queue.utLock.Unlock()
for e := queue.unissuedTasks.Front(); e != nil; e = e.Next() { for e := queue.unissuedTasks.Front(); e != nil; e = e.Next() {
if (*(e.Value.(*task))).EndTs() >= ts { if e.Value.(task).EndTs() >= ts {
return false return false
} }
} }
...@@ -114,20 +114,20 @@ func (queue *BaseTaskQueue) TaskDoneTest(ts Timestamp) bool { ...@@ -114,20 +114,20 @@ func (queue *BaseTaskQueue) TaskDoneTest(ts Timestamp) bool {
return true return true
} }
type ddTaskQueue struct { type DdTaskQueue struct {
BaseTaskQueue BaseTaskQueue
lock sync.Mutex lock sync.Mutex
} }
type dmTaskQueue struct { type DmTaskQueue struct {
BaseTaskQueue BaseTaskQueue
} }
type dqTaskQueue struct { type DqTaskQueue struct {
BaseTaskQueue BaseTaskQueue
} }
func (queue *ddTaskQueue) Enqueue(t *task) error { func (queue *DdTaskQueue) Enqueue(t task) error {
queue.lock.Lock() queue.lock.Lock()
defer queue.lock.Unlock() defer queue.lock.Unlock()
// TODO: set Ts, ReqId, ProxyId // TODO: set Ts, ReqId, ProxyId
...@@ -135,22 +135,49 @@ func (queue *ddTaskQueue) Enqueue(t *task) error { ...@@ -135,22 +135,49 @@ func (queue *ddTaskQueue) Enqueue(t *task) error {
return nil return nil
} }
func (queue *dmTaskQueue) Enqueue(t *task) error { func (queue *DmTaskQueue) Enqueue(t task) error {
// TODO: set Ts, ReqId, ProxyId // TODO: set Ts, ReqId, ProxyId
queue.AddUnissuedTask(t) queue.AddUnissuedTask(t)
return nil return nil
} }
func (queue *dqTaskQueue) Enqueue(t *task) error { func (queue *DqTaskQueue) Enqueue(t task) error {
// TODO: set Ts, ReqId, ProxyId // TODO: set Ts, ReqId, ProxyId
queue.AddUnissuedTask(t) queue.AddUnissuedTask(t)
return nil return nil
} }
func NewDdTaskQueue() *DdTaskQueue {
return &DdTaskQueue{
BaseTaskQueue: BaseTaskQueue{
unissuedTasks: list.New(),
activeTasks: make(map[Timestamp]task),
},
}
}
func NewDmTaskQueue() *DmTaskQueue {
return &DmTaskQueue{
BaseTaskQueue: BaseTaskQueue{
unissuedTasks: list.New(),
activeTasks: make(map[Timestamp]task),
},
}
}
func NewDqTaskQueue() *DqTaskQueue {
return &DqTaskQueue{
BaseTaskQueue: BaseTaskQueue{
unissuedTasks: list.New(),
activeTasks: make(map[Timestamp]task),
},
}
}
type TaskScheduler struct { type TaskScheduler struct {
DdQueue *ddTaskQueue DdQueue *DdTaskQueue
DmQueue *dmTaskQueue DmQueue *DmTaskQueue
DqQueue *dqTaskQueue DqQueue *DqTaskQueue
idAllocator *allocator.IdAllocator idAllocator *allocator.IdAllocator
tsoAllocator *allocator.TimestampAllocator tsoAllocator *allocator.TimestampAllocator
...@@ -165,6 +192,9 @@ func NewTaskScheduler(ctx context.Context, ...@@ -165,6 +192,9 @@ func NewTaskScheduler(ctx context.Context,
tsoAllocator *allocator.TimestampAllocator) (*TaskScheduler, error) { tsoAllocator *allocator.TimestampAllocator) (*TaskScheduler, error) {
ctx1, cancel := context.WithCancel(ctx) ctx1, cancel := context.WithCancel(ctx)
s := &TaskScheduler{ s := &TaskScheduler{
DdQueue: NewDdTaskQueue(),
DmQueue: NewDmTaskQueue(),
DqQueue: NewDqTaskQueue(),
idAllocator: idAllocator, idAllocator: idAllocator,
tsoAllocator: tsoAllocator, tsoAllocator: tsoAllocator,
ctx: ctx1, ctx: ctx1,
...@@ -174,19 +204,19 @@ func NewTaskScheduler(ctx context.Context, ...@@ -174,19 +204,19 @@ func NewTaskScheduler(ctx context.Context,
return s, nil return s, nil
} }
func (sched *TaskScheduler) scheduleDdTask() *task { func (sched *TaskScheduler) scheduleDdTask() task {
return sched.DdQueue.PopUnissuedTask() return sched.DdQueue.PopUnissuedTask()
} }
func (sched *TaskScheduler) scheduleDmTask() *task { func (sched *TaskScheduler) scheduleDmTask() task {
return sched.DmQueue.PopUnissuedTask() return sched.DmQueue.PopUnissuedTask()
} }
func (sched *TaskScheduler) scheduleDqTask() *task { func (sched *TaskScheduler) scheduleDqTask() task {
return sched.DqQueue.PopUnissuedTask() return sched.DqQueue.PopUnissuedTask()
} }
func (sched *TaskScheduler) getTaskByReqId(reqId UniqueID) *task { func (sched *TaskScheduler) getTaskByReqId(reqId UniqueID) task {
if t := sched.DdQueue.getTaskByReqId(reqId); t != nil { if t := sched.DdQueue.getTaskByReqId(reqId); t != nil {
return t return t
} }
...@@ -211,22 +241,22 @@ func (sched *TaskScheduler) definitionLoop() { ...@@ -211,22 +241,22 @@ func (sched *TaskScheduler) definitionLoop() {
//sched.DdQueue.atLock.Lock() //sched.DdQueue.atLock.Lock()
t := sched.scheduleDdTask() t := sched.scheduleDdTask()
err := (*t).PreExecute() err := t.PreExecute()
if err != nil { if err != nil {
return return
} }
err = (*t).Execute() err = t.Execute()
if err != nil { if err != nil {
log.Printf("execute definition task failed, error = %v", err) log.Printf("execute definition task failed, error = %v", err)
} }
(*t).Notify(err) t.Notify(err)
sched.DdQueue.AddActiveTask(t) sched.DdQueue.AddActiveTask(t)
(*t).WaitToFinish() t.WaitToFinish()
(*t).PostExecute() t.PostExecute()
sched.DdQueue.PopActiveTask((*t).EndTs()) sched.DdQueue.PopActiveTask(t.EndTs())
} }
} }
...@@ -242,27 +272,27 @@ func (sched *TaskScheduler) manipulationLoop() { ...@@ -242,27 +272,27 @@ func (sched *TaskScheduler) manipulationLoop() {
sched.DmQueue.atLock.Lock() sched.DmQueue.atLock.Lock()
t := sched.scheduleDmTask() t := sched.scheduleDmTask()
if err := (*t).PreExecute(); err != nil { if err := t.PreExecute(); err != nil {
return return
} }
go func() { go func() {
err := (*t).Execute() err := t.Execute()
if err != nil { if err != nil {
log.Printf("execute manipulation task failed, error = %v", err) log.Printf("execute manipulation task failed, error = %v", err)
} }
(*t).Notify(err) t.Notify(err)
}() }()
sched.DmQueue.AddActiveTask(t) sched.DmQueue.AddActiveTask(t)
sched.DmQueue.atLock.Unlock() sched.DmQueue.atLock.Unlock()
go func() { go func() {
(*t).WaitToFinish() t.WaitToFinish()
(*t).PostExecute() t.PostExecute()
// remove from active list // remove from active list
sched.DmQueue.PopActiveTask((*t).EndTs()) sched.DmQueue.PopActiveTask(t.EndTs())
}() }()
} }
} }
...@@ -279,27 +309,27 @@ func (sched *TaskScheduler) queryLoop() { ...@@ -279,27 +309,27 @@ func (sched *TaskScheduler) queryLoop() {
sched.DqQueue.atLock.Lock() sched.DqQueue.atLock.Lock()
t := sched.scheduleDqTask() t := sched.scheduleDqTask()
if err := (*t).PreExecute(); err != nil { if err := t.PreExecute(); err != nil {
return return
} }
go func() { go func() {
err := (*t).Execute() err := t.Execute()
if err != nil { if err != nil {
log.Printf("execute query task failed, error = %v", err) log.Printf("execute query task failed, error = %v", err)
} }
(*t).Notify(err) t.Notify(err)
}() }()
sched.DqQueue.AddActiveTask(t) sched.DqQueue.AddActiveTask(t)
sched.DqQueue.atLock.Unlock() sched.DqQueue.atLock.Unlock()
go func() { go func() {
(*t).WaitToFinish() t.WaitToFinish()
(*t).PostExecute() t.PostExecute()
// remove from active list // remove from active list
sched.DqQueue.PopActiveTask((*t).EndTs()) sched.DqQueue.PopActiveTask(t.EndTs())
}() }()
} }
} }
......
...@@ -51,7 +51,6 @@ func newTimeTick(ctx context.Context, tsoAllocator *allocator.TimestampAllocator ...@@ -51,7 +51,6 @@ func newTimeTick(ctx context.Context, tsoAllocator *allocator.TimestampAllocator
return t return t
} }
func (tt *timeTick) tick() error { func (tt *timeTick) tick() error {
if tt.lastTick == tt.currentTick { if tt.lastTick == tt.currentTick {
......
...@@ -33,7 +33,7 @@ func TestTimeTick(t *testing.T) { ...@@ -33,7 +33,7 @@ func TestTimeTick(t *testing.T) {
tt := timeTick{ tt := timeTick{
interval: 200, interval: 200,
pulsarProducer: producer, pulsarProducer: producer,
peerID: 1, peerID: 1,
ctx: ctx, ctx: ctx,
areRequestsDelivered: func(ts Timestamp) bool { return true }, areRequestsDelivered: func(ts Timestamp) bool { return true },
} }
......
package tsoutil package tsoutil
import ( import (
"fmt"
"path"
"strconv"
"time" "time"
"github.com/zilliztech/milvus-distributed/internal/conf"
"github.com/zilliztech/milvus-distributed/internal/kv"
"go.etcd.io/etcd/clientv3"
) )
const ( const (
...@@ -20,3 +27,15 @@ func ParseTS(ts uint64) (time.Time, uint64) { ...@@ -20,3 +27,15 @@ func ParseTS(ts uint64) (time.Time, uint64) {
physicalTime := time.Unix(int64(physical/1000), int64(physical)%1000*time.Millisecond.Nanoseconds()) physicalTime := time.Unix(int64(physical/1000), int64(physical)%1000*time.Millisecond.Nanoseconds())
return physicalTime, logical return physicalTime, logical
} }
func NewTSOKVBase(subPath string) *kv.EtcdKV {
etcdAddr := conf.Config.Etcd.Address
etcdAddr += ":"
etcdAddr += strconv.FormatInt(int64(conf.Config.Etcd.Port), 10)
fmt.Println("etcdAddr ::: ", etcdAddr)
client, _ := clientv3.New(clientv3.Config{
Endpoints: []string{etcdAddr},
DialTimeout: 5 * time.Second,
})
return kv.NewEtcdKV(client, path.Join(conf.Config.Etcd.Rootpath, subPath))
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册