提交 1ba8e244 编写于 作者: Z zhenshan.cao 提交者: yefu.chen

Add tsLoop for indexservice

Signed-off-by: Nzhenshan.cao <zhenshan.cao@zilliz.com>
上级 a64c831e
......@@ -128,6 +128,9 @@ func (i *ServiceImpl) Init() error {
}
func (i *ServiceImpl) Start() error {
i.loopWg.Add(1)
go i.tsLoop()
i.sched.Start()
// Start callbacks
for _, cb := range i.startCallbacks {
......@@ -276,3 +279,24 @@ func (i *ServiceImpl) NotifyBuildIndex(nty *indexpb.BuildIndexNotification) (*co
i.nodeClients.IncPriority(nty.NodeID, -1)
return ret, nil
}
func (i *ServiceImpl) tsLoop() {
tsoTicker := time.NewTicker(UpdateTimestampStep)
defer tsoTicker.Stop()
ctx, cancel := context.WithCancel(i.loopCtx)
defer cancel()
defer i.loopWg.Done()
for {
select {
case <-tsoTicker.C:
if err := i.idAllocator.UpdateID(); err != nil {
log.Println("failed to update id", err)
return
}
case <-ctx.Done():
// Server is closed and it should return nil.
log.Println("tsLoop is closed")
return
}
}
}
......@@ -513,7 +513,7 @@ func (colReplica *collectionReplicaImpl) replaceGrowingSegmentBySealedSegment(se
return errors.New("unexpected segment type")
}
targetSegment, err := colReplica.getSegmentByIDPrivate(segment.ID())
if err == nil && targetSegment != nil {
if err != nil && targetSegment != nil {
if targetSegment.segmentType != segTypeGrowing {
// target segment has been a sealed segment
return nil
......@@ -521,7 +521,7 @@ func (colReplica *collectionReplicaImpl) replaceGrowingSegmentBySealedSegment(se
deleteSegment(targetSegment)
}
colReplica.segments[segment.ID()] = segment
targetSegment = segment
return nil
}
......
......@@ -242,36 +242,5 @@ func TestCollectionReplica_freeAll(t *testing.T) {
err := node.Stop()
assert.NoError(t, err)
}
func TestReplaceGrowingSegmentBySealedSegment(t *testing.T) {
node := newQueryNodeMock()
collectionID := UniqueID(0)
segmentID := UniqueID(520)
initTestMeta(t, node, collectionID, segmentID)
_, _, segIDs := node.replica.getSealedSegmentsBySegmentType(segTypeGrowing)
assert.Equal(t, len(segIDs), 1)
collection, err := node.replica.getCollectionByID(collectionID)
assert.NoError(t, err)
ns := newSegment(collection, segmentID, defaultPartitionID, collectionID, segTypeSealed)
err = node.replica.replaceGrowingSegmentBySealedSegment(ns)
assert.NoError(t, err)
segmentNums := node.replica.getSegmentNum()
assert.Equal(t, segmentNums, 1)
segment, err := node.replica.getSegmentByID(segmentID)
assert.NoError(t, err)
assert.Equal(t, segment.getType(), segTypeSealed)
_, _, segIDs = node.replica.getSealedSegmentsBySegmentType(segTypeGrowing)
assert.Equal(t, len(segIDs), 0)
_, _, segIDs = node.replica.getSealedSegmentsBySegmentType(segTypeSealed)
assert.Equal(t, len(segIDs), 1)
err = node.Stop()
assert.NoError(t, err)
}
......@@ -114,11 +114,6 @@ func (s *loadService) loadSegmentInternal(collectionID UniqueID, partitionID Uni
if err != nil {
return err
}
// replace segment
err = s.segLoader.replica.replaceGrowingSegmentBySealedSegment(segment)
if err != nil {
return err
}
if errIndex == nil {
fmt.Println("loading index...")
indexPaths, err := s.segLoader.indexLoader.getIndexPaths(buildID)
......@@ -130,7 +125,8 @@ func (s *loadService) loadSegmentInternal(collectionID UniqueID, partitionID Uni
return err
}
}
return nil
// replace segment
return s.segLoader.replica.replaceGrowingSegmentBySealedSegment(segment)
}
func newLoadService(ctx context.Context, masterClient MasterServiceInterface, dataClient DataServiceInterface, indexClient IndexServiceInterface, replica collectionReplica, dmStream msgstream.MsgStream) *loadService {
......
......@@ -12,7 +12,6 @@ package querynode
*/
import "C"
import (
"log"
"strconv"
"sync"
"unsafe"
......@@ -24,13 +23,13 @@ import (
)
const (
segTypeInvalid = 0
segTypeGrowing = 1
segTypeSealed = 2
segTypeIndexing = 3
segTypeInvalid = C.Invalid
segTypeGrowing = C.Growing
segTypeSealed = C.Sealed
segTypeIndexing = C.Indexing
)
type segmentType = int
type segmentType = C.SegmentType
type indexParam = map[string]string
type Segment struct {
......@@ -46,7 +45,7 @@ type Segment struct {
recentlyModified bool
typeMu sync.Mutex // guards builtIndex
segmentType int
segmentType C.SegmentType
paramMutex sync.RWMutex // guards index
indexParam map[int64]indexParam
......@@ -113,20 +112,7 @@ func newSegment(collection *Collection, segmentID int64, partitionID UniqueID, c
NewSegment(CCollection collection, uint64_t segment_id, SegmentType seg_type);
*/
initIndexParam := make(map[int64]indexParam)
var segmentPtr C.CSegmentInterface
switch segType {
case segTypeInvalid:
log.Println("illegal segment type when create segment")
return nil
case segTypeSealed:
segmentPtr = C.NewSegment(collection.collectionPtr, C.ulong(segmentID), C.Sealed)
case segTypeGrowing:
segmentPtr = C.NewSegment(collection.collectionPtr, C.ulong(segmentID), C.Growing)
default:
log.Println("illegal segment type when create segment")
return nil
}
segmentPtr := C.NewSegment(collection.collectionPtr, C.ulong(segmentID), segType)
var newSegment = &Segment{
segmentPtr: segmentPtr,
segmentType: segType,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册