提交 46a14da3 编写于 作者: B bigsheeper 提交者: yefu.chen

Implement segment manager and loadSegment

Signed-off-by: Nbigsheeper <yihao.dai@zilliz.com>
上级 16ff16b2
......@@ -13,7 +13,7 @@
#include <string>
#include <map>
#include "knowhere/index/vector_index/VecIndex.h"
#include "../index/knowhere/knowhere/index/vector_index/VecIndex.h"
struct LoadIndexInfo {
std::string field_name;
......@@ -21,9 +21,3 @@ struct LoadIndexInfo {
std::map<std::string, std::string> index_params;
milvus::knowhere::VecIndexPtr index;
};
// Parameters describing the data of a single field that is to be loaded.
struct LoadFieldDataInfo {
// Identifier of the field the data belongs to.
int64_t field_id;
// NOTE(review): presumably points at the raw field data covering row_count rows;
// ownership and layout are not visible here — confirm against the caller.
void* blob;
// Number of rows.
int64_t row_count;
};
......@@ -18,7 +18,7 @@
#include "query/deprecated/GeneralQuery.h"
#include "query/Plan.h"
#include "common/LoadInfo.h"
#include "common/LoadIndex.h"
#include "segcore/SegmentInterface.h"
namespace milvus {
......
......@@ -10,20 +10,17 @@
// or implied. See the License for the specific language governing permissions and limitations under the License
#include "SegmentInterface.h"
#include "common/LoadInfo.h"
namespace milvus::segcore {
// Abstract interface of a sealed segment. Every member is pure virtual:
// implementations expose the schema and row count and accept an index or
// field data described by LoadIndexInfo / LoadFieldDataInfo.
class SegmentSealed {
public:
// Returns the schema of this segment.
virtual const Schema&
get_schema() = 0;
// Returns the number of rows in this segment.
virtual int64_t
get_row_count() = 0;
// Loads the index described by info into this segment.
virtual void
LoadIndex(const LoadIndexInfo& info) = 0;
// Loads the field data described by info into this segment.
virtual void
LoadFieldData(const LoadFieldDataInfo& info) = 0;
};
} // namespace milvus::segcore
// class SegmentSealed : public SegmentInternalInterface {
// public:
// const Schema& get_schema() = 0;
// int64_t get_num_chunk() = 0;
//
// explicit SegmentSealed(SchemaPtr schema);
// void set_size();
// void load_data(FieldId field_id, void* blob, int64_t blob_size);
//
//
// private:
// SchemaPtr schema_;
// }
......@@ -12,7 +12,7 @@
#include "index/knowhere/knowhere/common/BinarySet.h"
#include "index/knowhere/knowhere/index/vector_index/VecIndexFactory.h"
#include "segcore/load_index_c.h"
#include "common/LoadInfo.h"
#include "common/LoadIndex.h"
#include "utils/EasyAssert.h"
CStatus
......
......@@ -19,7 +19,7 @@
#include <knowhere/index/vector_index/VecIndexFactory.h>
#include <cstdint>
#include <boost/concept_check.hpp>
#include "common/LoadInfo.h"
#include "common/LoadIndex.h"
CSegmentBase
NewSegment(CCollection collection, uint64_t segment_id) {
......
......@@ -18,7 +18,6 @@ set(MILVUS_TEST_FILES
test_reduce.cpp
test_interface.cpp
test_span.cpp
test_load.cpp
)
add_executable(all_tests
${MILVUS_TEST_FILES}
......
......@@ -23,7 +23,7 @@
#include <index/knowhere/knowhere/index/vector_index/adapter/VectorAdapter.h>
#include <index/knowhere/knowhere/index/vector_index/VecIndexFactory.h>
#include <index/knowhere/knowhere/index/vector_index/IndexIVFPQ.h>
#include <common/LoadInfo.h>
#include <common/LoadIndex.h>
#include <utils/Types.h>
#include <segcore/Collection.h>
#include "test_utils/DataGen.h"
......
#include <gtest/gtest.h>
#include "segcore/SegmentSealed.h"
// Placeholder test case: the body is empty, so it makes no assertions yet.
TEST(Load, Naive) {
}
\ No newline at end of file
......@@ -3,10 +3,60 @@ package querynode
import (
"context"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
)
type Client struct {
ctx context.Context
querypb.QueryNodeClient
ctx context.Context
grpcClient querypb.QueryNodeClient
}
// Init is not implemented yet; calling it panics.
func (c *Client) Init() {
panic("implement me")
}
// Start is not implemented yet; calling it panics.
func (c *Client) Start() {
panic("implement me")
}
// Stop is not implemented yet; calling it panics.
func (c *Client) Stop() {
panic("implement me")
}
// GetServiceStates is not implemented yet; calling it panics instead of
// returning service states.
func (c *Client) GetServiceStates() (internalpb2.ServiceStates, error) {
panic("implement me")
}
// GetTimeTickChannel is not implemented yet; calling it panics instead of
// returning a channel name.
func (c *Client) GetTimeTickChannel() (string, error) {
panic("implement me")
}
// GetStatisticsChannel is not implemented yet; calling it panics instead of
// returning a channel name.
func (c *Client) GetStatisticsChannel() (string, error) {
panic("implement me")
}
// AddQueryChannel forwards the request to the wrapped gRPC query node client
// and hands back its status and error unchanged. The RPC is issued with
// context.TODO().
func (c *Client) AddQueryChannel(in *querypb.AddQueryChannelsRequest) (*commonpb.Status, error) {
	rpcCtx := context.TODO()
	status, err := c.grpcClient.AddQueryChannel(rpcCtx, in)
	return status, err
}
// RemoveQueryChannel forwards the request to the wrapped gRPC query node
// client and hands back its status and error unchanged. The RPC is issued
// with context.TODO().
func (c *Client) RemoveQueryChannel(in *querypb.RemoveQueryChannelsRequest) (*commonpb.Status, error) {
	rpcCtx := context.TODO()
	status, err := c.grpcClient.RemoveQueryChannel(rpcCtx, in)
	return status, err
}
// WatchDmChannels forwards the request to the wrapped gRPC query node client
// and hands back its status and error unchanged. The RPC is issued with
// context.TODO().
func (c *Client) WatchDmChannels(in *querypb.WatchDmChannelsRequest) (*commonpb.Status, error) {
	rpcCtx := context.TODO()
	status, err := c.grpcClient.WatchDmChannels(rpcCtx, in)
	return status, err
}
// LoadSegments forwards the request to the wrapped gRPC query node client and
// hands back its status and error unchanged. The RPC is issued with
// context.TODO().
func (c *Client) LoadSegments(in *querypb.LoadSegmentRequest) (*commonpb.Status, error) {
	rpcCtx := context.TODO()
	status, err := c.grpcClient.LoadSegments(rpcCtx, in)
	return status, err
}
// ReleaseSegments forwards the request to the wrapped gRPC query node client
// and hands back its status and error unchanged. The RPC is issued with
// context.TODO().
func (c *Client) ReleaseSegments(in *querypb.ReleaseSegmentRequest) (*commonpb.Status, error) {
	rpcCtx := context.TODO()
	status, err := c.grpcClient.ReleaseSegments(rpcCtx, in)
	return status, err
}
// GetPartitionState forwards the request to the wrapped gRPC query node client
// and hands back its response and error unchanged. The RPC is issued with
// context.TODO().
func (c *Client) GetPartitionState(in *querypb.PartitionStatesRequest) (*querypb.PartitionStatesResponse, error) {
	rpcCtx := context.TODO()
	resp, err := c.grpcClient.GetPartitionState(rpcCtx, in)
	return resp, err
}
......@@ -44,25 +44,31 @@ func (s *Server) Start() {
}
func (s *Server) AddQueryChannel(ctx context.Context, in *querypb.AddQueryChannelsRequest) (*commonpb.Status, error) {
return s.node.AddQueryChannel(ctx, in)
// ignore ctx
return s.node.AddQueryChannel(in)
}
func (s *Server) RemoveQueryChannel(ctx context.Context, in *querypb.RemoveQueryChannelsRequest) (*commonpb.Status, error) {
return s.node.RemoveQueryChannel(ctx, in)
// ignore ctx
return s.node.RemoveQueryChannel(in)
}
func (s *Server) WatchDmChannels(ctx context.Context, in *querypb.WatchDmChannelsRequest) (*commonpb.Status, error) {
return s.node.WatchDmChannels(ctx, in)
// ignore ctx
return s.node.WatchDmChannels(in)
}
func (s *Server) LoadSegments(ctx context.Context, in *querypb.LoadSegmentRequest) (*commonpb.Status, error) {
return s.node.LoadSegments(ctx, in)
// ignore ctx
return s.node.LoadSegments(in)
}
func (s *Server) ReleaseSegments(ctx context.Context, in *querypb.ReleaseSegmentRequest) (*commonpb.Status, error) {
return s.node.ReleaseSegments(ctx, in)
// ignore ctx
return s.node.ReleaseSegments(in)
}
func (s *Server) GetPartitionState(ctx context.Context, in *querypb.PartitionStatesRequest) (*querypb.PartitionStatesResponse, error) {
return s.node.GetPartitionState(ctx, in)
// ignore ctx
return s.node.GetPartitionState(in)
}
package querynode
import (
"context"
"errors"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
......@@ -9,187 +8,168 @@ import (
queryPb "github.com/zilliztech/milvus-distributed/internal/proto/querypb"
)
func (node *QueryNode) AddQueryChannel(ctx context.Context, in *queryPb.AddQueryChannelsRequest) (*commonpb.Status, error) {
select {
case <-ctx.Done():
errMsg := "context exceeded"
func (node *QueryNode) AddQueryChannel(in *queryPb.AddQueryChannelsRequest) (*commonpb.Status, error) {
if node.searchService == nil || node.searchService.searchMsgStream == nil {
errMsg := "null search service or null search message stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
return status, errors.New(errMsg)
default:
if node.searchService == nil || node.searchService.searchMsgStream == nil {
errMsg := "null search service or null search message stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
}
return status, errors.New(errMsg)
searchStream, ok := node.searchService.searchMsgStream.(*msgstream.PulsarMsgStream)
if !ok {
errMsg := "type assertion failed for search message stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
searchStream, ok := node.searchService.searchMsgStream.(*msgstream.PulsarMsgStream)
if !ok {
errMsg := "type assertion failed for search message stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
return status, errors.New(errMsg)
}
return status, errors.New(errMsg)
resultStream, ok := node.searchService.searchResultMsgStream.(*msgstream.PulsarMsgStream)
if !ok {
errMsg := "type assertion failed for search result message stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
resultStream, ok := node.searchService.searchResultMsgStream.(*msgstream.PulsarMsgStream)
if !ok {
errMsg := "type assertion failed for search result message stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
return status, errors.New(errMsg)
}
return status, errors.New(errMsg)
}
// add request channel
pulsarBufSize := Params.SearchPulsarBufSize
consumeChannels := []string{in.RequestChannelID}
consumeSubName := Params.MsgChannelSubName
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
// add request channel
pulsarBufSize := Params.SearchPulsarBufSize
consumeChannels := []string{in.RequestChannelID}
consumeSubName := Params.MsgChannelSubName
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
// add result channel
producerChannels := []string{in.ResultChannelID}
resultStream.CreatePulsarProducers(producerChannels)
// add result channel
producerChannels := []string{in.ResultChannelID}
resultStream.CreatePulsarProducers(producerChannels)
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
}
return status, nil
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
}
return status, nil
}
func (node *QueryNode) RemoveQueryChannel(ctx context.Context, in *queryPb.RemoveQueryChannelsRequest) (*commonpb.Status, error) {
select {
case <-ctx.Done():
errMsg := "context exceeded"
func (node *QueryNode) RemoveQueryChannel(in *queryPb.RemoveQueryChannelsRequest) (*commonpb.Status, error) {
if node.searchService == nil || node.searchService.searchMsgStream == nil {
errMsg := "null search service or null search result message stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
return status, errors.New(errMsg)
default:
if node.searchService == nil || node.searchService.searchMsgStream == nil {
errMsg := "null search service or null search result message stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
return status, errors.New(errMsg)
}
searchStream, ok := node.searchService.searchMsgStream.(*msgstream.PulsarMsgStream)
if !ok {
errMsg := "type assertion failed for search message stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
}
return status, errors.New(errMsg)
searchStream, ok := node.searchService.searchMsgStream.(*msgstream.PulsarMsgStream)
if !ok {
errMsg := "type assertion failed for search message stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
resultStream, ok := node.searchService.searchResultMsgStream.(*msgstream.PulsarMsgStream)
if !ok {
errMsg := "type assertion failed for search result message stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
return status, errors.New(errMsg)
}
return status, errors.New(errMsg)
resultStream, ok := node.searchService.searchResultMsgStream.(*msgstream.PulsarMsgStream)
if !ok {
errMsg := "type assertion failed for search result message stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
// remove request channel
pulsarBufSize := Params.SearchPulsarBufSize
consumeChannels := []string{in.RequestChannelID}
consumeSubName := Params.MsgChannelSubName
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
// TODO: searchStream.RemovePulsarConsumers(producerChannels)
searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
return status, errors.New(errMsg)
}
// remove result channel
producerChannels := []string{in.ResultChannelID}
// TODO: resultStream.RemovePulsarProducer(producerChannels)
resultStream.CreatePulsarProducers(producerChannels)
// remove request channel
pulsarBufSize := Params.SearchPulsarBufSize
consumeChannels := []string{in.RequestChannelID}
consumeSubName := Params.MsgChannelSubName
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
// TODO: searchStream.RemovePulsarConsumers(producerChannels)
searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
// remove result channel
producerChannels := []string{in.ResultChannelID}
// TODO: resultStream.RemovePulsarProducer(producerChannels)
resultStream.CreatePulsarProducers(producerChannels)
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
}
return status, nil
}
func (node *QueryNode) WatchDmChannels(in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error) {
if node.dataSyncService == nil || node.dataSyncService.dmStream == nil {
errMsg := "null data sync service or null data manipulation stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
return status, nil
return status, errors.New(errMsg)
}
}
func (node *QueryNode) WatchDmChannels(ctx context.Context, in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error) {
select {
case <-ctx.Done():
errMsg := "context exceeded"
fgDMMsgStream, ok := node.dataSyncService.dmStream.(*msgstream.PulsarMsgStream)
if !ok {
errMsg := "type assertion failed for dm message stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
return status, errors.New(errMsg)
default:
if node.dataSyncService == nil || node.dataSyncService.dmStream == nil {
errMsg := "null data sync service or null data manipulation stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
}
}
return status, errors.New(errMsg)
}
// add request channel
pulsarBufSize := Params.SearchPulsarBufSize
consumeChannels := in.ChannelIDs
consumeSubName := Params.MsgChannelSubName
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
fgDMMsgStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
fgDMMsgStream, ok := node.dataSyncService.dmStream.(*msgstream.PulsarMsgStream)
if !ok {
errMsg := "type assertion failed for dm message stream"
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
}
return status, nil
}
func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.Status, error) {
// TODO: support db
for _, segmentID := range in.SegmentIDs {
hasBeenBuiltIndex := segmentID > 0 // TODO: ???
indexID := UniqueID(0) // TODO: ???
err := node.segManager.loadSegment(segmentID, hasBeenBuiltIndex, indexID, in.FieldIDs)
if err != nil {
// TODO: return or continue
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: errMsg,
Reason: err.Error(),
}
return status, errors.New(errMsg)
}
// add request channel
pulsarBufSize := Params.SearchPulsarBufSize
consumeChannels := in.ChannelIDs
consumeSubName := Params.MsgChannelSubName
unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
fgDMMsgStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
return status, err
}
return status, nil
}
}
func (node *QueryNode) LoadSegments(ctx context.Context, in *queryPb.LoadSegmentRequest) (*commonpb.Status, error) {
// TODO: implement
return nil, nil
}
func (node *QueryNode) ReleaseSegments(ctx context.Context, in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error) {
func (node *QueryNode) ReleaseSegments(in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error) {
// TODO: implement
return nil, nil
}
func (node *QueryNode) GetPartitionState(ctx context.Context, in *queryPb.PartitionStatesRequest) (*queryPb.PartitionStatesResponse, error) {
func (node *QueryNode) GetPartitionState(in *queryPb.PartitionStatesRequest) (*queryPb.PartitionStatesResponse, error) {
// TODO: implement
return nil, nil
}
......@@ -29,12 +29,12 @@ type Node interface {
Start() error
Close()
AddQueryChannel(ctx context.Context, in *queryPb.AddQueryChannelsRequest) (*commonpb.Status, error)
RemoveQueryChannel(ctx context.Context, in *queryPb.RemoveQueryChannelsRequest) (*commonpb.Status, error)
WatchDmChannels(ctx context.Context, in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error)
LoadSegments(ctx context.Context, in *queryPb.LoadSegmentRequest) (*commonpb.Status, error)
ReleaseSegments(ctx context.Context, in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error)
GetPartitionState(ctx context.Context, in *queryPb.PartitionStatesRequest) (*queryPb.PartitionStatesResponse, error)
AddQueryChannel(in *queryPb.AddQueryChannelsRequest) (*commonpb.Status, error)
RemoveQueryChannel(in *queryPb.RemoveQueryChannelsRequest) (*commonpb.Status, error)
WatchDmChannels(in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error)
LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.Status, error)
ReleaseSegments(in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error)
GetPartitionState(in *queryPb.PartitionStatesRequest) (*queryPb.PartitionStatesResponse, error)
}
type QueryNode struct {
......@@ -53,6 +53,8 @@ type QueryNode struct {
loadIndexService *loadIndexService
statsService *statsService
segManager *segmentManager
//opentracing
tracer opentracing.Tracer
closer io.Closer
......
package querynode
import (
"context"
"errors"
"fmt"
"github.com/zilliztech/milvus-distributed/internal/kv"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
queryPb "github.com/zilliztech/milvus-distributed/internal/proto/querypb"
"github.com/zilliztech/milvus-distributed/internal/querynode/client"
"github.com/zilliztech/milvus-distributed/internal/storage"
)
// segmentManager bundles the collaborators that loadSegment relies on to
// fetch binlogs, decode them and load indexes for a segment.
type segmentManager struct {
// replica is queried by loadSegment (getSegmentByID) to find the target segment.
replica collectionReplica
// TODO: replace by client instead of grpc client
// dataClient is used to ask for the insert binlog paths of a segment.
dataClient datapb.DataServiceClient
// indexBuilderClient is used to ask for the index file paths of an index.
indexBuilderClient indexpb.IndexServiceClient
// queryNodeClient is used to issue LoadIndex calls.
queryNodeClient *client.Client
kv kv.Base // minio kv
// iCodec deserializes the loaded binlog blobs into insert data.
iCodec storage.InsertCodec
}
// loadSegment fetches the insert binlogs of segmentID field by field, decodes
// them, and then asks the query node client to load the index files of indexID
// for every field listed in vecFieldIDs.
//
// Parameters:
//   - segmentID: segment whose binlogs (and index) are loaded.
//   - hasBeenBuiltIndex: guards the index-loading step (see the review note below).
//   - indexID: index whose file paths are requested from the index service.
//   - vecFieldIDs: fields for which LoadIndex is issued.
//
// It returns the first error encountered: a failed RPC or kv load, a response
// whose FieldIDs and Paths differ in length, a deserialization failure, decoded
// data that does not hold exactly one field, an unsupported field data type, a
// non-success status from the index service, or missing index params.
func (s *segmentManager) loadSegment(segmentID UniqueID, hasBeenBuiltIndex bool, indexID UniqueID, vecFieldIDs []int64) error {
	// 1. load segment
	req := &datapb.InsertBinlogPathRequest{
		SegmentID: segmentID,
	}

	pathResponse, err := s.dataClient.GetInsertBinlogPaths(context.TODO(), req)
	if err != nil {
		return err
	}

	if len(pathResponse.FieldIDs) != len(pathResponse.Paths) {
		return errors.New("illegal InsertBinlogPathsResponse")
	}

	// range yields (index, value): i selects the entry of Paths that belongs
	// to fieldID. The length check above keeps Paths[i] in bounds.
	for i, fieldID := range pathResponse.FieldIDs {
		paths := pathResponse.Paths[i].Values
		blobs := make([]*storage.Blob, 0, len(paths))
		for _, path := range paths {
			binLog, err := s.kv.Load(path)
			if err != nil {
				// TODO: return or continue?
				return err
			}
			blobs = append(blobs, &storage.Blob{
				Key:   "", // TODO: key???
				Value: []byte(binLog),
			})
		}

		_, _, insertData, err := s.iCodec.Deserialize(blobs)
		if err != nil {
			// TODO: return or continue
			return err
		}
		if len(insertData.Data) != 1 {
			return errors.New("we expect only one field in deserialized insert data")
		}

		for _, value := range insertData.Data {
			switch fieldData := value.(type) {
			case storage.BoolFieldData:
				numRows := fieldData.NumRows
				data := fieldData.Data
				fmt.Println(numRows, data, fieldID)
				// TODO: s.replica.addSegment()
			case storage.Int8FieldData:
				// TODO: s.replica.addSegment()
			case storage.Int16FieldData:
				// TODO: s.replica.addSegment()
			case storage.Int32FieldData:
				// TODO: s.replica.addSegment()
			case storage.Int64FieldData:
				// TODO: s.replica.addSegment()
			case storage.FloatFieldData:
				// TODO: s.replica.addSegment()
			case storage.DoubleFieldData:
				// TODO: s.replica.addSegment()
			default:
				// TODO: what if the index has not been built ?
				// does the info from hasBeenBuiltIndex is synced with the dataService?
				return errors.New("unsupported field data type")
			}
		}
	}

	// 2. load index
	// does the info from hasBeenBuiltIndex is synced with the dataService?
	// NOTE(review): the index is loaded only when hasBeenBuiltIndex is false,
	// which looks inverted — confirm the intended condition with the caller.
	if !hasBeenBuiltIndex {
		indexReq := &indexpb.IndexFilePathRequest{
			IndexID: indexID,
		}
		indexPaths, err := s.indexBuilderClient.GetIndexFilePaths(context.TODO(), indexReq)
		if err != nil {
			return err
		}
		// A successful RPC can still carry a failure status; report it
		// instead of returning a nil error.
		if indexPaths.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
			return fmt.Errorf("get index file paths for index %v: %s", indexID, indexPaths.Status.Reason)
		}

		targetSegment, err := s.replica.getSegmentByID(segmentID)
		if err != nil {
			return err
		}
		for _, vecFieldID := range vecFieldIDs {
			targetIndexParam, ok := targetSegment.indexParam[vecFieldID]
			if !ok {
				return fmt.Errorf("cannot found index params in segment %v with field = %v", segmentID, vecFieldID)
			}
			if err := s.queryNodeClient.LoadIndex(indexPaths.IndexFilePaths, segmentID, vecFieldID, "", targetIndexParam); err != nil {
				return err
			}
		}
	}

	return nil
}
// releaseSegment is a stub: it ignores the request and always returns nil.
func (s *segmentManager) releaseSegment(in *queryPb.ReleaseSegmentRequest) error {
// TODO: implement
// TODO: release specific field, we need segCore supply relevant interface
return nil
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册