提交 5a57b62f 编写于 作者: B bigsheeper 提交者: yefu.chen

Refactor collection and partition, wrap cgo interfaces

Signed-off-by: Nbigsheeper <yihao.dai@zilliz.com>
上级 b6cc1df9
......@@ -11,7 +11,4 @@ Collection::set_index() {}
void
Collection::parse() {}
void
Collection::AddNewPartition() {}
}
......@@ -15,15 +15,19 @@ public:
// TODO: config to schema
void parse();
void AddNewPartition();
public:
SchemaPtr& get_schema() {
return schema_;
}
private:
// TODO: add Index ptr
// IndexPtr index_ = nullptr;
std::string collection_name_;
std::string schema_json_;
milvus::dog_segment::SchemaPtr schema_;
std::vector<PartitionPtr> partitions_;
SchemaPtr schema_;
};
using CollectionPtr = std::unique_ptr<Collection>;
}
......@@ -2,19 +2,7 @@
namespace milvus::dog_segment {
Partition::Partition(std::string& partition_name):
partition_name_(partition_name) {}
void
Partition::AddNewSegment(uint64_t segment_id) {
auto segment = CreateSegment();
segment->set_segment_id(segment_id);
segments_.emplace_back(segment);
}
Partition*
CreatePartition() {
}
Partition::Partition(std::string& partition_name, SchemaPtr& schema):
partition_name_(partition_name), schema_(schema) {}
}
......@@ -6,21 +6,18 @@ namespace milvus::dog_segment {
class Partition {
public:
explicit Partition(std::string& partition_name);
explicit Partition(std::string& partition_name, SchemaPtr& schema);
const std::vector<SegmentBasePtr> &segments() const {
return segments_;
public:
SchemaPtr& get_schema() {
return schema_;
}
void AddNewSegment(uint64_t segment_id);
private:
std::string partition_name_;
std::vector<SegmentBasePtr> segments_;
SchemaPtr schema_;
};
using PartitionPtr = std::shared_ptr<Partition>;
Partition* CreatePartiton();
using PartitionPtr = std::unique_ptr<Partition>;
}
\ No newline at end of file
......@@ -102,12 +102,9 @@ class SegmentBase {
uint64_t segment_id_;
};
using SegmentBasePtr = std::shared_ptr<SegmentBase>;
using SegmentBasePtr = std::unique_ptr<SegmentBase>;
std::unique_ptr<SegmentBase> CreateSegment(SchemaPtr ptr);
// TODO: Delete this after schema parse function done
SegmentBase* CreateSegment();
SegmentBasePtr CreateSegment(SchemaPtr& ptr);
} // namespace engine
} // namespace milvus
......@@ -128,11 +128,8 @@ class SegmentNaive : public SegmentBase {
}
public:
friend std::unique_ptr<SegmentBase>
CreateSegment(SchemaPtr schema);
friend SegmentBase*
CreateSegment();
friend SegmentBasePtr
CreateSegment(SchemaPtr& schema);
private:
SchemaPtr schema_;
......@@ -147,24 +144,19 @@ class SegmentNaive : public SegmentBase {
tbb::concurrent_unordered_multimap<int, Timestamp> delete_logs_;
};
std::unique_ptr<SegmentBase>
CreateSegment(SchemaPtr schema) {
SegmentBasePtr
CreateSegment(SchemaPtr& schema) {
// TODO: remove hard code
auto schema_tmp = std::make_shared<Schema>();
schema_tmp->AddField("fakevec", DataType::VECTOR_FLOAT, 16);
schema_tmp->AddField("age", DataType::INT32);
auto segment = std::make_unique<SegmentNaive>();
segment->schema_ = schema;
segment->entity_vecs_.resize(schema->size());
segment->schema_ = schema_tmp;
segment->entity_vecs_.resize(schema_tmp->size());
return segment;
}
SegmentBase* CreateSegment() {
auto segment = new SegmentNaive();
auto schema = std::make_shared<Schema>();
schema->AddField("fakevec", DataType::VECTOR_FLOAT, 16);
schema->AddField("age", DataType::INT32);
segment->schema_ = schema;
segment->entity_vecs_.resize(schema->size());
return segment;
}
Status
SegmentNaive::Insert(int64_t size, const uint64_t* primary_keys, const Timestamp* timestamps,
const DogDataChunk& row_values) {
......
#include "collection_c.h"
#include "Collection.h"
CCollection
NewCollection(const char* collection_name) {
NewCollection(const char* collection_name, const char* schema_conf) {
auto name = std::string(collection_name);
auto conf = std::string(schema_conf);
auto collection = std::make_unique<milvus::dog_segment::Collection>(name, conf);
return (void*)collection.release();
}
void
DeleteCollection(CCollection collection) {
auto col = (milvus::dog_segment::Collection*)collection;
delete col;
}
......@@ -4,7 +4,7 @@ extern "C" {
typedef void* CCollection;
CCollection NewCollection(const char* collection_name);
CCollection NewCollection(const char* collection_name, const char* schema_conf);
void DeleteCollection(CCollection collection);
......
......@@ -4,9 +4,19 @@
CPartition
NewPartition(CCollection collection, const char* partition_name) {
auto c = (milvus::dog_segment::Collection*)collection;
auto name = std::string(partition_name);
auto partition = new milvus::dog_segment::Partition(name);
auto co = (milvus::dog_segment::Collection*)collection;
co->AddNewPartition();
auto schema = c->get_schema();
auto partition = std::make_unique<milvus::dog_segment::Partition>(name, schema);
return (void*)partition.release();
}
void DeletePartition(CPartition partition) {
auto p = (milvus::dog_segment::Partition*)partition;
delete p;
}
#include "SegmentBase.h"
#include "segment_c.h"
#include "Partition.h"
CSegmentBase
SegmentBaseInit(unsigned long segment_id) {
std::cout << "Hello milvus" << std::endl;
auto seg = milvus::dog_segment::CreateSegment();
seg->set_segment_id(segment_id);
return (void*)seg;
NewSegment(CPartition partition, unsigned long segment_id) {
auto p = (milvus::dog_segment::Partition*)partition;
auto segment = milvus::dog_segment::CreateSegment(p->get_schema());
segment->set_segment_id(segment_id);
return (void*)segment.release();
}
//int32_t Insert(CSegmentBase c_segment, signed long int size, const unsigned long* primary_keys, const unsigned long int* timestamps, DogDataChunk values) {
// auto segment = (milvus::dog_segment::SegmentBase*)c_segment;
// milvus::dog_segment::DogDataChunk dataChunk{};
//
// dataChunk.raw_data = values.raw_data;
// dataChunk.sizeof_per_row = values.sizeof_per_row;
// dataChunk.count = values.count;
//
// auto res = segment->Insert(size, primary_keys, timestamps, dataChunk);
// return res.code();
//}
void DeleteSegment(CSegmentBase segment) {
auto s = (milvus::dog_segment::SegmentBase*)segment;
delete s;
}
int Insert(CSegmentBase c_segment,
signed long int size,
......
......@@ -2,17 +2,13 @@
extern "C" {
#endif
//struct DogDataChunk {
// void* raw_data; // schema
// int sizeof_per_row; // alignment
// signed long int count;
//};
#include "partition_c.h"
typedef void* CSegmentBase;
CSegmentBase SegmentBaseInit(unsigned long segment_id);
CSegmentBase NewSegment(CPartition partition, unsigned long segment_id);
//int32_t Insert(CSegmentBase c_segment, signed long int size, const unsigned long* primary_keys, const unsigned long int* timestamps, DogDataChunk values);
void DeleteSegment(CSegmentBase segment);
int Insert(CSegmentBase c_segment,
signed long int size,
......
......@@ -2,13 +2,14 @@
#include <string>
#include <random>
#include <gtest/gtest.h>
#include <dog_segment/SegmentBase.h>
#include "dog_segment/segment_c.h"
#include "dog_segment/collection_c.h"
TEST(SegmentTest, InsertTest) {
auto segment_id = 0;
auto s = SegmentBaseInit(segment_id);
auto fake_schema = std::make_shared<milvus::dog_segment::Schema>();
auto s = milvus::dog_segment::CreateSegment(fake_schema).release();
std::vector<char> raw_data;
std::vector<uint64_t> timestamps;
......
......@@ -11,28 +11,28 @@ type Collection struct {
Partitions []*Partition
}
// TODO: Schema
type CollectionSchema string
func NewCollection(collectionName string, schema CollectionSchema) (*Collection, error) {
cName := C.CString(collectionName)
cSchema := C.CString(schema)
collection, status := C.NewCollection(cName, cSchema)
func (c *Collection) NewPartition(partitionName string) (*Partition, error) {
cName := C.CString(partitionName)
partitionPtr, status := C.NewPartition(c.CollectionPtr, cName)
if status != 0 {
return nil, errors.New("create collection failed")
return nil, errors.New("create partition failed")
}
return &Collection{CollectionPtr: collection, CollectionName: collectionName}, nil
var newPartition = &Partition{PartitionPtr: partitionPtr, PartitionName: partitionName}
c.Partitions = append(c.Partitions, newPartition)
return newPartition, nil
}
func DeleteCollection(collection *Collection) error {
status := C.DeleteCollection(collection.CollectionPtr)
func (c *Collection) DeletePartition(partitionName string) error {
cName := C.CString(partitionName)
status := C.DeletePartition(c.CollectionPtr, cName)
if status != 0 {
return errors.New("delete collection failed")
return errors.New("create partition failed")
}
// TODO: remove from c.Partitions
return nil
}
......
......@@ -9,24 +9,25 @@ type Partition struct {
Segments []*Segment
}
func (c *Collection) NewPartition(partitionName string) (*Partition, error) {
cName := C.CString(partitionName)
partitionPtr, status := C.NewPartition(c.CollectionPtr, cName)
func (p *Partition) NewSegment(segmentId uint64) (*Segment, error) {
segmentPtr, status := C.NewSegment(p.PartitionPtr, segmentId)
if status != 0 {
return nil, errors.New("create partition failed")
return nil, errors.New("create segment failed")
}
return &Partition{PartitionPtr: partitionPtr, PartitionName: partitionName}, nil
var newSegment = &Segment{SegmentPtr: segmentPtr, SegmentId: segmentId}
p.Segments = append(p.Segments, newSegment)
return newSegment, nil
}
func (c *Collection) DeletePartition(partitionName string) error {
cName := C.CString(partitionName)
status := C.DeletePartition(c.CollectionPtr, cName)
func (p *Partition) DeleteSegment() error {
status := C.DeleteSegment(p.PartitionPtr)
if status != 0 {
return errors.New("create partition failed")
return errors.New("delete segment failed")
}
// TODO: remove from p.Segments
return nil
}
......@@ -48,7 +48,38 @@ func NewQueryNode(timeSync uint64) *QueryNode {
}
}
func (node *QueryNode)doQueryNode(wg *sync.WaitGroup) {
// TODO: Schema
type CollectionSchema string
func (node *QueryNode) NewCollection(collectionName string, schema CollectionSchema) (*Collection, error) {
cName := C.CString(collectionName)
cSchema := C.CString(schema)
collection, status := C.NewCollection(cName, cSchema)
if status != 0 {
return nil, errors.New("create collection failed")
}
var newCollection = &Collection{CollectionPtr: collection, CollectionName: collectionName}
node.Collections = append(node.Collections, newCollection)
return newCollection, nil
}
func (node *QueryNode) DeleteCollection(collection *Collection) error {
status := C.DeleteCollection(collection.CollectionPtr)
if status != 0 {
return errors.New("delete collection failed")
}
// TODO: remove from node.Collections
return nil
}
////////////////////////////////////////////////////////////////////////////////////////////////////
func (node *QueryNode) doQueryNode (wg *sync.WaitGroup) {
wg.Add(3)
go node.Insert(node.messageClient.InsertMsg, wg)
go node.Delete(node.messageClient.DeleteMsg, wg)
......@@ -67,12 +98,6 @@ func (node *QueryNode) StartMessageClient() {
go node.messageClient.ReceiveMessage()
}
func (node *QueryNode) AddNewCollection(collectionName string, schema CollectionSchema) error {
var collection, err = NewCollection(collectionName, schema)
node.Collections = append(node.Collections, collection)
return err
}
func (node *QueryNode) GetSegmentByEntityId(entityId int64) *Segment {
// TODO: get id2segment info from pulsar
return nil
......@@ -115,13 +140,11 @@ func (node *QueryNode) GetTimeSync() uint64 {
func (node *QueryNode) InitQueryNodeCollection() {
// TODO: remove hard code, add collection creation request
var collection, _ = NewCollection("collection1", "fakeSchema")
node.Collections = append(node.Collections, collection)
var partition, _ = collection.NewPartition("partition1")
collection.Partitions = append(collection.Partitions, partition)
// TODO: error handle
var newCollection, _ = node.NewCollection("collection1", "fakeSchema")
var newPartition, _ = newCollection.NewPartition("partition1")
// TODO: add segment id
var segment, _ = partition.NewSegment(0)
partition.Segments = append(partition.Segments, segment)
var _, _ = newPartition.NewSegment(0)
}
func (node *QueryNode) SegmentsManagement() {
......
......@@ -11,7 +11,6 @@ package reader
*/
import "C"
import (
"errors"
"suvlim/pulsar/schema"
)
......@@ -23,27 +22,6 @@ type Segment struct {
SegmentCloseTime uint64
}
func (p *Partition) NewSegment(segmentId uint64) (*Segment, error) {
// TODO: add segment id
segmentPtr, status := C.SegmentBaseInit(p.PartitionPtr)
if status != 0 {
return nil, errors.New("create segment failed")
}
return &Segment{SegmentPtr: segmentPtr}, nil
}
func (p *Partition) DeleteSegment() error {
status := C.DeleteSegment(p.PartitionPtr)
if status != 0 {
return errors.New("delete segment failed")
}
return nil
}
func (s *Segment) GetRowCount() int64 {
// TODO: C type to go type
return C.GetRowCount(s)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册