未验证 提交 4cd7680f 编写于 作者: G godchen 提交者: GitHub

Reschedule load segments (#11391)

Signed-off-by: Ngodchen <qingxiang.chen@zilliz.com>
上级 d38043f2
......@@ -53,6 +53,7 @@ type Cluster interface {
getNumDmChannels(nodeID int64) (int, error)
hasWatchedQueryChannel(ctx context.Context, nodeID int64, collectionID UniqueID) bool
hasWatchedDeltaChannel(ctx context.Context, nodeID int64, collectionID UniqueID) bool
getCollectionInfosByID(ctx context.Context, nodeID int64) []*querypb.CollectionInfo
addQueryChannel(ctx context.Context, nodeID int64, in *querypb.AddQueryChannelRequest) error
removeQueryChannel(ctx context.Context, nodeID int64, in *querypb.RemoveQueryChannelRequest) error
......@@ -295,11 +296,24 @@ func (c *queryNodeCluster) watchDeltaChannels(ctx context.Context, nodeID int64,
log.Debug("WatchDeltaChannels: queryNode watch dm channel error", zap.String("error", err.Error()))
return err
}
err = c.clusterMeta.setDeltaChannel(in.CollectionID, in.Infos)
if err != nil {
log.Debug("WatchDeltaChannels: queryNode watch delta channel error", zap.String("error", err.Error()))
return err
}
return nil
}
return errors.New("WatchDeltaChannels: Can't find query node by nodeID ")
}
func (c *queryNodeCluster) hasWatchedDeltaChannel(ctx context.Context, nodeID int64, collectionID UniqueID) bool {
c.Lock()
defer c.Unlock()
return c.nodes[nodeID].hasWatchedDeltaChannel(collectionID)
}
func (c *queryNodeCluster) hasWatchedQueryChannel(ctx context.Context, nodeID int64, collectionID UniqueID) bool {
c.Lock()
defer c.Unlock()
......
......@@ -26,6 +26,7 @@ import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
......@@ -36,6 +37,7 @@ const (
collectionMetaPrefix = "queryCoord-collectionMeta"
segmentMetaPrefix = "queryCoord-segmentMeta"
queryChannelMetaPrefix = "queryCoord-queryChannel"
deltaChannelMetaPrefix = "queryCoord-deltaChannel"
sealedSegmentChangeInfoPrefix = "queryCoord-sealedSegmentChangeInfo"
globalQuerySeekPositionPrefix = "queryCoord-globalQuerySeekPosition"
)
......@@ -72,6 +74,9 @@ type Meta interface {
addDmChannel(collectionID UniqueID, nodeID int64, channels []string) error
removeDmChannel(collectionID UniqueID, nodeID int64, channels []string) error
getDeltaChannelsByCollectionID(collectionID UniqueID) ([]*datapb.VchannelInfo, error)
setDeltaChannel(collectionID UniqueID, info []*datapb.VchannelInfo) error
getQueryChannelInfoByID(collectionID UniqueID) (*querypb.QueryChannelInfo, error)
getQueryStreamByID(collectionID UniqueID) (msgstream.MsgStream, error)
......@@ -99,6 +104,8 @@ type MetaReplica struct {
segmentMu sync.RWMutex
queryChannelInfos map[UniqueID]*querypb.QueryChannelInfo
channelMu sync.RWMutex
deltaChannelInfos map[UniqueID][]*datapb.VchannelInfo
deltaChannelMu sync.RWMutex
queryStreams map[UniqueID]msgstream.MsgStream
streamMu sync.RWMutex
......@@ -111,6 +118,7 @@ func newMeta(ctx context.Context, kv kv.MetaKv, factory msgstream.Factory, idAll
collectionInfos := make(map[UniqueID]*querypb.CollectionInfo)
segmentInfos := make(map[UniqueID]*querypb.SegmentInfo)
queryChannelInfos := make(map[UniqueID]*querypb.QueryChannelInfo)
deltaChannelInfos := make(map[UniqueID][]*datapb.VchannelInfo)
queryMsgStream := make(map[UniqueID]msgstream.MsgStream)
position := &internalpb.MsgPosition{}
......@@ -124,6 +132,7 @@ func newMeta(ctx context.Context, kv kv.MetaKv, factory msgstream.Factory, idAll
collectionInfos: collectionInfos,
segmentInfos: segmentInfos,
queryChannelInfos: queryChannelInfos,
deltaChannelInfos: deltaChannelInfos,
queryStreams: queryMsgStream,
globalSeekPosition: position,
}
......@@ -187,6 +196,25 @@ func (m *MetaReplica) reloadFromKV() error {
}
m.queryChannelInfos[collectionID] = queryChannelInfo
}
deltaChannelKeys, deltaChannelValues, err := m.client.LoadWithPrefix(deltaChannelMetaPrefix)
if err != nil {
return nil
}
for index, value := range deltaChannelValues {
collectionIDString, _ := filepath.Split(deltaChannelKeys[index])
collectionID, err := strconv.ParseInt(collectionIDString, 10, 64)
if err != nil {
return err
}
deltaChannelInfo := &datapb.VchannelInfo{}
err = proto.Unmarshal([]byte(value), deltaChannelInfo)
if err != nil {
return err
}
m.deltaChannelInfos[collectionID] = append(m.deltaChannelInfos[collectionID], deltaChannelInfo)
}
globalSeekPosValue, err := m.client.Load(globalQuerySeekPositionPrefix)
if err == nil {
position := &internalpb.MsgPosition{}
......@@ -950,6 +978,33 @@ func createQueryChannel(collectionID UniqueID) *querypb.QueryChannelInfo {
return info
}
// Get delta channel info for collection, so far all the collection share the same query channel 0
func (m *MetaReplica) getDeltaChannelsByCollectionID(collectionID UniqueID) ([]*datapb.VchannelInfo, error) {
m.deltaChannelMu.RLock()
defer m.deltaChannelMu.RUnlock()
if infos, ok := m.deltaChannelInfos[collectionID]; ok {
return infos, nil
}
return nil, fmt.Errorf("delta channel not exist in meta")
}
func (m *MetaReplica) setDeltaChannel(collectionID UniqueID, infos []*datapb.VchannelInfo) error {
m.deltaChannelMu.Lock()
defer m.deltaChannelMu.Unlock()
_, ok := m.deltaChannelInfos[collectionID]
if ok {
return nil
}
err := saveDeltaChannelInfo(collectionID, infos, m.client)
if err != nil {
return err
}
m.deltaChannelInfos[collectionID] = infos
return nil
}
// Get Query channel info for collection, so far all the collection share the same query channel 0
func (m *MetaReplica) getQueryChannelInfoByID(collectionID UniqueID) (*querypb.QueryChannelInfo, error) {
m.channelMu.Lock()
......@@ -1149,3 +1204,17 @@ func saveQueryChannelInfo(collectionID UniqueID, info *querypb.QueryChannelInfo,
key := fmt.Sprintf("%s/%d", queryChannelMetaPrefix, collectionID)
return kv.Save(key, string(infoBytes))
}
func saveDeltaChannelInfo(collectionID UniqueID, infos []*datapb.VchannelInfo, kv kv.MetaKv) error {
kvs := make(map[string]string)
for _, info := range infos {
infoBytes, err := proto.Marshal(info)
if err != nil {
return err
}
key := fmt.Sprintf("%s/%d/%s", deltaChannelMetaPrefix, collectionID, info.ChannelName)
kvs[key] = string(infoBytes)
}
return kv.MultiSave(kvs)
}
......@@ -20,6 +20,7 @@ import (
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/querypb"
......@@ -38,6 +39,7 @@ type queryNodeClientMock struct {
func newQueryNodeTest(ctx context.Context, address string, id UniqueID, kv *etcdkv.EtcdKV) (Node, error) {
collectionInfo := make(map[UniqueID]*querypb.CollectionInfo)
watchedChannels := make(map[UniqueID]*querypb.QueryChannelInfo)
watchedDeltaChannels := make(map[UniqueID][]*datapb.VchannelInfo)
childCtx, cancel := context.WithCancel(ctx)
client, err := newQueryNodeClientMock(childCtx, address)
if err != nil {
......@@ -53,6 +55,7 @@ func newQueryNodeTest(ctx context.Context, address string, id UniqueID, kv *etcd
kvClient: kv,
collectionInfos: collectionInfo,
watchedQueryChannels: watchedChannels,
watchedDeltaChannels: watchedDeltaChannels,
}
return node, nil
......
......@@ -24,6 +24,7 @@ import (
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/querypb"
......@@ -51,6 +52,7 @@ type Node interface {
watchDeltaChannels(ctx context.Context, in *querypb.WatchDeltaChannelsRequest) error
//removeDmChannel(collectionID UniqueID, channels []string) error
hasWatchedDeltaChannel(collectionID UniqueID) bool
hasWatchedQueryChannel(collectionID UniqueID) bool
//showWatchedQueryChannels() []*querypb.QueryChannelInfo
addQueryChannel(ctx context.Context, in *querypb.AddQueryChannelRequest) error
......@@ -80,6 +82,7 @@ type queryNode struct {
sync.RWMutex
collectionInfos map[UniqueID]*querypb.CollectionInfo
watchedQueryChannels map[UniqueID]*querypb.QueryChannelInfo
watchedDeltaChannels map[UniqueID][]*datapb.VchannelInfo
state nodeState
stateLock sync.RWMutex
......@@ -92,6 +95,7 @@ type queryNode struct {
func newQueryNode(ctx context.Context, address string, id UniqueID, kv *etcdkv.EtcdKV) (Node, error) {
collectionInfo := make(map[UniqueID]*querypb.CollectionInfo)
watchedChannels := make(map[UniqueID]*querypb.QueryChannelInfo)
watchedDeltaChannels := make(map[UniqueID][]*datapb.VchannelInfo)
childCtx, cancel := context.WithCancel(ctx)
client, err := nodeclient.NewClient(childCtx, address)
if err != nil {
......@@ -107,6 +111,7 @@ func newQueryNode(ctx context.Context, address string, id UniqueID, kv *etcdkv.E
kvClient: kv,
collectionInfos: collectionInfo,
watchedQueryChannels: watchedChannels,
watchedDeltaChannels: watchedDeltaChannels,
state: disConnect,
}
......@@ -329,6 +334,14 @@ func (qn *queryNode) hasWatchedQueryChannel(collectionID UniqueID) bool {
return false
}
func (qn *queryNode) hasWatchedDeltaChannel(collectionID UniqueID) bool {
qn.RLock()
defer qn.RUnlock()
_, ok := qn.watchedDeltaChannels[collectionID]
return ok
}
//func (qn *queryNode) showWatchedQueryChannels() []*querypb.QueryChannelInfo {
// qn.RLock()
// defer qn.RUnlock()
......@@ -341,6 +354,13 @@ func (qn *queryNode) hasWatchedQueryChannel(collectionID UniqueID) bool {
// return results
//}
func (qn *queryNode) setDeltaChannelInfo(collectionID int64, infos []*datapb.VchannelInfo) {
qn.Lock()
defer qn.Unlock()
qn.watchedDeltaChannels[collectionID] = infos
}
func (qn *queryNode) setQueryChannelInfo(info *querypb.QueryChannelInfo) {
qn.Lock()
defer qn.Unlock()
......@@ -433,6 +453,7 @@ func (qn *queryNode) watchDeltaChannels(ctx context.Context, in *querypb.WatchDe
if status.ErrorCode != commonpb.ErrorCode_Success {
return errors.New(status.Reason)
}
qn.setDeltaChannelInfo(in.CollectionID, in.Infos)
return err
}
......
......@@ -351,7 +351,7 @@ func (lct *loadCollectionTask) execute(ctx context.Context) error {
watchDmChannelReqs := make([]*querypb.WatchDmChannelsRequest, 0)
channelsToWatch := make([]string, 0)
segmentsToLoad := make([]UniqueID, 0)
watchDeltaChannelReqs := make([]*querypb.WatchDeltaChannelsRequest, 0)
var watchDeltaChannels []*datapb.VchannelInfo
for _, partitionID := range toLoadPartitionIDs {
getRecoveryInfoRequest := &datapb.GetRecoveryInfoRequest{
Base: lct.Base,
......@@ -390,25 +390,16 @@ func (lct *loadCollectionTask) execute(ctx context.Context) error {
loadSegmentReqs = append(loadSegmentReqs, loadSegmentReq)
}
// init delta channels for sealed segments.
if len(loadSegmentReqs) != 0 && len(watchDeltaChannelReqs) != len(recoveryInfo.Channels) {
if len(watchDeltaChannels) != len(recoveryInfo.Channels) {
for _, info := range recoveryInfo.Channels {
deltaChannel, err := rootcoord.ConvertChannelName(info.ChannelName, Params.DmlChannelPrefix, Params.DeltaChannelPrefix)
deltaChannelName, err := rootcoord.ConvertChannelName(info.ChannelName, Params.DmlChannelPrefix, Params.DeltaChannelPrefix)
if err != nil {
return err
}
deltaInfo := proto.Clone(info).(*datapb.VchannelInfo)
deltaInfo.ChannelName = deltaChannel
msgBase := proto.Clone(lct.Base).(*commonpb.MsgBase)
msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels
watchDeltaRequest := &querypb.WatchDeltaChannelsRequest{
Base: msgBase,
CollectionID: collectionID,
Infos: []*datapb.VchannelInfo{deltaInfo},
}
watchDeltaChannelReqs = append(watchDeltaChannelReqs, watchDeltaRequest)
deltaChannel := proto.Clone(info).(*datapb.VchannelInfo)
deltaChannel.ChannelName = deltaChannelName
watchDeltaChannels = append(watchDeltaChannels, deltaChannel)
}
}
for _, info := range recoveryInfo.Channels {
......@@ -452,8 +443,17 @@ func (lct *loadCollectionTask) execute(ctx context.Context) error {
}
}
msgBase := proto.Clone(lct.Base).(*commonpb.MsgBase)
msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels
watchDeltaChannelReq := &querypb.WatchDeltaChannelsRequest{
Base: msgBase,
CollectionID: collectionID,
Infos: watchDeltaChannels,
}
// If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule
lct.meta.setDeltaChannel(watchDeltaChannelReq.CollectionID, watchDeltaChannelReq.Infos)
internalTasks, err := assignInternalTask(ctx, collectionID, lct, lct.meta, lct.cluster, loadSegmentReqs, watchDmChannelReqs, watchDeltaChannelReqs, false, nil, nil)
internalTasks, err := assignInternalTask(ctx, collectionID, lct, lct.meta, lct.cluster, loadSegmentReqs, watchDmChannelReqs, watchDeltaChannelReq, false, nil, nil)
if err != nil {
log.Warn("loadCollectionTask: assign child task failed", zap.Int64("collectionID", collectionID))
lct.setResultInfo(err)
......@@ -707,7 +707,7 @@ func (lpt *loadPartitionTask) execute(ctx context.Context) error {
loadSegmentReqs := make([]*querypb.LoadSegmentsRequest, 0)
channelsToWatch := make([]string, 0)
watchDmReqs := make([]*querypb.WatchDmChannelsRequest, 0)
watchDeltaReqs := make([]*querypb.WatchDeltaChannelsRequest, 0)
var watchDeltaChannels []*datapb.VchannelInfo
for _, partitionID := range partitionIDs {
getRecoveryInfoRequest := &datapb.GetRecoveryInfoRequest{
Base: lpt.Base,
......@@ -745,23 +745,15 @@ func (lpt *loadPartitionTask) execute(ctx context.Context) error {
loadSegmentReqs = append(loadSegmentReqs, loadSegmentReq)
}
// init delta channels for sealed segments.
if len(loadSegmentReqs) != 0 && len(watchDeltaReqs) != len(recoveryInfo.Channels) {
if len(watchDeltaChannels) != len(recoveryInfo.Channels) {
for _, info := range recoveryInfo.Channels {
deltaChannel, err := rootcoord.ConvertChannelName(info.ChannelName, Params.DmlChannelPrefix, Params.DeltaChannelPrefix)
deltaChannelName, err := rootcoord.ConvertChannelName(info.ChannelName, Params.DmlChannelPrefix, Params.DeltaChannelPrefix)
if err != nil {
return err
}
deltaInfo := proto.Clone(info).(*datapb.VchannelInfo)
deltaInfo.ChannelName = deltaChannel
msgBase := proto.Clone(lpt.Base).(*commonpb.MsgBase)
msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels
watchDeltaRequest := &querypb.WatchDeltaChannelsRequest{
Base: msgBase,
CollectionID: collectionID,
Infos: []*datapb.VchannelInfo{deltaInfo},
}
watchDeltaReqs = append(watchDeltaReqs, watchDeltaRequest)
deltaChannel := proto.Clone(info).(*datapb.VchannelInfo)
deltaChannel.ChannelName = deltaChannelName
watchDeltaChannels = append(watchDeltaChannels, deltaChannel)
}
}
......@@ -783,7 +775,16 @@ func (lpt *loadPartitionTask) execute(ctx context.Context) error {
}
}
internalTasks, err := assignInternalTask(ctx, collectionID, lpt, lpt.meta, lpt.cluster, loadSegmentReqs, watchDmReqs, watchDeltaReqs, false, nil, nil)
msgBase := proto.Clone(lpt.Base).(*commonpb.MsgBase)
msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels
watchDeltaChannelReq := &querypb.WatchDeltaChannelsRequest{
Base: msgBase,
CollectionID: collectionID,
Infos: watchDeltaChannels,
}
// If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule
lpt.meta.setDeltaChannel(watchDeltaChannelReq.CollectionID, watchDeltaChannelReq.Infos)
internalTasks, err := assignInternalTask(ctx, collectionID, lpt, lpt.meta, lpt.cluster, loadSegmentReqs, watchDmReqs, watchDeltaChannelReq, false, nil, nil)
if err != nil {
log.Warn("loadPartitionTask: assign child task failed", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs))
lpt.setResultInfo(err)
......@@ -1081,6 +1082,19 @@ func (lst *loadSegmentTask) reschedule(ctx context.Context) ([]task, error) {
lst.excludeNodeIDs = []int64{}
}
lst.excludeNodeIDs = append(lst.excludeNodeIDs, lst.DstNodeID)
deltaChannelInfos, err := lst.meta.getDeltaChannelsByCollectionID(collectionID)
if err != nil {
return nil, err
}
msgBase := proto.Clone(lst.Base).(*commonpb.MsgBase)
msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels
watchDeltaRequest := &querypb.WatchDeltaChannelsRequest{
Base: msgBase,
CollectionID: collectionID,
Infos: deltaChannelInfos,
}
log.Debug("assignInternalTask: add a watchDeltaChannelTask childTask", zap.Any("task", watchDeltaRequest))
//TODO:: wait or not according msgType
reScheduledTasks, err := assignInternalTask(ctx, collectionID, lst.getParentTask(), lst.meta, lst.cluster, loadSegmentReqs, nil, nil, false, lst.excludeNodeIDs, nil)
if err != nil {
......@@ -1509,7 +1523,7 @@ func (ht *handoffTask) execute(ctx context.Context) error {
findBinlog := false
var loadSegmentReq *querypb.LoadSegmentsRequest
var watchDeltaChannelReqs []*querypb.WatchDeltaChannelsRequest
var watchDeltaChannels []*datapb.VchannelInfo
for _, segmentBinlogs := range recoveryInfo.Binlogs {
if segmentBinlogs.SegmentID == segmentID {
findBinlog = true
......@@ -1532,23 +1546,15 @@ func (ht *handoffTask) execute(ctx context.Context) error {
}
}
}
// init delta channels for sealed segments.
if loadSegmentReq != nil && len(watchDeltaChannelReqs) != len(recoveryInfo.Channels) {
if len(watchDeltaChannels) != len(recoveryInfo.Channels) {
for _, info := range recoveryInfo.Channels {
deltaChannel, err := rootcoord.ConvertChannelName(info.ChannelName, Params.DmlChannelPrefix, Params.DeltaChannelPrefix)
deltaChannelName, err := rootcoord.ConvertChannelName(info.ChannelName, Params.DmlChannelPrefix, Params.DeltaChannelPrefix)
if err != nil {
return err
}
deltaInfo := proto.Clone(info).(*datapb.VchannelInfo)
deltaInfo.ChannelName = deltaChannel
msgBase := proto.Clone(ht.Base).(*commonpb.MsgBase)
msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels
watchDeltaChannelsRequest := &querypb.WatchDeltaChannelsRequest{
Base: msgBase,
CollectionID: collectionID,
Infos: []*datapb.VchannelInfo{deltaInfo},
}
watchDeltaChannelReqs = append(watchDeltaChannelReqs, watchDeltaChannelsRequest)
deltaChannel := proto.Clone(info).(*datapb.VchannelInfo)
deltaChannel.ChannelName = deltaChannelName
watchDeltaChannels = append(watchDeltaChannels, deltaChannel)
}
}
......@@ -1557,7 +1563,16 @@ func (ht *handoffTask) execute(ctx context.Context) error {
ht.setResultInfo(err)
return err
}
internalTasks, err := assignInternalTask(ctx, collectionID, ht, ht.meta, ht.cluster, []*querypb.LoadSegmentsRequest{loadSegmentReq}, nil, watchDeltaChannelReqs, true, nil, nil)
msgBase := proto.Clone(ht.Base).(*commonpb.MsgBase)
msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels
watchDeltaChannelReq := &querypb.WatchDeltaChannelsRequest{
Base: msgBase,
CollectionID: collectionID,
Infos: watchDeltaChannels,
}
// If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule
ht.meta.setDeltaChannel(watchDeltaChannelReq.CollectionID, watchDeltaChannelReq.Infos)
internalTasks, err := assignInternalTask(ctx, collectionID, ht, ht.meta, ht.cluster, []*querypb.LoadSegmentsRequest{loadSegmentReq}, nil, watchDeltaChannelReq, true, nil, nil)
if err != nil {
log.Error("handoffTask: assign child task failed", zap.Any("segmentInfo", segmentInfo))
ht.setResultInfo(err)
......@@ -1664,7 +1679,7 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error {
loadSegmentReqs := make([]*querypb.LoadSegmentsRequest, 0)
channelsToWatch := make([]string, 0)
watchDmChannelReqs := make([]*querypb.WatchDmChannelsRequest, 0)
watchDeltaChannelReqs := make([]*querypb.WatchDeltaChannelsRequest, 0)
var watchDeltaChannels []*datapb.VchannelInfo
dmChannels, err := lbt.meta.getDmChannelsByNodeID(collectionID, nodeID)
if err != nil {
......@@ -1712,20 +1727,15 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error {
loadSegmentReqs = append(loadSegmentReqs, loadSegmentReq)
}
// init delta channels for sealed segments.
if len(loadSegmentReqs) != 0 && len(watchDeltaChannelReqs) != len(recoveryInfo.Channels) {
if len(watchDeltaChannels) != len(recoveryInfo.Channels) {
for _, info := range recoveryInfo.Channels {
deltaChannel := info.ChannelName
deltaInfo := proto.Clone(info).(*datapb.VchannelInfo)
deltaInfo.ChannelName = deltaChannel
msgBase := proto.Clone(lbt.Base).(*commonpb.MsgBase)
msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels
watchDeltaChannelsRequest := &querypb.WatchDeltaChannelsRequest{
Base: msgBase,
CollectionID: collectionID,
Infos: []*datapb.VchannelInfo{info},
deltaChannelName, err := rootcoord.ConvertChannelName(info.ChannelName, Params.DmlChannelPrefix, Params.DeltaChannelPrefix)
if err != nil {
return err
}
watchDeltaChannelReqs = append(watchDeltaChannelReqs, watchDeltaChannelsRequest)
deltaChannel := proto.Clone(info).(*datapb.VchannelInfo)
deltaChannel.ChannelName = deltaChannelName
watchDeltaChannels = append(watchDeltaChannels, deltaChannel)
}
}
......@@ -1773,8 +1783,17 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error {
}
}
}
msgBase := proto.Clone(lbt.Base).(*commonpb.MsgBase)
msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels
watchDeltaChannelReq := &querypb.WatchDeltaChannelsRequest{
Base: msgBase,
CollectionID: collectionID,
Infos: watchDeltaChannels,
}
// If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule
lbt.meta.setDeltaChannel(watchDeltaChannelReq.CollectionID, watchDeltaChannelReq.Infos)
internalTasks, err := assignInternalTask(ctx, collectionID, lbt, lbt.meta, lbt.cluster, loadSegmentReqs, watchDmChannelReqs, watchDeltaChannelReqs, true, lbt.SourceNodeIDs, lbt.DstNodeIDs)
internalTasks, err := assignInternalTask(ctx, collectionID, lbt, lbt.meta, lbt.cluster, loadSegmentReqs, watchDmChannelReqs, watchDeltaChannelReq, true, lbt.SourceNodeIDs, lbt.DstNodeIDs)
if err != nil {
log.Warn("loadBalanceTask: assign child task failed", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs))
lbt.setResultInfo(err)
......@@ -1849,7 +1868,7 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error {
for collectionID, partitionIDs := range col2PartitionIDs {
segmentsToLoad := make([]UniqueID, 0)
loadSegmentReqs := make([]*querypb.LoadSegmentsRequest, 0)
watchDeltaChannelReqs := make([]*querypb.WatchDeltaChannelsRequest, 0)
var watchDeltaChannels []*datapb.VchannelInfo
collectionInfo, err := lbt.meta.getCollectionInfoByID(collectionID)
if err != nil {
log.Error("loadBalanceTask: can't find collectionID in meta", zap.Int64("collectionID", collectionID), zap.Error(err))
......@@ -1903,29 +1922,30 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error {
loadSegmentReqs = append(loadSegmentReqs, loadSegmentReq)
}
// init delta channels for sealed segments.
if len(loadSegmentReqs) != 0 && len(watchDeltaChannelReqs) != len(recoveryInfo.Channels) {
if len(watchDeltaChannels) != len(recoveryInfo.Channels) {
for _, info := range recoveryInfo.Channels {
deltaChannel, err := rootcoord.ConvertChannelName(info.ChannelName, Params.DmlChannelPrefix, Params.DeltaChannelPrefix)
deltaChannelName, err := rootcoord.ConvertChannelName(info.ChannelName, Params.DmlChannelPrefix, Params.DeltaChannelPrefix)
if err != nil {
return err
}
deltaInfo := proto.Clone(info).(*datapb.VchannelInfo)
deltaInfo.ChannelName = deltaChannel
msgBase := proto.Clone(lbt.Base).(*commonpb.MsgBase)
msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels
watchDeltaRequest := &querypb.WatchDeltaChannelsRequest{
Base: msgBase,
CollectionID: collectionID,
Infos: []*datapb.VchannelInfo{deltaInfo},
}
watchDeltaChannelReqs = append(watchDeltaChannelReqs, watchDeltaRequest)
deltaChannel := proto.Clone(info).(*datapb.VchannelInfo)
deltaChannel.ChannelName = deltaChannelName
watchDeltaChannels = append(watchDeltaChannels, deltaChannel)
}
}
}
msgBase := proto.Clone(lbt.Base).(*commonpb.MsgBase)
msgBase.MsgType = commonpb.MsgType_WatchDeltaChannels
watchDeltaChannelReq := &querypb.WatchDeltaChannelsRequest{
Base: msgBase,
CollectionID: collectionID,
Infos: watchDeltaChannels,
}
// If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule
lbt.meta.setDeltaChannel(watchDeltaChannelReq.CollectionID, watchDeltaChannelReq.Infos)
// TODO:: assignInternalTask with multi collection
internalTasks, err := assignInternalTask(ctx, collectionID, lbt, lbt.meta, lbt.cluster, loadSegmentReqs, nil, watchDeltaChannelReqs, false, lbt.SourceNodeIDs, lbt.DstNodeIDs)
internalTasks, err := assignInternalTask(ctx, collectionID, lbt, lbt.meta, lbt.cluster, loadSegmentReqs, nil, watchDeltaChannelReq, false, lbt.SourceNodeIDs, lbt.DstNodeIDs)
if err != nil {
log.Warn("loadBalanceTask: assign child task failed", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs))
lbt.setResultInfo(err)
......@@ -2005,7 +2025,7 @@ func assignInternalTask(ctx context.Context,
collectionID UniqueID, parentTask task, meta Meta, cluster Cluster,
loadSegmentRequests []*querypb.LoadSegmentsRequest,
watchDmChannelRequests []*querypb.WatchDmChannelsRequest,
watchDeltaChannelRequests []*querypb.WatchDeltaChannelsRequest,
watchDeltaChannelRequest *querypb.WatchDeltaChannelsRequest,
wait bool, excludeNodeIDs []int64, includeNodeIDs []int64) ([]task, error) {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
......@@ -2067,9 +2087,9 @@ func assignInternalTask(ctx context.Context,
internalTasks = append(internalTasks, loadSegmentTask)
}
for _, req := range watchDeltaChannelRequests {
if watchDeltaChannelRequest != nil {
ctx = opentracing.ContextWithSpan(context.Background(), sp)
watchDeltaRequest := proto.Clone(req).(*querypb.WatchDeltaChannelsRequest)
watchDeltaRequest := proto.Clone(watchDeltaChannelRequest).(*querypb.WatchDeltaChannelsRequest)
watchDeltaRequest.NodeID = nodeID
baseTask := newBaseTask(ctx, parentTask.getTriggerCondition())
baseTask.setParentTask(parentTask)
......
......@@ -181,9 +181,10 @@ func genLoadSegmentTask(ctx context.Context, queryCoord *QueryCoord, nodeID int6
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_LoadSegments,
},
DstNodeID: nodeID,
Schema: schema,
Infos: []*querypb.SegmentLoadInfo{segmentInfo},
DstNodeID: nodeID,
Schema: schema,
Infos: []*querypb.SegmentLoadInfo{segmentInfo},
CollectionID: defaultCollectionID,
}
baseTask := newBaseTask(ctx, querypb.TriggerCondition_grpcRequest)
baseTask.taskID = 100
......@@ -594,6 +595,7 @@ func Test_RescheduleSegmentWithWatchQueryChannel(t *testing.T) {
node1.loadSegment = returnFailedResult
loadSegmentTask := genLoadSegmentTask(ctx, queryCoord, node1.queryNodeID)
loadSegmentTask.meta.setDeltaChannel(defaultCollectionID, []*datapb.VchannelInfo{})
loadCollectionTask := loadSegmentTask.parentTask
queryCoord.scheduler.triggerTaskQueue.addTask(loadCollectionTask)
......
......@@ -412,6 +412,16 @@ func (w *watchDeltaChannelsTask) Execute(ctx context.Context) error {
if err != nil {
return err
}
// Check if the same deltaChannel has been watched
for _, dstChan := range vDeltaChannels {
for _, srcChan := range hCol.vDeltaChannels {
if dstChan == srcChan {
return nil
}
}
}
hCol.addVDeltaChannels(vDeltaChannels)
hCol.addPDeltaChannels(pDeltaChannels)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册