未验证 提交 6826964e 编写于 作者: X Xiaofan 提交者: GitHub

Rename some interface and files in id allocator (#19605)

Signed-off-by: Nxiaofan-luan <xiaofan.luan@zilliz.com>
Signed-off-by: Nxiaofan-luan <xiaofan.luan@zilliz.com>
上级 b2a8536c
......@@ -124,7 +124,7 @@ func (t *Ticker) Chan() <-chan time.Time {
}
// Allocator allocates from a global allocator by its given member functions
type Allocator struct {
type CachedAllocator struct {
Ctx context.Context
CancelFunc context.CancelFunc
......@@ -148,7 +148,7 @@ type Allocator struct {
}
// Start starts the loop of checking whether to synchronize with the global allocator.
func (ta *Allocator) Start() error {
func (ta *CachedAllocator) Start() error {
ta.TChan.Init()
ta.wg.Add(1)
go ta.mainLoop()
......@@ -156,12 +156,12 @@ func (ta *Allocator) Start() error {
}
// Init mainly initialize internal members.
func (ta *Allocator) Init() {
func (ta *CachedAllocator) Init() {
ta.ForceSyncChan = make(chan Request, maxConcurrentRequests)
ta.Reqs = make(chan Request, maxConcurrentRequests)
}
func (ta *Allocator) mainLoop() {
func (ta *CachedAllocator) mainLoop() {
defer ta.wg.Done()
loopCtx, loopCancel := context.WithCancel(ta.Ctx)
......@@ -207,14 +207,14 @@ func (ta *Allocator) mainLoop() {
}
}
func (ta *Allocator) pickCanDo() {
func (ta *CachedAllocator) pickCanDo() {
if ta.PickCanDoFunc == nil {
return
}
ta.PickCanDoFunc()
}
func (ta *Allocator) sync(timeout bool) bool {
func (ta *CachedAllocator) sync(timeout bool) bool {
if ta.SyncFunc == nil || ta.CheckSyncFunc == nil {
ta.CanDoReqs = ta.ToDoReqs
ta.ToDoReqs = nil
......@@ -236,7 +236,7 @@ func (ta *Allocator) sync(timeout bool) bool {
return ret
}
func (ta *Allocator) finishSyncRequest() {
func (ta *CachedAllocator) finishSyncRequest() {
for _, req := range ta.SyncReqs {
if req != nil {
req.Notify(nil)
......@@ -245,7 +245,7 @@ func (ta *Allocator) finishSyncRequest() {
ta.SyncReqs = nil
}
func (ta *Allocator) failRemainRequest() {
func (ta *CachedAllocator) failRemainRequest() {
var err error
if ta.SyncErr != nil {
err = fmt.Errorf("%s failRemainRequest err:%w", ta.Role, ta.SyncErr)
......@@ -266,7 +266,7 @@ func (ta *Allocator) failRemainRequest() {
ta.ToDoReqs = nil
}
func (ta *Allocator) finishRequest() {
func (ta *CachedAllocator) finishRequest() {
for _, req := range ta.CanDoReqs {
if req != nil {
err := ta.ProcessFunc(req)
......@@ -276,7 +276,7 @@ func (ta *Allocator) finishRequest() {
ta.CanDoReqs = []Request{}
}
func (ta *Allocator) revokeRequest(err error) {
func (ta *CachedAllocator) revokeRequest(err error) {
n := len(ta.Reqs)
for i := 0; i < n; i++ {
req := <-ta.Reqs
......@@ -285,7 +285,7 @@ func (ta *Allocator) revokeRequest(err error) {
}
// Close mainly stop the internal coroutine and recover resources.
func (ta *Allocator) Close() {
func (ta *CachedAllocator) Close() {
ta.CancelFunc()
ta.wg.Wait()
ta.TChan.Close()
......@@ -294,7 +294,7 @@ func (ta *Allocator) Close() {
}
// CleanCache is used to force synchronize with global allocator.
func (ta *Allocator) CleanCache() {
func (ta *CachedAllocator) CleanCache() {
req := &SyncRequest{
BaseRequest: BaseRequest{
Done: make(chan error),
......
......@@ -22,16 +22,6 @@ import (
"github.com/milvus-io/milvus/internal/util/typeutil"
)
// GIDAllocator is an interface for GlobalIDAllocator.
// Alloc allocates the id of the count number.
// AllocOne allocates one id.
// UpdateID update timestamp of allocator.
type GIDAllocator interface {
Alloc(count uint32) (UniqueID, UniqueID, error)
AllocOne() (UniqueID, error)
UpdateID() error
}
// GlobalIDAllocator is the global single point TSO allocator.
type GlobalIDAllocator struct {
allocator tso.Allocator
......@@ -73,8 +63,3 @@ func (gia *GlobalIDAllocator) AllocOne() (typeutil.UniqueID, error) {
idStart := typeutil.UniqueID(timestamp)
return idStart, nil
}
// UpdateID updates timestamp of allocator.
func (gia *GlobalIDAllocator) UpdateID() error {
return gia.allocator.UpdateTSO()
}
......@@ -33,16 +33,12 @@ const (
// UniqueID is alias of typeutil.UniqueID
type UniqueID = typeutil.UniqueID
type idAllocatorInterface interface {
AllocID(ctx context.Context, req *rootcoordpb.AllocIDRequest) (*rootcoordpb.AllocIDResponse, error)
}
// IDAllocator allocates Unique and monotonically increasing IDs from Root Coord.
// It could also batch allocate for less root coord server access
type IDAllocator struct {
Allocator
CachedAllocator
idAllocator idAllocatorInterface
remoteAllocator remoteInterface
countPerRPC uint32
......@@ -53,30 +49,30 @@ type IDAllocator struct {
}
// NewIDAllocator creates an ID Allocator allocate Unique and monotonically increasing IDs from RootCoord.
func NewIDAllocator(ctx context.Context, idAllocator idAllocatorInterface, peerID UniqueID) (*IDAllocator, error) {
func NewIDAllocator(ctx context.Context, remoteAllocator remoteInterface, peerID UniqueID) (*IDAllocator, error) {
ctx1, cancel := context.WithCancel(ctx)
a := &IDAllocator{
Allocator: Allocator{
CachedAllocator: CachedAllocator{
Ctx: ctx1,
CancelFunc: cancel,
Role: "IDAllocator",
},
countPerRPC: idCountPerRPC,
idAllocator: idAllocator,
PeerID: peerID,
countPerRPC: idCountPerRPC,
remoteAllocator: remoteAllocator,
PeerID: peerID,
}
a.TChan = &EmptyTicker{}
a.Allocator.SyncFunc = a.syncID
a.Allocator.ProcessFunc = a.processFunc
a.Allocator.CheckSyncFunc = a.checkSyncFunc
a.Allocator.PickCanDoFunc = a.pickCanDoFunc
a.CachedAllocator.SyncFunc = a.syncID
a.CachedAllocator.ProcessFunc = a.processFunc
a.CachedAllocator.CheckSyncFunc = a.checkSyncFunc
a.CachedAllocator.PickCanDoFunc = a.pickCanDoFunc
a.Init()
return a, nil
}
// Start creates some working goroutines of IDAllocator.
func (ia *IDAllocator) Start() error {
return ia.Allocator.Start()
return ia.CachedAllocator.Start()
}
func (ia *IDAllocator) gatherReqIDCount() uint32 {
......@@ -105,7 +101,7 @@ func (ia *IDAllocator) syncID() (bool, error) {
},
Count: need,
}
resp, err := ia.idAllocator.AllocID(ctx, req)
resp, err := ia.remoteAllocator.AllocID(ctx, req)
cancel()
if err != nil {
......
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package allocator
// Allocator interface is an interface for alloc id.
// Alloc allocates the id of the count number.
// AllocOne allocates one id.
// See GlobalIDAllocator for implementation details
type Interface interface {
Alloc(count uint32) (UniqueID, UniqueID, error)
AllocOne() (UniqueID, error)
}
package allocator
type MockGIDAllocator struct {
GIDAllocator
Interface
AllocF func(count uint32) (UniqueID, UniqueID, error)
AllocOneF func() (UniqueID, error)
UpdateIDF func() error
......
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package allocator
import (
"context"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
)
type remoteInterface interface {
AllocID(ctx context.Context, req *rootcoordpb.AllocIDRequest) (*rootcoordpb.AllocIDResponse, error)
}
......@@ -120,10 +120,10 @@ type DataNode struct {
rootCoord types.RootCoord
dataCoord types.DataCoord
session *sessionutil.Session
watchKv kv.MetaKv
chunkManager storage.ChunkManager
idAllocator *allocator2.IDAllocator
session *sessionutil.Session
watchKv kv.MetaKv
chunkManager storage.ChunkManager
rowIDAllocator *allocator2.IDAllocator
closer io.Closer
......@@ -247,7 +247,7 @@ func (node *DataNode) Init() error {
zap.String("role", typeutil.DataNodeRole), zap.Int64("DataNodeID", Params.DataNodeCfg.GetNodeID()))
return err
}
node.idAllocator = idAllocator
node.rowIDAllocator = idAllocator
node.factory.Init(&Params)
log.Info("DataNode Init successfully",
......@@ -465,7 +465,7 @@ func (node *DataNode) BackGroundGC(vChannelCh <-chan string) {
// Start will update DataNode state to HEALTHY
func (node *DataNode) Start() error {
if err := node.idAllocator.Start(); err != nil {
if err := node.rowIDAllocator.Start(); err != nil {
log.Error("failed to start id allocator", zap.Error(err), zap.String("role", typeutil.DataNodeRole))
return err
}
......@@ -684,9 +684,9 @@ func (node *DataNode) Stop() error {
node.cancel()
node.flowgraphManager.dropAll()
if node.idAllocator != nil {
if node.rowIDAllocator != nil {
log.Info("close id allocator", zap.String("role", typeutil.DataNodeRole))
node.idAllocator.Close()
node.rowIDAllocator.Close()
}
if node.closer != nil {
......@@ -1042,7 +1042,7 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
// parse files and generate segments
segmentSize := int64(Params.DataCoordCfg.SegmentMaxSize) * 1024 * 1024
importWrapper := importutil.NewImportWrapper(ctx, colInfo.GetSchema(), colInfo.GetShardsNum(), segmentSize, node.idAllocator, node.chunkManager,
importWrapper := importutil.NewImportWrapper(ctx, colInfo.GetSchema(), colInfo.GetShardsNum(), segmentSize, node.rowIDAllocator, node.chunkManager,
importFlushReqFunc(node, req, importResult, colInfo.GetSchema(), ts), importResult, reportFunc)
err = importWrapper.Import(req.GetImportTask().GetFiles(), req.GetImportTask().GetRowBased(), false)
if err != nil {
......
......@@ -40,7 +40,7 @@ var once sync.Once
var params paramtable.BaseTable
// InitRmq is deprecate implementation of global rocksmq. will be removed later
func InitRmq(rocksdbName string, idAllocator allocator.GIDAllocator) error {
func InitRmq(rocksdbName string, idAllocator allocator.Interface) error {
var err error
params.Init()
Rmq, err = NewRocksMQ(params, rocksdbName, idAllocator)
......
......@@ -118,7 +118,7 @@ var topicMu = sync.Map{}
type rocksmq struct {
store *gorocksdb.DB
kv kv.BaseKV
idAllocator allocator.GIDAllocator
idAllocator allocator.Interface
storeMu *sync.Mutex
consumers sync.Map
consumersID sync.Map
......@@ -132,7 +132,7 @@ type rocksmq struct {
// 1. New rocksmq instance based on rocksdb with name and rocksdbkv with kvname
// 2. Init retention info, load retention info to memory
// 3. Start retention goroutine
func NewRocksMQ(params paramtable.BaseTable, name string, idAllocator allocator.GIDAllocator) (*rocksmq, error) {
func NewRocksMQ(params paramtable.BaseTable, name string, idAllocator allocator.Interface) (*rocksmq, error) {
// TODO we should use same rocksdb instance with different cfs
maxProcs := runtime.GOMAXPROCS(0)
parallelism := 1
......@@ -191,7 +191,7 @@ func NewRocksMQ(params paramtable.BaseTable, name string, idAllocator allocator.
return nil, err
}
var mqIDAllocator allocator.GIDAllocator
var mqIDAllocator allocator.Interface
// if user didn't specify id allocator, init one with kv
if idAllocator == nil {
allocator := allocator.NewGlobalIDAllocator("rmq_id", kv)
......
......@@ -107,7 +107,6 @@ func TestRocksmq_RegisterConsumer(t *testing.T) {
pMsgA := ProducerMessage{Payload: []byte(msgA)}
pMsgs[0] = pMsgA
_ = idAllocator.UpdateID()
_, err = rmq.Produce(topicName, pMsgs)
assert.Nil(t, err)
......@@ -155,7 +154,6 @@ func TestRocksmq_Basic(t *testing.T) {
pMsgA := ProducerMessage{Payload: []byte(msgA)}
pMsgs[0] = pMsgA
_ = idAllocator.UpdateID()
_, err = rmq.Produce(channelName, pMsgs)
assert.Nil(t, err)
......@@ -164,7 +162,6 @@ func TestRocksmq_Basic(t *testing.T) {
pMsgs[0] = pMsgB
pMsgs = append(pMsgs, pMsgC)
_ = idAllocator.UpdateID()
_, err = rmq.Produce(channelName, pMsgs)
assert.Nil(t, err)
......@@ -501,7 +498,6 @@ func TestRocksmq_Throughout(t *testing.T) {
for i := 0; i < entityNum; i++ {
msg := "message_" + strconv.Itoa(i)
pMsg := ProducerMessage{Payload: []byte(msg)}
assert.Nil(t, idAllocator.UpdateID())
ids, err := rmq.Produce(channelName, []ProducerMessage{pMsg})
assert.Nil(t, err)
assert.EqualValues(t, 1, len(ids))
......
......@@ -1701,22 +1701,6 @@ func (node *Proxy) ShowPartitions(ctx context.Context, request *milvuspb.ShowPar
return spt.result, nil
}
func (node *Proxy) getMsgBase() (*commonpb.MsgBase, error) {
msgID, err := node.idAllocator.AllocOne()
if err != nil {
return nil, err
}
timestamp, err := node.tsoAllocator.AllocOne()
if err != nil {
return nil, err
}
return &commonpb.MsgBase{
MsgID: msgID,
Timestamp: timestamp,
SourceID: Params.ProxyCfg.GetNodeID(),
}, nil
}
func (node *Proxy) getCollectionProgress(ctx context.Context, request *milvuspb.GetLoadingProgressRequest, collectionID int64) (int64, error) {
resp, err := node.queryCoord.ShowCollections(ctx, &querypb.ShowCollectionsRequest{
Base: &commonpb.MsgBase{
......@@ -1803,9 +1787,11 @@ func (node *Proxy) GetLoadingProgress(ctx context.Context, request *milvuspb.Get
if err != nil {
return getErrResponse(err), nil
}
msgBase, err := node.getMsgBase()
if err != nil {
return getErrResponse(err), nil
msgBase := &commonpb.MsgBase{
MsgType: commonpb.MsgType_SystemInfo,
MsgID: 0,
Timestamp: 0,
SourceID: Params.ProxyCfg.GetNodeID(),
}
if request.Base == nil {
request.Base = msgBase
......@@ -2446,7 +2432,7 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
// RowData: transfer column based request to this
},
},
idAllocator: node.idAllocator,
idAllocator: node.rowIDAllocator,
segIDAssigner: node.segAssigner,
chMgr: node.chMgr,
chTicker: node.chTicker,
......@@ -3656,15 +3642,9 @@ func (node *Proxy) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsReque
log.Debug("Proxy.GetMetrics",
zap.String("metric_type", metricType))
msgID := UniqueID(0)
msgID, err = node.idAllocator.AllocOne()
if err != nil {
log.Warn("Proxy.GetMetrics failed to allocate id",
zap.Error(err))
}
req.Base = &commonpb.MsgBase{
MsgType: commonpb.MsgType_SystemInfo,
MsgID: msgID,
MsgID: 0,
Timestamp: 0,
SourceID: Params.ProxyCfg.GetNodeID(),
}
......@@ -3744,16 +3724,11 @@ func (node *Proxy) GetProxyMetrics(ctx context.Context, req *milvuspb.GetMetrics
log.Debug("Proxy.GetProxyMetrics",
zap.String("metric_type", metricType))
msgID := UniqueID(0)
msgID, err = node.idAllocator.AllocOne()
if err != nil {
log.Warn("Proxy.GetProxyMetrics failed to allocate id",
zap.Error(err))
}
req.Base = &commonpb.MsgBase{
MsgType: commonpb.MsgType_SystemInfo,
MsgID: msgID,
SourceID: Params.ProxyCfg.GetNodeID(),
MsgType: commonpb.MsgType_SystemInfo,
MsgID: 0,
Timestamp: 0,
SourceID: Params.ProxyCfg.GetNodeID(),
}
if metricType == metricsinfo.SystemInfoMetrics {
......
......@@ -28,17 +28,7 @@ type tsoAllocator interface {
AllocOne() (Timestamp, error)
}
// use interface idAllocatorInterface to keep other components testable
// include: baseTaskQueue, taskScheduler
type idAllocatorInterface interface {
AllocOne() (UniqueID, error)
}
// use timestampAllocatorInterface to keep other components testable
type timestampAllocatorInterface interface {
AllocTimestamp(ctx context.Context, req *rootcoordpb.AllocTimestampRequest) (*rootcoordpb.AllocTimestampResponse, error)
}
type getChannelsService interface {
GetChannels(collectionID UniqueID) (map[vChan]pChan, error)
}
......@@ -22,14 +22,14 @@ import (
"sync"
"time"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/api/commonpb"
"github.com/milvus-io/milvus/api/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/uniquegenerator"
)
......@@ -82,7 +82,11 @@ func (m *mockIDAllocatorInterface) AllocOne() (UniqueID, error) {
return UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()), nil
}
func newMockIDAllocatorInterface() idAllocatorInterface {
func (m *mockIDAllocatorInterface) Alloc(count uint32) (UniqueID, UniqueID, error) {
return UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()), UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt() + int(count)), nil
}
func newMockIDAllocatorInterface() allocator.Interface {
return &mockIDAllocatorInterface{}
}
......
......@@ -90,9 +90,9 @@ type Proxy struct {
chTicker channelsTimeTicker
idAllocator *allocator.IDAllocator
tsoAllocator *timestampAllocator
segAssigner *segIDAssigner
rowIDAllocator *allocator.IDAllocator
tsoAllocator *timestampAllocator
segAssigner *segIDAssigner
metricsCacheManager *metricsinfo.MetricsCacheManager
......@@ -198,7 +198,7 @@ func (node *Proxy) Init() error {
zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", Params.ProxyCfg.GetNodeID()))
return err
}
node.idAllocator = idAllocator
node.rowIDAllocator = idAllocator
log.Debug("create id allocator done", zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", Params.ProxyCfg.GetNodeID()))
log.Debug("create timestamp allocator", zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", Params.ProxyCfg.GetNodeID()))
......@@ -231,7 +231,7 @@ func (node *Proxy) Init() error {
log.Debug("create channels manager done", zap.String("role", typeutil.ProxyRole))
log.Debug("create task scheduler", zap.String("role", typeutil.ProxyRole))
node.sched, err = newTaskScheduler(node.ctx, node.idAllocator, node.tsoAllocator, node.factory)
node.sched, err = newTaskScheduler(node.ctx, node.tsoAllocator, node.factory)
if err != nil {
log.Warn("failed to create task scheduler", zap.Error(err), zap.String("role", typeutil.ProxyRole))
return err
......@@ -336,7 +336,7 @@ func (node *Proxy) Start() error {
log.Debug("start task scheduler done", zap.String("role", typeutil.ProxyRole))
log.Debug("start id allocator", zap.String("role", typeutil.ProxyRole))
if err := node.idAllocator.Start(); err != nil {
if err := node.rowIDAllocator.Start(); err != nil {
log.Warn("failed to start id allocator", zap.Error(err), zap.String("role", typeutil.ProxyRole))
return err
}
......@@ -377,8 +377,8 @@ func (node *Proxy) Start() error {
func (node *Proxy) Stop() error {
node.cancel()
if node.idAllocator != nil {
node.idAllocator.Close()
if node.rowIDAllocator != nil {
node.rowIDAllocator.Close()
log.Info("close id allocator", zap.String("role", typeutil.ProxyRole))
}
......
......@@ -36,9 +36,6 @@ const (
segCountPerRPC = 20000
)
// Allocator is an alias for the allocator.Allocator type
type Allocator = allocator.Allocator
// DataCoord is a narrowed interface of DataCoordinator which only provide AssignSegmentID method
type DataCoord interface {
AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error)
......@@ -140,7 +137,7 @@ func (info *assignInfo) Assign(ts Timestamp, count uint32) (map[UniqueID]uint32,
}
type segIDAssigner struct {
Allocator
allocator.CachedAllocator
assignInfos map[UniqueID]*list.List // collectionID -> *list.List
segReqs []*datapb.SegmentIDRequest
getTickFunc func() Timestamp
......@@ -154,7 +151,7 @@ type segIDAssigner struct {
func newSegIDAssigner(ctx context.Context, dataCoord DataCoord, getTickFunc func() Timestamp) (*segIDAssigner, error) {
ctx1, cancel := context.WithCancel(ctx)
sa := &segIDAssigner{
Allocator: Allocator{
CachedAllocator: allocator.CachedAllocator{
Ctx: ctx1,
CancelFunc: cancel,
Role: "SegmentIDAllocator",
......@@ -167,10 +164,10 @@ func newSegIDAssigner(ctx context.Context, dataCoord DataCoord, getTickFunc func
sa.TChan = &allocator.Ticker{
UpdateInterval: time.Second,
}
sa.Allocator.SyncFunc = sa.syncSegments
sa.Allocator.ProcessFunc = sa.processFunc
sa.Allocator.CheckSyncFunc = sa.checkSyncFunc
sa.Allocator.PickCanDoFunc = sa.pickCanDoFunc
sa.CachedAllocator.SyncFunc = sa.syncSegments
sa.CachedAllocator.ProcessFunc = sa.processFunc
sa.CachedAllocator.CheckSyncFunc = sa.checkSyncFunc
sa.CachedAllocator.PickCanDoFunc = sa.pickCanDoFunc
sa.Init()
return sa, nil
}
......
......@@ -64,7 +64,6 @@ type baseTaskQueue struct {
utBufChan chan int // to block scheduler
tsoAllocatorIns tsoAllocator
idAllocatorIns idAllocatorInterface
}
func (queue *baseTaskQueue) utChan() <-chan int {
......@@ -176,11 +175,8 @@ func (queue *baseTaskQueue) Enqueue(t task) error {
}
t.SetTs(ts)
reqID, err := queue.idAllocatorIns.AllocOne()
if err != nil {
return err
}
t.SetID(reqID)
// we always use same msg id and ts for now.
t.SetID(UniqueID(ts))
return queue.addUnissuedTask(t)
}
......@@ -199,7 +195,7 @@ func (queue *baseTaskQueue) getMaxTaskNum() int64 {
return queue.maxTaskNum
}
func newBaseTaskQueue(tsoAllocatorIns tsoAllocator, idAllocatorIns idAllocatorInterface) *baseTaskQueue {
func newBaseTaskQueue(tsoAllocatorIns tsoAllocator) *baseTaskQueue {
return &baseTaskQueue{
unissuedTasks: list.New(),
activeTasks: make(map[UniqueID]task),
......@@ -208,7 +204,6 @@ func newBaseTaskQueue(tsoAllocatorIns tsoAllocator, idAllocatorIns idAllocatorIn
maxTaskNum: Params.ProxyCfg.MaxTaskNum,
utBufChan: make(chan int, Params.ProxyCfg.MaxTaskNum),
tsoAllocatorIns: tsoAllocatorIns,
idAllocatorIns: idAllocatorIns,
}
}
......@@ -349,22 +344,22 @@ func (queue *ddTaskQueue) Enqueue(t task) error {
return queue.baseTaskQueue.Enqueue(t)
}
func newDdTaskQueue(tsoAllocatorIns tsoAllocator, idAllocatorIns idAllocatorInterface) *ddTaskQueue {
func newDdTaskQueue(tsoAllocatorIns tsoAllocator) *ddTaskQueue {
return &ddTaskQueue{
baseTaskQueue: newBaseTaskQueue(tsoAllocatorIns, idAllocatorIns),
baseTaskQueue: newBaseTaskQueue(tsoAllocatorIns),
}
}
func newDmTaskQueue(tsoAllocatorIns tsoAllocator, idAllocatorIns idAllocatorInterface) *dmTaskQueue {
func newDmTaskQueue(tsoAllocatorIns tsoAllocator) *dmTaskQueue {
return &dmTaskQueue{
baseTaskQueue: newBaseTaskQueue(tsoAllocatorIns, idAllocatorIns),
baseTaskQueue: newBaseTaskQueue(tsoAllocatorIns),
pChanStatisticsInfos: make(map[pChan]*pChanStatInfo),
}
}
func newDqTaskQueue(tsoAllocatorIns tsoAllocator, idAllocatorIns idAllocatorInterface) *dqTaskQueue {
func newDqTaskQueue(tsoAllocatorIns tsoAllocator) *dqTaskQueue {
return &dqTaskQueue{
baseTaskQueue: newBaseTaskQueue(tsoAllocatorIns, idAllocatorIns),
baseTaskQueue: newBaseTaskQueue(tsoAllocatorIns),
}
}
......@@ -384,7 +379,6 @@ type taskScheduler struct {
type schedOpt func(*taskScheduler)
func newTaskScheduler(ctx context.Context,
idAllocatorIns idAllocatorInterface,
tsoAllocatorIns tsoAllocator,
factory msgstream.Factory,
opts ...schedOpt,
......@@ -395,9 +389,9 @@ func newTaskScheduler(ctx context.Context,
cancel: cancel,
msFactory: factory,
}
s.ddQueue = newDdTaskQueue(tsoAllocatorIns, idAllocatorIns)
s.dmQueue = newDmTaskQueue(tsoAllocatorIns, idAllocatorIns)
s.dqQueue = newDqTaskQueue(tsoAllocatorIns, idAllocatorIns)
s.ddQueue = newDdTaskQueue(tsoAllocatorIns)
s.dmQueue = newDmTaskQueue(tsoAllocatorIns)
s.dqQueue = newDqTaskQueue(tsoAllocatorIns)
for _, opt := range opts {
opt(s)
......
......@@ -33,8 +33,7 @@ func TestBaseTaskQueue(t *testing.T) {
var activeTask task
tsoAllocatorIns := newMockTsoAllocator()
idAllocatorIns := newMockIDAllocatorInterface()
queue := newBaseTaskQueue(tsoAllocatorIns, idAllocatorIns)
queue := newBaseTaskQueue(tsoAllocatorIns)
assert.NotNil(t, queue)
assert.True(t, queue.utEmpty())
......@@ -112,8 +111,7 @@ func TestDdTaskQueue(t *testing.T) {
var activeTask task
tsoAllocatorIns := newMockTsoAllocator()
idAllocatorIns := newMockIDAllocatorInterface()
queue := newDdTaskQueue(tsoAllocatorIns, idAllocatorIns)
queue := newDdTaskQueue(tsoAllocatorIns)
assert.NotNil(t, queue)
assert.True(t, queue.utEmpty())
......@@ -192,8 +190,7 @@ func TestDmTaskQueue_Basic(t *testing.T) {
var activeTask task
tsoAllocatorIns := newMockTsoAllocator()
idAllocatorIns := newMockIDAllocatorInterface()
queue := newDmTaskQueue(tsoAllocatorIns, idAllocatorIns)
queue := newDmTaskQueue(tsoAllocatorIns)
assert.NotNil(t, queue)
assert.True(t, queue.utEmpty())
......@@ -271,8 +268,7 @@ func TestDmTaskQueue_TimestampStatistics(t *testing.T) {
var unissuedTask task
tsoAllocatorIns := newMockTsoAllocator()
idAllocatorIns := newMockIDAllocatorInterface()
queue := newDmTaskQueue(tsoAllocatorIns, idAllocatorIns)
queue := newDmTaskQueue(tsoAllocatorIns)
assert.NotNil(t, queue)
st := newDefaultMockDmlTask()
......@@ -312,8 +308,7 @@ func TestDqTaskQueue(t *testing.T) {
var activeTask task
tsoAllocatorIns := newMockTsoAllocator()
idAllocatorIns := newMockIDAllocatorInterface()
queue := newDqTaskQueue(tsoAllocatorIns, idAllocatorIns)
queue := newDqTaskQueue(tsoAllocatorIns)
assert.NotNil(t, queue)
assert.True(t, queue.utEmpty())
......@@ -390,10 +385,9 @@ func TestTaskScheduler(t *testing.T) {
ctx := context.Background()
tsoAllocatorIns := newMockTsoAllocator()
idAllocatorIns := newMockIDAllocatorInterface()
factory := newSimpleMockMsgStreamFactory()
sched, err := newTaskScheduler(ctx, idAllocatorIns, tsoAllocatorIns, factory)
sched, err := newTaskScheduler(ctx, tsoAllocatorIns, factory)
assert.NoError(t, err)
assert.NotNil(t, sched)
......
......@@ -71,9 +71,9 @@ func (ta *timestampAllocator) alloc(count uint32) ([]Timestamp, error) {
return nil, fmt.Errorf("syncTimeStamp Failed:%s", resp.Status.Reason)
}
start, cnt := resp.Timestamp, resp.Count
var ret []Timestamp
ret := make([]Timestamp, cnt)
for i := uint32(0); i < cnt; i++ {
ret = append(ret, start+uint64(i))
ret[i] = start + uint64(i)
}
return ret, nil
......
......@@ -361,7 +361,7 @@ func withInvalidMeta() Opt {
return withMeta(meta)
}
func withIDAllocator(idAllocator allocator.GIDAllocator) Opt {
func withIDAllocator(idAllocator allocator.Interface) Opt {
return func(c *Core) {
c.idAllocator = idAllocator
}
......
......@@ -110,7 +110,7 @@ type Core struct {
chanTimeTick *timetickSync
idAllocator allocator.GIDAllocator
idAllocator allocator.Interface
tsoAllocator tso.Allocator
dataCoord types.DataCoord
......
......@@ -20,7 +20,7 @@ type scheduler struct {
cancel context.CancelFunc
wg sync.WaitGroup
idAllocator allocator.GIDAllocator
idAllocator allocator.Interface
tsoAllocator tso.Allocator
taskChan chan task
......@@ -28,7 +28,7 @@ type scheduler struct {
lock sync.Mutex
}
func newScheduler(ctx context.Context, idAllocator allocator.GIDAllocator, tsoAllocator tso.Allocator) *scheduler {
func newScheduler(ctx context.Context, idAllocator allocator.Interface, tsoAllocator tso.Allocator) *scheduler {
ctx1, cancel := context.WithCancel(ctx)
// TODO
n := 1024 * 10
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册