未验证 提交 ef0fc37a 编写于 作者: C cai.zhang 提交者: GitHub

Fixed the bug that IndexCoord lost some meta information (#17818)

Signed-off-by: NCai.Zhang <cai.zhang@zilliz.com>
上级 46e0e265
......@@ -28,6 +28,8 @@ import (
"syscall"
"time"
v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"github.com/golang/protobuf/proto"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
......@@ -859,8 +861,7 @@ func (i *IndexCoord) watchMetaLoop() {
defer i.loopWg.Done()
log.Debug("IndexCoord watchMetaLoop start")
watchChan := i.metaTable.client.WatchWithPrefix(indexFilePrefix)
watchChan := i.metaTable.client.WatchWithRevision(indexFilePrefix, i.metaTable.revision)
for {
select {
case <-ctx.Done():
......@@ -871,6 +872,18 @@ func (i *IndexCoord) watchMetaLoop() {
return
}
if err := resp.Err(); err != nil {
if err == v3rpc.ErrCompacted {
newMetaTable, err := NewMetaTable(i.metaTable.client)
if err != nil {
log.Error("Constructing new meta table fails when etcd has a compaction error",
zap.String("path", indexFilePrefix), zap.String("etcd error", err.Error()), zap.Error(err))
panic("failed to handle etcd request, exit..")
}
i.metaTable = newMetaTable
i.loopWg.Add(1)
go i.watchMetaLoop()
return
}
log.Error("received error event from etcd watcher", zap.String("path", indexFilePrefix), zap.Error(err))
panic("failed to handle etcd request, exit..")
}
......
......@@ -18,6 +18,7 @@ package indexcoord
import (
"context"
"fmt"
"math/rand"
"os"
"os/signal"
......@@ -26,6 +27,9 @@ import (
"testing"
"time"
"github.com/milvus-io/milvus/internal/kv"
clientv3 "go.etcd.io/etcd/client/v3"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
......@@ -324,6 +328,102 @@ func TestIndexCoord_watchNodeLoop(t *testing.T) {
assert.True(t, closed)
}
type mockEtcdKv struct {
kv.MetaKv
watchWithRevision func(string, int64) clientv3.WatchChan
loadWithRevisionAndVersions func(string) ([]string, []string, []int64, int64, error)
}
func (mek *mockEtcdKv) WatchWithRevision(key string, revision int64) clientv3.WatchChan {
return mek.watchWithRevision(key, revision)
}
func (mek *mockEtcdKv) LoadWithRevisionAndVersions(key string) ([]string, []string, []int64, int64, error) {
return mek.loadWithRevisionAndVersions(key)
}
func TestIndexCoord_watchMetaLoop(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
ic := &IndexCoord{
loopCtx: ctx,
loopWg: sync.WaitGroup{},
}
watchChan := make(chan clientv3.WatchResponse, 1024)
client := &mockEtcdKv{
watchWithRevision: func(s string, i int64) clientv3.WatchChan {
return watchChan
},
}
mt := &metaTable{
client: client,
indexBuildID2Meta: map[UniqueID]Meta{},
revision: 0,
lock: sync.RWMutex{},
}
ic.metaTable = mt
t.Run("watch chan panic", func(t *testing.T) {
ic.loopWg.Add(1)
watchChan <- clientv3.WatchResponse{Canceled: true}
assert.Panics(t, func() {
ic.watchMetaLoop()
})
ic.loopWg.Wait()
})
t.Run("watch chan new meta table panic", func(t *testing.T) {
client = &mockEtcdKv{
watchWithRevision: func(s string, i int64) clientv3.WatchChan {
return watchChan
},
loadWithRevisionAndVersions: func(s string) ([]string, []string, []int64, int64, error) {
return []string{}, []string{}, []int64{}, 0, fmt.Errorf("error occurred")
},
}
mt = &metaTable{
client: client,
indexBuildID2Meta: map[UniqueID]Meta{},
revision: 0,
lock: sync.RWMutex{},
}
ic.metaTable = mt
ic.loopWg.Add(1)
watchChan <- clientv3.WatchResponse{CompactRevision: 10}
assert.Panics(t, func() {
ic.watchMetaLoop()
})
ic.loopWg.Wait()
})
t.Run("watch chan new meta success", func(t *testing.T) {
ic.loopWg = sync.WaitGroup{}
client = &mockEtcdKv{
watchWithRevision: func(s string, i int64) clientv3.WatchChan {
return watchChan
},
loadWithRevisionAndVersions: func(s string) ([]string, []string, []int64, int64, error) {
return []string{}, []string{}, []int64{}, 0, nil
},
}
mt = &metaTable{
client: client,
indexBuildID2Meta: map[UniqueID]Meta{},
revision: 0,
lock: sync.RWMutex{},
}
ic.metaTable = mt
ic.loopWg.Add(1)
watchChan <- clientv3.WatchResponse{CompactRevision: 10}
go ic.watchMetaLoop()
cancel()
ic.loopWg.Wait()
})
}
func TestIndexCoord_GetComponentStates(t *testing.T) {
n := &IndexCoord{}
n.stateCode.Store(internalpb.StateCode_Healthy)
......
......@@ -25,12 +25,13 @@ import (
"strconv"
"sync"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/metrics"
"go.uber.org/zap"
"github.com/golang/protobuf/proto"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
......@@ -47,14 +48,16 @@ type Meta struct {
// metaTable records the mapping of IndexBuildID to Meta.
type metaTable struct {
client *etcdkv.EtcdKV // client of a reliable kv service, i.e. etcd client
client kv.MetaKv // client of a reliable kv service, i.e. etcd client
indexBuildID2Meta map[UniqueID]Meta // index build id to index meta
revision int64
lock sync.RWMutex
}
// NewMetaTable is used to create a new meta table.
func NewMetaTable(kv *etcdkv.EtcdKV) (*metaTable, error) {
func NewMetaTable(kv kv.MetaKv) (*metaTable, error) {
mt := &metaTable{
client: kv,
lock: sync.RWMutex{},
......@@ -73,11 +76,13 @@ func (mt *metaTable) reloadFromKV() error {
key := indexFilePrefix
log.Debug("IndexCoord metaTable LoadWithPrefix ", zap.String("prefix", key))
_, values, versions, err := mt.client.LoadWithPrefix2(key)
_, values, versions, revision, err := mt.client.LoadWithRevisionAndVersions(key)
if err != nil {
return err
}
mt.revision = revision
for i := 0; i < len(values); i++ {
indexMeta := indexpb.IndexMeta{}
err = proto.Unmarshal([]byte(values[i]), &indexMeta)
......
......@@ -147,6 +147,27 @@ func (kv *EmbedEtcdKV) LoadWithPrefix2(key string) ([]string, []string, []int64,
return keys, values, versions, nil
}
func (kv *EmbedEtcdKV) LoadWithRevisionAndVersions(key string) ([]string, []string, []int64, int64, error) {
key = path.Join(kv.rootPath, key)
log.Debug("LoadWithPrefix ", zap.String("prefix", key))
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(),
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
if err != nil {
return nil, nil, nil, 0, err
}
keys := make([]string, 0, resp.Count)
values := make([]string, 0, resp.Count)
versions := make([]int64, 0, resp.Count)
for _, kv := range resp.Kvs {
keys = append(keys, string(kv.Key))
values = append(values, string(kv.Value))
versions = append(versions, kv.Version)
}
return keys, values, versions, resp.Header.Revision, nil
}
// LoadBytesWithPrefix2 returns all the keys and values with versions by the given key prefix
func (kv *EmbedEtcdKV) LoadBytesWithPrefix2(key string) ([]string, [][]byte, []int64, error) {
key = path.Join(kv.rootPath, key)
......
......@@ -128,6 +128,13 @@ func TestEmbedEtcd(te *testing.T) {
assert.ElementsMatch(t, test.expectedValues, actualValues)
assert.NotZero(t, versions)
assert.Equal(t, test.expectedError, err)
actualKeys, actualValues, versions, revision, err := metaKv.LoadWithRevisionAndVersions(test.prefix)
assert.ElementsMatch(t, test.expectedKeys, actualKeys)
assert.ElementsMatch(t, test.expectedValues, actualValues)
assert.NotZero(t, versions)
assert.NotZero(t, revision)
assert.Equal(t, test.expectedError, err)
}
removeTests := []struct {
......
......@@ -124,6 +124,28 @@ func (kv *EtcdKV) LoadWithPrefix2(key string) ([]string, []string, []int64, erro
return keys, values, versions, nil
}
func (kv *EtcdKV) LoadWithRevisionAndVersions(key string) ([]string, []string, []int64, int64, error) {
start := time.Now()
key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(),
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
if err != nil {
return nil, nil, nil, 0, err
}
keys := make([]string, 0, resp.Count)
values := make([]string, 0, resp.Count)
versions := make([]int64, 0, resp.Count)
for _, kv := range resp.Kvs {
keys = append(keys, string(kv.Key))
values = append(values, string(kv.Value))
versions = append(versions, kv.Version)
}
CheckElapseAndWarn(start, "Slow etcd operation load with prefix2")
return keys, values, versions, resp.Header.Revision, nil
}
// LoadBytesWithPrefix2 returns all the the keys,values and key versions with the given key prefix.
func (kv *EtcdKV) LoadBytesWithPrefix2(key string) ([]string, [][]byte, []int64, error) {
start := time.Now()
......
......@@ -126,6 +126,13 @@ func TestEtcdKV_Load(te *testing.T) {
assert.ElementsMatch(t, test.expectedValues, actualValues)
assert.NotZero(t, versions)
assert.Equal(t, test.expectedError, err)
actualKeys, actualValues, versions, revision, err := etcdKV.LoadWithRevisionAndVersions(test.prefix)
assert.ElementsMatch(t, test.expectedKeys, actualKeys)
assert.ElementsMatch(t, test.expectedValues, actualValues)
assert.NotZero(t, versions)
assert.NotZero(t, revision)
assert.Equal(t, test.expectedError, err)
}
removeTests := []struct {
......
......@@ -64,6 +64,7 @@ type MetaKv interface {
GetPath(key string) string
LoadWithPrefix(key string) ([]string, []string, error)
LoadWithPrefix2(key string) ([]string, []string, []int64, error)
LoadWithRevisionAndVersions(key string) ([]string, []string, []int64, int64, error)
LoadWithRevision(key string) ([]string, []string, int64, error)
Watch(key string) clientv3.WatchChan
WatchWithPrefix(key string) clientv3.WatchChan
......
......@@ -107,6 +107,10 @@ func (m *MockMetaKV) LoadWithPrefix2(key string) ([]string, []string, []int64, e
panic("not implemented") // TODO: Implement
}
func (m *MockMetaKV) LoadWithRevisionAndVersions(key string) ([]string, []string, []int64, int64, error) {
panic("not implemented") // TODO: Implement
}
func (m *MockMetaKV) LoadWithRevision(key string) ([]string, []string, int64, error) {
panic("not implemented") // TODO: Implement
}
......
......@@ -90,6 +90,10 @@ func TestMockKV_MetaKV(t *testing.T) {
mockKv.LoadWithPrefix2(testKey)
})
assert.Panics(t, func() {
mockKv.LoadWithRevisionAndVersions(testKey)
})
assert.Panics(t, func() {
mockKv.LoadWithRevision(testKey)
})
......
......@@ -89,6 +89,10 @@ func (m *mockMetaKV) LoadWithPrefix2(key string) ([]string, []string, []int64, e
panic("not implemented") // TODO: Implement
}
func (m *mockMetaKV) LoadWithRevisionAndVersions(key string) ([]string, []string, []int64, int64, error) {
panic("not implemented") // TODO: Implement
}
func (m *mockMetaKV) LoadWithRevision(key string) ([]string, []string, int64, error) {
panic("not implemented") // TODO: Implement
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册