提交 74c0e354 编写于 作者: F FluorineDog 提交者: yefu.chen

Remove tbb

Signed-off-by: NFluorineDog <guilin.gou@zilliz.com>
上级 fe9040f3
......@@ -4,6 +4,8 @@ import (
"github.com/czs007/suvlim/storage/pkg/types"
yaml "gopkg.in/yaml.v2"
"io/ioutil"
"path"
"runtime"
)
// yaml.MapSlice
......@@ -58,10 +60,14 @@ func init() {
load_config()
}
func getCurrentFileDir() string {
_, fpath, _, _ := runtime.Caller(0)
return path.Dir(fpath)
}
func load_config() {
//var config ServerConfig
filename := "../conf/config.yaml"
source, err := ioutil.ReadFile(filename)
filePath := path.Join(getCurrentFileDir(), "config.yaml")
source, err := ioutil.ReadFile(filePath)
if err != nil {
panic(err)
}
......
......@@ -15,8 +15,8 @@ master:
etcd:
address: localhost
port: 0
rootpath: a
port: 2379
rootpath: suvlim
segthreshold: 10000
timesync:
......
......@@ -8,7 +8,6 @@
#include <knowhere/index/vector_index/adapter/VectorAdapter.h>
#include <knowhere/index/vector_index/VecIndexFactory.h>
#include <faiss/utils/distances.h>
#include <tbb/iterators.h>
namespace milvus::dog_segment {
......@@ -335,21 +334,31 @@ void
merge_into(int64_t queries, int64_t topk, float *distances, int64_t *uids, const float *new_distances, const int64_t *new_uids) {
for(int64_t qn = 0; qn < queries; ++qn) {
auto base = qn * topk;
auto dst_dis = distances + base;
auto dst_uids = uids + base;
auto src_dis = new_distances + base;
auto src_uids = new_uids + base;
std::vector<float> buf_dis(2*topk);
std::vector<int64_t> buf_uids(2*topk);
auto zip_src = tbb::make_zip_iterator(src_dis, src_uids);
auto zip_dst = tbb::make_zip_iterator(dst_dis, dst_uids);
auto zip_buf = tbb::make_zip_iterator(buf_dis.data(), buf_uids.data());
auto fuck = zip_src + 1;
std::merge(zip_dst, zip_dst + topk, zip_src, zip_src + topk, zip_buf);
std::copy_n(zip_buf, topk, zip_dst);
auto src2_dis = distances + base;
auto src2_uids = uids + base;
auto src1_dis = new_distances + base;
auto src1_uids = new_uids + base;
std::vector<float> buf_dis(topk);
std::vector<int64_t> buf_uids(topk);
auto it1 = 0;
auto it2 = 0;
for(auto buf = 0; buf < topk; ++buf){
if(src1_dis[it1] <= src2_dis[it2]) {
buf_dis[buf] = src1_dis[it1];
buf_uids[buf] = src1_uids[it1];
++it1;
} else {
buf_dis[buf] = src2_dis[it2];
buf_uids[buf] = src2_uids[it2];
++it2;
}
}
std::copy_n(buf_dis.data(), topk, src2_dis);
std::copy_n(buf_uids.data(), topk, src2_uids);
}
}
......
......@@ -18,7 +18,6 @@
#include <knowhere/index/vector_index/VecIndexFactory.h>
#include <algorithm>
#include <tbb/iterators.h>
using std::cin;
using std::cout;
......@@ -59,22 +58,32 @@ void
merge_into(int64_t queries, int64_t topk, float *distances, int64_t *uids, const float *new_distances, const int64_t *new_uids) {
for(int64_t qn = 0; qn < queries; ++qn) {
auto base = qn * topk;
auto dst_dis = distances + base;
auto dst_uids = uids + base;
auto src_dis = new_distances + base;
auto src_uids = new_uids + base;
std::vector<float> buf_dis(2*topk);
std::vector<int64_t> buf_uids(2*topk);
auto zip_src = tbb::make_zip_iterator(src_dis, src_uids);
auto zip_dst = tbb::make_zip_iterator(dst_dis, dst_uids);
auto zip_buf = tbb::make_zip_iterator(buf_dis.data(), buf_uids.data());
auto fuck = zip_src + 1;
std::merge(zip_dst, zip_dst + topk, zip_src, zip_src + topk, zip_buf);
std::copy_n(zip_buf, topk, zip_dst);
}
auto src2_dis = distances + base;
auto src2_uids = uids + base;
auto src1_dis = new_distances + base;
auto src1_uids = new_uids + base;
std::vector<float> buf_dis(topk);
std::vector<int64_t> buf_uids(topk);
auto it1 = 0;
auto it2 = 0;
for(auto buf = 0; buf < topk; ++buf){
if(src1_dis[it1] <= src2_dis[it2]) {
buf_dis[buf] = src1_dis[it1];
buf_uids[buf] = src1_uids[it1];
++it1;
} else {
buf_dis[buf] = src2_dis[it2];
buf_uids[buf] = src2_uids[it2];
++it2;
}
}
std::copy_n(buf_dis.data(), topk, src2_dis);
std::copy_n(buf_uids.data(), topk, src2_uids);
}
}
......
......@@ -21,20 +21,43 @@ var (
errTxnFailed = errors.New("failed to commit transaction")
)
type etcdKVBase struct {
type EtcdKVBase struct {
client *clientv3.Client
rootPath string
}
// NewEtcdKVBase creates a new etcd kv.
func NewEtcdKVBase(client *clientv3.Client, rootPath string) *etcdKVBase {
return &etcdKVBase{
func NewEtcdKVBase(client *clientv3.Client, rootPath string) *EtcdKVBase {
return &EtcdKVBase{
client: client,
rootPath: rootPath,
}
}
func (kv *etcdKVBase) Load(key string) (string, error) {
func (kv *EtcdKVBase) LoadWithPrefix(key string) ( []string, []string) {
key = path.Join(kv.rootPath, key)
println("in loadWithPrefix,", key)
resp, err := etcdutil.EtcdKVGet(kv.client, key,clientv3.WithPrefix())
if err != nil {
return [] string {}, [] string {}
}
var keys []string
var values []string
for _,kvs := range resp.Kvs{
//println(len(kvs.))
if len(kvs.Key) <= 0{
println("KKK")
continue
}
keys = append(keys, string(kvs.Key))
values = append(values, string(kvs.Value))
}
//println(keys)
//println(values)
return keys, values
}
func (kv *EtcdKVBase) Load(key string) (string, error) {
key = path.Join(kv.rootPath, key)
resp, err := etcdutil.EtcdKVGet(kv.client, key)
......@@ -49,7 +72,7 @@ func (kv *etcdKVBase) Load(key string) (string, error) {
return string(resp.Kvs[0].Value), nil
}
func (kv *etcdKVBase) Save(key, value string) error {
func (kv *EtcdKVBase) Save(key, value string) error {
key = path.Join(kv.rootPath, key)
txn := NewSlowLogTxn(kv.client)
......@@ -64,7 +87,7 @@ func (kv *etcdKVBase) Save(key, value string) error {
return nil
}
func (kv *etcdKVBase) Remove(key string) error {
func (kv *EtcdKVBase) Remove(key string) error {
key = path.Join(kv.rootPath, key)
txn := NewSlowLogTxn(kv.client)
......@@ -79,12 +102,18 @@ func (kv *etcdKVBase) Remove(key string) error {
return nil
}
func (kv *etcdKVBase) Watch(key string) clientv3.WatchChan {
func (kv *EtcdKVBase) Watch(key string) clientv3.WatchChan {
key = path.Join(kv.rootPath, key)
rch := kv.client.Watch(context.Background(), key)
return rch
}
func (kv *EtcdKVBase) WatchWithPrefix(key string) clientv3.WatchChan {
key = path.Join(kv.rootPath, key)
rch := kv.client.Watch(context.Background(), key, clientv3.WithPrefix())
return rch
}
// SlowLogTxn wraps etcd transaction and log slow one.
type SlowLogTxn struct {
clientv3.Txn
......
......@@ -7,4 +7,6 @@ type Base interface {
Save(key, value string) error
Remove(key string) error
Watch(key string) clientv3.WatchChan
WatchWithPrefix(key string) clientv3.WatchChan
LoadWithPrefix(key string) ( []string, []string)
}
package main
import (
reader "github.com/czs007/suvlim/reader/read_node"
"sync"
)
func main() {
pulsarURL := "pulsar://localhost:6650"
numOfQueryNode := 2
go reader.StartQueryNode(pulsarURL, numOfQueryNode, 0)
reader.StartQueryNode(pulsarURL, numOfQueryNode, 1)
}
func main2() {
wg := sync.WaitGroup{}
//ctx, cancel := context.WithCancel(context.Background())
//defer cancel()
wg.Add(1)
reader.StartQueryNode2()
wg.Wait()
}
\ No newline at end of file
......@@ -2,9 +2,9 @@ package reader
/*
#cgo CFLAGS: -I../core/include
#cgo CFLAGS: -I${SRCDIR}/../../core/include
#cgo LDFLAGS: -L../core/lib -lmilvus_dog_segment -Wl,-rpath=../core/lib
#cgo LDFLAGS: -L${SRCDIR}/../../core/lib -lmilvus_dog_segment -Wl,-rpath=${SRCDIR}/../../core/lib
#include "collection_c.h"
#include "partition_c.h"
......@@ -16,6 +16,7 @@ import "C"
type Collection struct {
CollectionPtr C.CCollection
CollectionName string
CollectionID uint64
Partitions []*Partition
}
......
package reader
import (
"context"
"fmt"
"github.com/czs007/suvlim/pkg/master/mock"
"reflect"
"strconv"
"strings"
"sync"
"time"
"github.com/czs007/suvlim/conf"
"github.com/czs007/suvlim/pkg/master/kv"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
)
const (
CollectonPrefix = "/collection/"
SegmentPrefix = "/segment/"
)
func GetCollectionObjId(key string) string {
prefix := conf.Config.Etcd.Rootpath + CollectonPrefix
return strings.TrimPrefix(key, prefix)
}
func GetSegmentObjId(key string) string {
prefix := conf.Config.Etcd.Rootpath + SegmentPrefix
return strings.TrimPrefix(key, prefix)
}
func isCollectionObj(key string) bool {
prefix := conf.Config.Etcd.Rootpath + CollectonPrefix
prefix = strings.TrimSpace(prefix)
println("prefix is :$", prefix)
index := strings.Index(key, prefix)
println("index is :", index)
return index == 0
}
func isSegmentObj(key string) bool {
prefix := conf.Config.Etcd.Rootpath + SegmentPrefix
prefix = strings.TrimSpace(prefix)
index := strings.Index(key, prefix)
return index == 0
}
func printCollectionStruct(obj *mock.Collection){
v := reflect.ValueOf(obj)
v = reflect.Indirect(v)
typeOfS := v.Type()
for i := 0; i< v.NumField(); i++ {
if typeOfS.Field(i).Name == "GrpcMarshalString"{
continue
}
fmt.Printf("Field: %s\tValue: %v\n", typeOfS.Field(i).Name, v.Field(i).Interface())
}
}
func printSegmentStruct(obj *mock.Segment){
v := reflect.ValueOf(obj)
v = reflect.Indirect(v)
typeOfS := v.Type()
for i := 0; i< v.NumField(); i++ {
fmt.Printf("Field: %s\tValue: %v\n", typeOfS.Field(i).Name, v.Field(i).Interface())
}
}
func (node *QueryNode) processCollectionCreate(id string, value string) {
println(fmt.Sprintf("Create Collection:$%s$", id))
collection, err := mock.JSON2Collection(value)
if err != nil {
println("error of json 2 collection")
println(err.Error())
}
printCollectionStruct(collection)
}
func (node *QueryNode) processSegmentCreate(id string, value string) {
println("Create Segment: ", id)
segment, err := mock.JSON2Segment(value)
if err != nil {
println("error of json 2 segment")
println(err.Error())
}
printSegmentStruct(segment)
}
func (node *QueryNode) processCreate(key string, msg string) {
println("process create", key, ":", msg)
if isCollectionObj(key){
objID := GetCollectionObjId(key)
node.processCollectionCreate(objID, msg)
}else if isSegmentObj(key){
objID := GetSegmentObjId(key)
node.processSegmentCreate(objID, msg)
}else {
println("can not process create msg:", key)
}
}
func (node *QueryNode) processSegmentModify(id string, value string) {
println("Modify Segment: ", id)
segment, err := mock.JSON2Segment(value)
if err != nil {
println("error of json 2 segment")
println(err.Error())
}
printSegmentStruct(segment)
}
func (node *QueryNode) processCollectionModify(id string, value string) {
println("Modify Collection: ", id)
collection, err := mock.JSON2Collection(value)
if err != nil {
println("error of json 2 collection")
println(err.Error())
}
printCollectionStruct(collection)
}
func (node *QueryNode) processModify(key string, msg string){
println("process modify")
if isCollectionObj(key){
objID := GetCollectionObjId(key)
node.processCollectionModify(objID, msg)
}else if isSegmentObj(key){
objID := GetSegmentObjId(key)
node.processSegmentModify(objID, msg)
}else {
println("can not process modify msg:", key)
}
}
func (node *QueryNode) processSegmentDelete(id string){
println("Delete segment: ", id)
}
func (node *QueryNode) processCollectionDelete(id string){
println("Delete collection: ", id)
}
func (node *QueryNode) processDelete(key string){
println("process delete")
if isCollectionObj(key){
objID := GetCollectionObjId(key)
node.processCollectionDelete(objID)
}else if isSegmentObj(key){
objID := GetSegmentObjId(key)
node.processSegmentDelete(objID)
}else {
println("can not process delete msg:", key)
}
}
func (node *QueryNode) processResp(resp clientv3.WatchResponse) error {
err := resp.Err()
if err != nil {
return err
}
for _, ev := range resp.Events {
if ev.IsCreate() {
key := string(ev.Kv.Key)
msg := string(ev.Kv.Value)
node.processCreate(key, msg)
} else if ev.IsModify() {
key := string(ev.Kv.Key)
msg := string(ev.Kv.Value)
node.processModify(key, msg)
} else if ev.Type == mvccpb.DELETE {
key := string(ev.Kv.Key)
node.processDelete(key)
} else {
println("Unrecognized etcd msg!")
}
}
return nil
}
func (node *QueryNode) loadCollections() error {
keys, values := node.kvBase.LoadWithPrefix(CollectonPrefix)
for i:= range keys{
objID := GetCollectionObjId(keys[i])
node.processCollectionCreate(objID, values[i])
}
return nil
}
func (node *QueryNode) loadSegments() error {
keys, values := node.kvBase.LoadWithPrefix(SegmentPrefix)
for i:= range keys{
objID := GetSegmentObjId(keys[i])
node.processSegmentCreate(objID, values[i])
}
return nil
}
func (node *QueryNode) InitFromMeta() error {
//pass
etcdAddr := "http://"
etcdAddr += conf.Config.Etcd.Address
etcdPort := conf.Config.Etcd.Port
etcdAddr = etcdAddr + ":" + strconv.FormatInt(int64(etcdPort), 10)
cli, _ := clientv3.New(clientv3.Config{
Endpoints: []string{etcdAddr},
DialTimeout: 5 * time.Second,
})
defer cli.Close()
node.kvBase = kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath)
node.loadCollections()
node.loadSegments()
return nil
}
func (node *QueryNode) RunMetaService(ctx context.Context, wg *sync.WaitGroup) {
node.InitFromMeta()
metaChan := node.kvBase.WatchWithPrefix("")
for {
select {
case <-ctx.Done():
wg.Done()
println("DONE!!!!!!")
return
case resp := <-metaChan:
node.processResp(resp)
}
}
}
\ No newline at end of file
......@@ -2,9 +2,9 @@ package reader
/*
#cgo CFLAGS: -I../core/include
#cgo CFLAGS: -I${SRCDIR}/../../core/include
#cgo LDFLAGS: -L../core/lib -lmilvus_dog_segment -Wl,-rpath=../core/lib
#cgo LDFLAGS: -L${SRCDIR}/../../core/lib -lmilvus_dog_segment -Wl,-rpath=${SRCDIR}/../../core/lib
#include "collection_c.h"
#include "partition_c.h"
......
......@@ -2,9 +2,9 @@ package reader
/*
#cgo CFLAGS: -I../core/include
#cgo CFLAGS: -I${SRCDIR}/../../core/include
#cgo LDFLAGS: -L../core/lib -lmilvus_dog_segment -Wl,-rpath=../core/lib
#cgo LDFLAGS: -L${SRCDIR}/../../core/lib -lmilvus_dog_segment -Wl,-rpath=${SRCDIR}/../../core/lib
#include "collection_c.h"
#include "partition_c.h"
......@@ -15,11 +15,14 @@ import "C"
import (
"fmt"
msgPb "github.com/czs007/suvlim/pkg/master/grpc/message"
"github.com/czs007/suvlim/reader/message_client"
"sort"
"sync"
"sync/atomic"
msgPb "github.com/czs007/suvlim/pkg/master/grpc/message"
"github.com/czs007/suvlim/pkg/master/kv"
"github.com/czs007/suvlim/reader/message_client"
//"github.com/stretchr/testify/assert"
)
type InsertData struct {
......@@ -54,16 +57,17 @@ type QueryNodeDataBuffer struct {
}
type QueryNode struct {
QueryNodeId uint64
Collections []*Collection
SegmentsMap map[int64]*Segment
messageClient *message_client.MessageClient
QueryNodeId uint64
Collections []*Collection
SegmentsMap map[int64]*Segment
messageClient *message_client.MessageClient
//mc *message_client.MessageClient
queryNodeTimeSync *QueryNodeTime
buffer QueryNodeDataBuffer
deletePreprocessData DeletePreprocessData
deleteData DeleteData
insertData InsertData
kvBase *kv.EtcdKVBase
}
func NewQueryNode(queryNodeId uint64, timeSync uint64) *QueryNode {
......@@ -87,12 +91,12 @@ func NewQueryNode(queryNodeId uint64, timeSync uint64) *QueryNode {
}
return &QueryNode{
QueryNodeId: queryNodeId,
Collections: nil,
SegmentsMap: segmentsMap,
messageClient: &mc,
queryNodeTimeSync: queryNodeTimeSync,
buffer: buffer,
QueryNodeId: queryNodeId,
Collections: nil,
SegmentsMap: segmentsMap,
messageClient: &mc,
queryNodeTimeSync: queryNodeTimeSync,
buffer: buffer,
}
}
......@@ -119,12 +123,12 @@ func CreateQueryNode(queryNodeId uint64, timeSync uint64, mc *message_client.Mes
}
return &QueryNode{
QueryNodeId: queryNodeId,
Collections: nil,
SegmentsMap: segmentsMap,
messageClient: mc,
queryNodeTimeSync: queryNodeTimeSync,
buffer: buffer,
QueryNodeId: queryNodeId,
Collections: nil,
SegmentsMap: segmentsMap,
messageClient: mc,
queryNodeTimeSync: queryNodeTimeSync,
buffer: buffer,
}
}
......@@ -173,7 +177,7 @@ func (node *QueryNode) DeleteCollection(collection *Collection) {
////////////////////////////////////////////////////////////////////////////////////////////////////
func (node *QueryNode) PrepareBatchMsg() []int {
var msgLen= node.messageClient.PrepareBatchMsg()
var msgLen = node.messageClient.PrepareBatchMsg()
return msgLen
}
......@@ -189,7 +193,7 @@ func (node *QueryNode) InitQueryNodeCollection() {
////////////////////////////////////////////////////////////////////////////////////////////////////
func (node *QueryNode) RunInsertDelete(wg * sync.WaitGroup) {
func (node *QueryNode) RunInsertDelete(wg *sync.WaitGroup) {
for {
// TODO: get timeRange from message client
var msgLen = node.PrepareBatchMsg()
......@@ -271,7 +275,7 @@ func (node *QueryNode) MessagesPreprocess(insertDeleteMessages []*msgPb.InsertOr
}
// 2. Remove invalid messages from buffer.
tmpInsertOrDeleteBuffer := make([]*msgPb.InsertOrDeleteMsg ,0)
tmpInsertOrDeleteBuffer := make([]*msgPb.InsertOrDeleteMsg, 0)
for i, isValid := range node.buffer.validInsertDeleteBuffer {
if isValid {
tmpInsertOrDeleteBuffer = append(tmpInsertOrDeleteBuffer, node.buffer.InsertDeleteBuffer[i])
......
package reader
import (
"github.com/czs007/suvlim/reader/message_client"
"context"
"log"
"sync"
"github.com/czs007/suvlim/reader/message_client"
)
func StartQueryNode(pulsarURL string, numOfQueryNode int, messageClientID int) {
......@@ -32,3 +34,16 @@ func StartQueryNode(pulsarURL string, numOfQueryNode int, messageClientID int) {
wg.Wait()
qn.Close()
}
func StartQueryNode2() {
ctx := context.Background()
qn := CreateQueryNode(0, 0, nil)
//qn.InitQueryNodeCollection()
wg := sync.WaitGroup{}
wg.Add(1)
//go qn.RunInsertDelete(&wg)
//go qn.RunSearch(&wg)
go qn.RunMetaService(ctx, &wg)
wg.Wait()
qn.Close()
}
......@@ -2,9 +2,9 @@ package reader
/*
#cgo CFLAGS: -I../core/include
#cgo CFLAGS: -I${SRCDIR}/../../core/include
#cgo LDFLAGS: -L../core/lib -lmilvus_dog_segment -Wl,-rpath=../core/lib
#cgo LDFLAGS: -L${SRCDIR}/../../core/lib -lmilvus_dog_segment -Wl,-rpath=${SRCDIR}/../../core/lib
#include "collection_c.h"
#include "partition_c.h"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册