未验证 提交 6bc2efe4 编写于 作者: Z zhenshan.cao 提交者: GitHub

Fixbug: IndexNode should panic when save meta failed to MetaKV (#15347)

Signed-off-by: Nzhenshan.cao <zhenshan.cao@zilliz.com>
上级 f3f46d3b
......@@ -120,6 +120,10 @@ build-cpp-with-unittest:
# Run the tests.
unittest: test-cpp test-go
test-indexnode:
@echo "Running go unittests..."
go test -race -coverpkg=./... -coverprofile=profile.out -covermode=atomic -timeout 5m github.com/milvus-io/milvus/internal/indexnode -v
test-proxy:
@echo "Running go unittests..."
go test -race -coverpkg=./... -coverprofile=profile.out -covermode=atomic -timeout 5m github.com/milvus-io/milvus/internal/proxy -v
......
......@@ -305,7 +305,7 @@ func TestIndexNode(t *testing.T) {
defer in.etcdKV.RemoveWithPrefix(metaPath2)
})
t.Run("Create Deleted_Index", func(t *testing.T) {
t.Run("Create DeletedIndex", func(t *testing.T) {
var insertCodec storage.InsertCodec
insertCodec.Schema = &etcdpb.CollectionMeta{
......@@ -398,19 +398,21 @@ func TestIndexNode(t *testing.T) {
status, err2 := in.CreateIndex(ctx, req)
assert.Nil(t, err2)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
time.Sleep(100 * time.Millisecond)
strValue, err3 := in.etcdKV.Load(metaPath3)
assert.Nil(t, err3)
indexMetaTmp := indexpb.IndexMeta{}
err = proto.Unmarshal([]byte(strValue), &indexMetaTmp)
assert.Nil(t, err)
for indexMetaTmp.State != commonpb.IndexState_Finished {
time.Sleep(100 * time.Millisecond)
strValue, err := in.etcdKV.Load(metaPath3)
assert.Nil(t, err)
err = proto.Unmarshal([]byte(strValue), &indexMetaTmp)
assert.Nil(t, err)
}
assert.Equal(t, true, indexMetaTmp.MarkDeleted)
assert.Equal(t, int64(1), indexMetaTmp.Version)
//for indexMetaTmp.State != commonpb.IndexState_Finished {
// time.Sleep(100 * time.Millisecond)
// strValue, err := in.etcdKV.Load(metaPath3)
// assert.Nil(t, err)
// err = proto.Unmarshal([]byte(strValue), &indexMetaTmp)
// assert.Nil(t, err)
//}
defer in.kv.MultiRemove(indexMetaTmp.IndexFilePaths)
defer func() {
for k := range kvs {
......
......@@ -60,6 +60,8 @@ type task interface {
Notify(err error)
OnEnqueue() error
SetError(err error)
SetState(state TaskState)
GetState() TaskState
}
// BaseTask is an basic instance of task.
......@@ -69,6 +71,17 @@ type BaseTask struct {
id UniqueID
err error
internalErr error
state TaskState
}
// SetState sets task's state.
func (bt *BaseTask) SetState(state TaskState) {
bt.state = state
}
// GetState gets task's state.
func (bt *BaseTask) GetState() TaskState {
return bt.state
}
// SetError sets an error to task.
......@@ -142,90 +155,105 @@ func (bt *BaseTask) Name() string {
// OnEnqueue enqueues indexing tasks.
func (it *IndexBuildTask) OnEnqueue() error {
it.SetID(it.req.IndexBuildID)
it.SetState(TaskStateNormal)
log.Debug("IndexNode IndexBuilderTask Enqueue", zap.Int64("taskID", it.ID()), zap.Int64("index buildID", it.req.IndexBuildID))
it.tr = timerecord.NewTimeRecorder(fmt.Sprintf("IndexBuildTask %d", it.req.IndexBuildID))
return nil
}
// checkIndexMeta load meta from etcd to determine whether the task should continue execution.
func (it *IndexBuildTask) checkIndexMeta(ctx context.Context, pre bool) error {
// loadIndexMeta load meta from etcd.
func (it *IndexBuildTask) loadIndexMeta(ctx context.Context) (*indexpb.IndexMeta, int64, error) {
indexMeta := indexpb.IndexMeta{}
var source int64
fn := func() error {
//TODO error handling need to be optimized, return Unrecoverable to avoid retry
indexMeta := indexpb.IndexMeta{}
_, values, versions, err := it.etcdKV.LoadWithPrefix2(it.req.MetaPath)
if err != nil {
log.Error("IndexNode checkIndexMeta", zap.Any("load meta error with path", it.req.MetaPath),
zap.Error(err), zap.Any("pre", pre))
return err
}
if len(values) == 0 {
return fmt.Errorf("IndexNode checkIndexMeta the indexMeta is empty")
return fmt.Errorf("IndexNode loadIndexMeta get empty")
}
log.Debug("IndexNode checkIndexMeta load meta success", zap.Any("path", it.req.MetaPath), zap.Any("pre", pre))
err = proto.Unmarshal([]byte(values[0]), &indexMeta)
if err != nil {
log.Error("IndexNode failed to unmarshal index meta", zap.Error(err))
return err
}
log.Debug("IndexNode checkIndexMeta Unmarshal success", zap.Any("IndexMeta", indexMeta))
if indexMeta.Version > it.req.Version || indexMeta.State == commonpb.IndexState_Finished {
log.Info("IndexNode checkIndexMeta version mismatch",
zap.Any("req version", it.req.Version),
zap.Any("index meta version", indexMeta.Version))
return nil
}
if indexMeta.MarkDeleted {
indexMeta.State = commonpb.IndexState_Finished
v, err := proto.Marshal(&indexMeta)
if err != nil {
return err
}
err = it.etcdKV.CompareVersionAndSwap(it.req.MetaPath, versions[0], string(v))
if err != nil {
return err
}
errMsg := fmt.Sprintf("the index has been deleted with indexBuildID %d", indexMeta.IndexBuildID)
log.Warn(errMsg)
return fmt.Errorf(errMsg)
}
if pre {
return nil
}
indexMeta.IndexFilePaths = it.savePaths
indexMeta.State = commonpb.IndexState_Finished
indexMeta.SerializeSize = it.serializedSize
// Under normal circumstances, it.err and it.internalErr will not be non-nil at the same time, but for the sake of insurance, the else judgment is added.
if it.err != nil {
log.Error("IndexNode CreateIndex failed and can not be retried", zap.Int64("IndexBuildID", indexMeta.IndexBuildID), zap.Any("err", it.err))
indexMeta.State = commonpb.IndexState_Failed
indexMeta.FailReason = it.err.Error()
} else if it.internalErr != nil {
log.Error("IndexNode CreateIndex failed, but it can retried", zap.Int64("IndexBuildID", indexMeta.IndexBuildID), zap.Any("err", it.internalErr))
indexMeta.State = commonpb.IndexState_Unissued
}
source = versions[0]
return nil
}
err := retry.Do(ctx, fn, retry.Attempts(3))
if err != nil {
return nil, -1, err
}
return &indexMeta, source, nil
}
log.Debug("IndexNode", zap.Int64("indexBuildID", indexMeta.IndexBuildID), zap.Any("IndexState", indexMeta.State))
var metaValue []byte
metaValue, err = proto.Marshal(&indexMeta)
if err != nil {
log.Warn("IndexNode", zap.Int64("indexBuildID", indexMeta.IndexBuildID), zap.Any("IndexState", indexMeta.State),
zap.Any("proto.Marshal failed:", err))
return err
}
err = it.etcdKV.CompareVersionAndSwap(it.req.MetaPath, versions[0], string(metaValue))
if err != nil {
log.Warn("IndexNode checkIndexMeta CompareVersionAndSwap", zap.Error(err))
}
func (it *IndexBuildTask) updateTaskState(indexMeta *indexpb.IndexMeta) TaskState {
if indexMeta.Version > it.req.Version || indexMeta.State == commonpb.IndexState_Finished {
it.SetState(TaskStateAbandon)
} else if indexMeta.MarkDeleted {
it.SetState(TaskStateAbandon)
}
return it.GetState()
}
// saveIndexMeta try to save index meta to metaKV.
// if failed, IndexNode will panic to inform indexcoord.
func (it *IndexBuildTask) saveIndexMeta(ctx context.Context) error {
defer it.tr.Record("IndexNode IndexBuildTask saveIndexMeta")
indexMeta, version, err := it.loadIndexMeta(ctx)
if err != nil {
errMsg := fmt.Sprintf("IndexNode IndexBuildTask saveIndexMeta fail to load index meta, IndexBuildID=%d", indexMeta.IndexBuildID)
panic(errMsg)
}
taskState := it.updateTaskState(indexMeta)
if taskState == TaskStateAbandon {
log.Info("IndexNode IndexBuildTask saveIndexMeta", zap.String("TaskState", taskState.String()),
zap.Int64("IndexBuildID", indexMeta.IndexBuildID))
return nil
}
err := retry.Do(ctx, fn, retry.Attempts(3))
indexMeta.IndexFilePaths = it.savePaths
indexMeta.SerializeSize = it.serializedSize
if taskState == TaskStateFailed {
log.Error("IndexNode IndexBuildTask saveIndexMeta set indexMeta.state to IndexState_Failed",
zap.String("TaskState", taskState.String()),
zap.Int64("IndexBuildID", indexMeta.IndexBuildID), zap.Error(it.err))
indexMeta.State = commonpb.IndexState_Failed
indexMeta.FailReason = it.err.Error()
} else if taskState == TaskStateRetry {
log.Info("IndexNode IndexBuildTask saveIndexMeta set indexMeta.state to IndexState_Unissued",
zap.String("TaskState", taskState.String()),
zap.Int64("IndexBuildID", indexMeta.IndexBuildID), zap.Error(it.internalErr))
indexMeta.State = commonpb.IndexState_Unissued
} else { // TaskStateNormal
log.Info("IndexNode IndexBuildTask saveIndexmeta indexMeta.state to IndexState_Unissued",
zap.String("TaskState", taskState.String()),
zap.Int64("IndexBuildID", indexMeta.IndexBuildID))
indexMeta.State = commonpb.IndexState_Finished
}
var metaValue []byte
metaValue, err = proto.Marshal(indexMeta)
if err != nil {
errMsg := fmt.Sprintf("IndexNode IndexBuildTask saveIndexMeta fail to marshal index meta, IndexBuildID=%d, err=%s",
indexMeta.IndexBuildID, err.Error())
panic(errMsg)
}
strMetaValue := string(metaValue)
fn := func() error {
return it.etcdKV.CompareVersionAndSwap(it.req.MetaPath, version, strMetaValue)
}
err = retry.Do(ctx, fn, retry.Attempts(3))
if err != nil {
log.Error("IndexNode failed to checkIndexMeta", zap.Error(err))
panic(err.Error())
}
msg := fmt.Sprintf("check index meta pre: %v", pre)
it.tr.Record(msg)
return err
return nil
}
// PreExecute does some checks before building the index, for example, whether the index has been deleted.
......@@ -233,7 +261,13 @@ func (it *IndexBuildTask) PreExecute(ctx context.Context) error {
log.Debug("IndexNode IndexBuildTask preExecute...", zap.Int64("buildId", it.req.IndexBuildID))
sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "CreateIndex-PreExecute")
defer sp.Finish()
return it.checkIndexMeta(ctx, true)
indexMeta, _, err := it.loadIndexMeta(ctx)
if err != nil {
// assume that we can loadIndexMeta later...
return nil
}
it.updateTaskState(indexMeta)
return nil
}
// PostExecute does some checks after building the index, for example, whether the index has been deleted or
......@@ -242,10 +276,10 @@ func (it *IndexBuildTask) PostExecute(ctx context.Context) error {
log.Debug("IndexNode IndexBuildTask PostExecute...", zap.Int64("buildId", it.req.IndexBuildID))
sp, _ := trace.StartSpanFromContextWithOperationName(ctx, "CreateIndex-PostExecute")
defer sp.Finish()
return it.checkIndexMeta(ctx, false)
return it.saveIndexMeta(ctx)
}
func (it *IndexBuildTask) executePrepareParams(ctx context.Context) error {
func (it *IndexBuildTask) prepareParams(ctx context.Context) error {
typeParams := make(map[string]string)
for _, kvPair := range it.req.GetTypeParams() {
key, value := kvPair.GetKey(), kvPair.GetValue()
......@@ -290,7 +324,7 @@ func (it *IndexBuildTask) executePrepareParams(ctx context.Context) error {
return nil
}
func (it *IndexBuildTask) executeStepLoad(ctx context.Context) (storage.FieldID, storage.FieldData, error) {
func (it *IndexBuildTask) loadVector(ctx context.Context) (storage.FieldID, storage.FieldData, error) {
getValueByPath := func(path string) ([]byte, error) {
data, err := it.kv.Load(path)
if err != nil {
......@@ -369,12 +403,12 @@ func (it *IndexBuildTask) executeStepLoad(ctx context.Context) (storage.FieldID,
return fieldID, data, nil
}
func (it *IndexBuildTask) executeStepBuild(ctx context.Context) ([]*storage.Blob, error) {
func (it *IndexBuildTask) buildIndex(ctx context.Context) ([]*storage.Blob, error) {
var fieldID storage.FieldID
{
var err error
var fieldData storage.FieldData
fieldID, fieldData, err = it.executeStepLoad(ctx)
fieldID, fieldData, err = it.loadVector(ctx)
if err != nil {
return nil, err
}
......@@ -437,7 +471,7 @@ func (it *IndexBuildTask) executeStepBuild(ctx context.Context) ([]*storage.Blob
return serializedIndexBlobs, nil
}
func (it *IndexBuildTask) executeSave(ctx context.Context, blobs []*storage.Blob) error {
func (it *IndexBuildTask) saveIndex(ctx context.Context, blobs []*storage.Blob) error {
blobCnt := len(blobs)
it.serializedSize = 0
for i := range blobs {
......@@ -501,7 +535,11 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error {
sp, _ := trace.StartSpanFromContextWithOperationName(ctx, "CreateIndex-Execute")
defer sp.Finish()
if err := it.executePrepareParams(ctx); err != nil {
if err := it.prepareParams(ctx); err != nil {
it.SetState(TaskStateFailed)
log.Error("IndexNode IndexBuildTask Execute prepareParams failed",
zap.Int64("buildId", it.req.IndexBuildID),
zap.Error(err))
return err
}
......@@ -510,6 +548,7 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error {
var err error
it.index, err = NewCIndex(it.newTypeParams, it.newIndexParams)
if err != nil {
it.SetState(TaskStateFailed)
log.Error("IndexNode IndexBuildTask Execute NewCIndex failed",
zap.Int64("buildId", it.req.IndexBuildID),
zap.Error(err))
......@@ -526,13 +565,18 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error {
}()
var blobs []*storage.Blob
blobs, err = it.executeStepBuild(ctx)
blobs, err = it.buildIndex(ctx)
if err != nil {
it.SetState(TaskStateFailed)
log.Error("IndexNode IndexBuildTask Execute buildIndex failed",
zap.Int64("buildId", it.req.IndexBuildID),
zap.Error(err))
return err
}
err = it.executeSave(ctx, blobs)
err = it.saveIndex(ctx, blobs)
if err != nil {
it.SetState(TaskStateRetry)
return err
}
it.tr.Record("index file save done")
......
......@@ -245,6 +245,13 @@ func (sched *TaskScheduler) processTask(t task, q TaskQueue) {
err := t.PreExecute(ctx)
t.SetError(err)
if t.GetState() == TaskStateAbandon {
log.Info("IndexNode scheduler abandon task",
zap.String("TaskState", t.GetState().String()),
zap.Int64("taskID", t.ID()))
return
}
defer func() {
span.LogFields(oplog.Int64("scheduler process PostExecute", t.ID()))
err := t.PostExecute(ctx)
......
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package indexnode
type TaskState int32
const (
TaskStateNormal TaskState = 0
TaskStateAbandon TaskState = 1
TaskStateRetry TaskState = 2
TaskStateFailed TaskState = 3
)
var TaskStateNames = map[TaskState]string{
0: "Normal",
1: "Abandon",
2: "Retry",
3: "Failed",
}
func (x TaskState) String() string {
ret, ok := TaskStateNames[x]
if !ok {
return "None"
}
return ret
}
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package indexnode
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestTaskState_String(t *testing.T) {
assert.Equal(t, TaskStateNormal.String(), "Normal")
assert.Equal(t, TaskStateAbandon.String(), "Abandon")
assert.Equal(t, TaskStateRetry.String(), "Retry")
assert.Equal(t, TaskStateFailed.String(), "Failed")
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册