提交 e704667b 编写于 作者: R rain 提交者: yefu.chen

Finish all feature and update the README of master

Signed-off-by: Nrain <boyan.wang@zilliz.com>
上级 cdf82235
package main
import (
"github.com/czs007/suvlim/pkg/master"
)
import "github.com/czs007/suvlim/pkg/master"
// func main() {
// ctx, cancel := context.WithCancel(context.Background())
......@@ -22,6 +20,7 @@ func init() {
// go mock.FakePulsarProducer()
}
func main() {
master.Run()
//master.SegmentStatsController()
master.CollectionController()
//master.CollectionController()
}
# How to start a master
## Requirements
### Start a etcdv3
```
./etcd -listen-peer-urls=http://192.168.1.10:12380 -advertise-client-urls=http://192.168.1.10:12379 -listen-client-urls http://0.0.0.0:12379,http://0.0.0.0:14001 -initial-advertise-peer-urls=http://192.168.1.10:12380
```
## Start from code
```
go run cmd/master.go
```
## Start with docker
......@@ -7,4 +7,6 @@ const (
PULSAR_MONITER_INTERVAL = 1 * time.Second
PULSAR_TOPIC = "monitor-topic"
ETCD_ROOT_PATH = "by-dev"
SEGMENT_THRESHOLE = 10000
DEFAULT_GRPC_PORT = ":53100"
)
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: master.proto
//option go_package = "github.com/czs007/suvilm/pkg/master/grpc";
package masterpb
import (
context "context"
fmt "fmt"
proto "github.com/golang/protobuf/proto"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
type CreateCollectionRequest struct {
CollectionName string `protobuf:"bytes,1,opt,name=collection_name,json=collectionName,proto3" json:"collection_name,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *CreateCollectionRequest) Reset() { *m = CreateCollectionRequest{} }
func (m *CreateCollectionRequest) String() string { return proto.CompactTextString(m) }
func (*CreateCollectionRequest) ProtoMessage() {}
func (*CreateCollectionRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_f9c348dec43a6705, []int{0}
}
func (m *CreateCollectionRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CreateCollectionRequest.Unmarshal(m, b)
}
func (m *CreateCollectionRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_CreateCollectionRequest.Marshal(b, m, deterministic)
}
func (m *CreateCollectionRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_CreateCollectionRequest.Merge(m, src)
}
func (m *CreateCollectionRequest) XXX_Size() int {
return xxx_messageInfo_CreateCollectionRequest.Size(m)
}
func (m *CreateCollectionRequest) XXX_DiscardUnknown() {
xxx_messageInfo_CreateCollectionRequest.DiscardUnknown(m)
}
var xxx_messageInfo_CreateCollectionRequest proto.InternalMessageInfo
func (m *CreateCollectionRequest) GetCollectionName() string {
if m != nil {
return m.CollectionName
}
return ""
}
type CreateCollectionResponse struct {
CollectionName string `protobuf:"bytes,1,opt,name=collection_name,json=collectionName,proto3" json:"collection_name,omitempty"`
CollectionId uint64 `protobuf:"varint,2,opt,name=collection_id,json=collectionId,proto3" json:"collection_id,omitempty"`
SegmentIds []uint64 `protobuf:"varint,3,rep,packed,name=segment_ids,json=segmentIds,proto3" json:"segment_ids,omitempty"`
PartitionTags []string `protobuf:"bytes,4,rep,name=partition_tags,json=partitionTags,proto3" json:"partition_tags,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *CreateCollectionResponse) Reset() { *m = CreateCollectionResponse{} }
func (m *CreateCollectionResponse) String() string { return proto.CompactTextString(m) }
func (*CreateCollectionResponse) ProtoMessage() {}
func (*CreateCollectionResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_f9c348dec43a6705, []int{1}
}
func (m *CreateCollectionResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CreateCollectionResponse.Unmarshal(m, b)
}
func (m *CreateCollectionResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_CreateCollectionResponse.Marshal(b, m, deterministic)
}
func (m *CreateCollectionResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_CreateCollectionResponse.Merge(m, src)
}
func (m *CreateCollectionResponse) XXX_Size() int {
return xxx_messageInfo_CreateCollectionResponse.Size(m)
}
func (m *CreateCollectionResponse) XXX_DiscardUnknown() {
xxx_messageInfo_CreateCollectionResponse.DiscardUnknown(m)
}
var xxx_messageInfo_CreateCollectionResponse proto.InternalMessageInfo
func (m *CreateCollectionResponse) GetCollectionName() string {
if m != nil {
return m.CollectionName
}
return ""
}
func (m *CreateCollectionResponse) GetCollectionId() uint64 {
if m != nil {
return m.CollectionId
}
return 0
}
func (m *CreateCollectionResponse) GetSegmentIds() []uint64 {
if m != nil {
return m.SegmentIds
}
return nil
}
func (m *CreateCollectionResponse) GetPartitionTags() []string {
if m != nil {
return m.PartitionTags
}
return nil
}
func init() {
proto.RegisterType((*CreateCollectionRequest)(nil), "masterpb.CreateCollectionRequest")
proto.RegisterType((*CreateCollectionResponse)(nil), "masterpb.CreateCollectionResponse")
}
func init() { proto.RegisterFile("master.proto", fileDescriptor_f9c348dec43a6705) }
var fileDescriptor_f9c348dec43a6705 = []byte{
// 225 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x90, 0xcf, 0x4a, 0x03, 0x31,
0x10, 0x87, 0x5d, 0x77, 0x29, 0x76, 0x6c, 0xab, 0xe4, 0x62, 0xf0, 0xe2, 0x1a, 0x11, 0x73, 0xda,
0x83, 0xbe, 0x81, 0x3d, 0xf5, 0xa0, 0x87, 0xe0, 0xcd, 0x43, 0x49, 0x9b, 0x61, 0x09, 0x34, 0x7f,
0xcc, 0x8c, 0xef, 0xe5, 0x23, 0x0a, 0x5b, 0x6d, 0x45, 0x51, 0x3c, 0xe6, 0xe3, 0xc7, 0x47, 0xe6,
0x83, 0x49, 0xb0, 0xc4, 0x58, 0xba, 0x5c, 0x12, 0x27, 0x71, 0xb4, 0x7d, 0xe5, 0x95, 0xba, 0x87,
0xb3, 0x79, 0x41, 0xcb, 0x38, 0x4f, 0x9b, 0x0d, 0xae, 0xd9, 0xa7, 0x68, 0xf0, 0xe5, 0x15, 0x89,
0xc5, 0x0d, 0x9c, 0xac, 0x77, 0x70, 0x19, 0x6d, 0x40, 0x59, 0xb5, 0x95, 0x1e, 0x9b, 0xd9, 0x1e,
0x3f, 0xda, 0x80, 0xea, 0xad, 0x02, 0xf9, 0x53, 0x42, 0x39, 0x45, 0xc2, 0x7f, 0x5b, 0xc4, 0x15,
0x4c, 0xbf, 0x0c, 0xbd, 0x93, 0x87, 0x6d, 0xa5, 0x1b, 0x33, 0xd9, 0xc3, 0x85, 0x13, 0x17, 0x70,
0x4c, 0xd8, 0x07, 0x8c, 0xbc, 0xf4, 0x8e, 0x64, 0xdd, 0xd6, 0xba, 0x31, 0xf0, 0x81, 0x16, 0x8e,
0xc4, 0x35, 0xcc, 0xb2, 0x2d, 0xec, 0x07, 0x09, 0xdb, 0x9e, 0x64, 0xd3, 0xd6, 0x7a, 0x6c, 0xa6,
0x3b, 0xfa, 0x64, 0x7b, 0xba, 0x45, 0x18, 0x3d, 0x0c, 0x09, 0xc4, 0x33, 0x9c, 0x7e, 0xff, 0xbb,
0xb8, 0xec, 0x3e, 0xfb, 0x74, 0xbf, 0xc4, 0x39, 0x57, 0x7f, 0x4d, 0xb6, 0xa7, 0xab, 0x83, 0xd5,
0x68, 0xc8, 0x7d, 0xf7, 0x1e, 0x00, 0x00, 0xff, 0xff, 0x0d, 0xb6, 0xf8, 0x4e, 0x7e, 0x01, 0x00,
0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// MasterClient is the client API for Master service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type MasterClient interface {
CreateCollection(ctx context.Context, in *CreateCollectionRequest, opts ...grpc.CallOption) (*CreateCollectionResponse, error)
}
type masterClient struct {
cc *grpc.ClientConn
}
func NewMasterClient(cc *grpc.ClientConn) MasterClient {
return &masterClient{cc}
}
func (c *masterClient) CreateCollection(ctx context.Context, in *CreateCollectionRequest, opts ...grpc.CallOption) (*CreateCollectionResponse, error) {
out := new(CreateCollectionResponse)
err := c.cc.Invoke(ctx, "/masterpb.Master/CreateCollection", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// MasterServer is the server API for Master service.
type MasterServer interface {
CreateCollection(context.Context, *CreateCollectionRequest) (*CreateCollectionResponse, error)
}
// UnimplementedMasterServer can be embedded to have forward compatible implementations.
type UnimplementedMasterServer struct {
}
func (*UnimplementedMasterServer) CreateCollection(ctx context.Context, req *CreateCollectionRequest) (*CreateCollectionResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method CreateCollection not implemented")
}
func RegisterMasterServer(s *grpc.Server, srv MasterServer) {
s.RegisterService(&_Master_serviceDesc, srv)
}
func _Master_CreateCollection_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(CreateCollectionRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(MasterServer).CreateCollection(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/masterpb.Master/CreateCollection",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(MasterServer).CreateCollection(ctx, req.(*CreateCollectionRequest))
}
return interceptor(ctx, in, info, handler)
}
var _Master_serviceDesc = grpc.ServiceDesc{
ServiceName: "masterpb.Master",
HandlerType: (*MasterServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "CreateCollection",
Handler: _Master_CreateCollection_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "master.proto",
}
syntax = "proto3";
//option go_package = "github.com/czs007/suvilm/pkg/master/grpc";
package masterpb;
service Master {
rpc CreateCollection (CreateCollectionRequest) returns (CreateCollectionResponse) {}
}
message CreateCollectionRequest {
string collection_name = 1;
}
message CreateCollectionResponse {
string collection_name = 1;
uint64 collection_id = 2;
repeated uint64 segment_ids = 3;
repeated string partition_tags = 4;
}
\ No newline at end of file
......@@ -79,6 +79,12 @@ func (kv *etcdKVBase) Remove(key string) error {
return nil
}
func (kv *etcdKVBase) Watch(key string) clientv3.WatchChan {
key = path.Join(kv.rootPath, key)
rch := kv.client.Watch(context.Background(), key)
return rch
}
// SlowLogTxn wraps etcd transaction and log slow one.
type SlowLogTxn struct {
clientv3.Txn
......
package kv
import "go.etcd.io/etcd/clientv3"
type Base interface {
Load(key string) (string, error)
Save(key, value string) error
Remove(key string) error
Watch(key string) clientv3.WatchChan
}
......@@ -36,11 +36,11 @@ func Collection2JSON(c Collection) (string, error) {
return string(b), nil
}
func JSON2Collection(s string) (Collection, error) {
func JSON2Collection(s string) (*Collection, error) {
var c Collection
err := json.Unmarshal([]byte(s), &c)
if err != nil {
return Collection{}, err
return &Collection{}, err
}
return c, nil
return &c, nil
}
package mock
import (
"context"
"log"
"time"
pb "github.com/czs007/suvlim/pkg/master/grpc"
"google.golang.org/grpc"
)
// func main() {
// // Set up a connection to the server.
// conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
// if err != nil {
// log.Fatalf("did not connect: %v", err)
// }
// defer conn.Close()
// c := pb.NewGreeterClient(conn)
// // Contact the server and print out its response.
// name := defaultName
// if len(os.Args) > 1 {
// name = os.Args[1]
// }
// ctx, cancel := context.WithTimeout(context.Background(), time.Second)
// defer cancel()
// r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})
// if err != nil {
// log.Fatalf("could not greet: %v", err)
// }
// log.Printf("Greeting: %s", r.GetMessage())
// }
const (
addr = "192.168.1.10:53100"
)
func FakeCreateCollectionByGRPC() (string, uint64) {
conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewMasterClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
r, err := c.CreateCollection(ctx, &pb.CreateCollectionRequest{CollectionName: "grpc-client-test"})
if err != nil {
log.Fatalf("could not greet: %v", err)
}
log.Printf("CreateCollection: %s, id: %d", r.GetCollectionName(), r.GetCollectionId())
return r.GetCollectionName(), r.GetCollectionId()
}
package mock
import (
"fmt"
"testing"
)
func TestFakeCreateCollectionByGRPC(t *testing.T) {
collectionName, segmentID := FakeCreateCollectionByGRPC()
if collectionName != "grpc-client-test" {
t.Error("Collection name wrong")
}
fmt.Println(collectionName)
fmt.Println(segmentID)
}
......@@ -51,13 +51,13 @@ func Segment2JSON(s Segment) (string, error) {
return string(b), nil
}
func JSON2Segment(s string) (Segment, error) {
func JSON2Segment(s string) (*Segment, error) {
var c Segment
err := json.Unmarshal([]byte(s), &c)
if err != nil {
return Segment{}, err
return &Segment{}, err
}
return c, nil
return &c, nil
}
func FakeCreateSegment(id uint64, cl Collection, opentime time.Time, closetime time.Time) Segment {
......
package master
import (
"context"
"fmt"
"log"
"net"
"strconv"
"time"
"github.com/czs007/suvlim/pkg/master/common"
pb "github.com/czs007/suvlim/pkg/master/grpc"
"github.com/czs007/suvlim/pkg/master/informer"
"github.com/czs007/suvlim/pkg/master/kv"
"github.com/czs007/suvlim/pkg/master/mock"
"go.etcd.io/etcd/clientv3"
"google.golang.org/grpc"
)
func Run() {
go mock.FakePulsarProducer()
go GRPCServer()
go SegmentStatsController()
go CollectionController()
for {
}
}
func SegmentStatsController() {
cli, _ := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:12379"},
DialTimeout: 5 * time.Second,
})
defer cli.Close()
kvbase := kv.NewEtcdKVBase(cli, common.ETCD_ROOT_PATH)
ssChan := make(chan mock.SegmentStats, 10)
defer close(ssChan)
ssClient := informer.NewPulsarClient()
......@@ -21,7 +41,7 @@ func SegmentStatsController() {
for {
select {
case ss := <-ssChan:
fmt.Println(ss)
ComputeCloseTime(ss, kvbase)
case <-time.After(5 * time.Second):
fmt.Println("timeout")
return
......@@ -29,8 +49,51 @@ func SegmentStatsController() {
}
}
func GRPCServer() {
func ComputeCloseTime(ss mock.SegmentStats, kvbase kv.Base) error {
if int(ss.MemorySize) > common.SEGMENT_THRESHOLE*0.8 {
memRate := int(ss.MemoryRate)
if memRate == 0 {
memRate = 1
}
sec := common.SEGMENT_THRESHOLE * 0.2 / memRate
data, err := kvbase.Load(strconv.Itoa(int(ss.SegementID)))
if err != nil {
return err
}
seg, err := mock.JSON2Segment(data)
if err != nil {
return err
}
seg.CloseTimeStamp = time.Now().Add(time.Duration(sec) * time.Second)
updateData, err := mock.Segment2JSON(*seg)
if err != nil {
return err
}
kvbase.Save(strconv.Itoa(int(ss.SegementID)), updateData)
}
return nil
}
func GRPCServer() error {
lis, err := net.Listen("tcp", common.DEFAULT_GRPC_PORT)
if err != nil {
return err
}
s := grpc.NewServer()
pb.RegisterMasterServer(s, GRPCMasterServer{})
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
return err
}
return nil
}
type GRPCMasterServer struct{}
func (ms GRPCMasterServer) CreateCollection(ctx context.Context, in *pb.CreateCollectionRequest) (*pb.CreateCollectionResponse, error) {
return &pb.CreateCollectionResponse{
CollectionName: in.CollectionName,
}, nil
}
func CollectionController() {
......@@ -56,7 +119,3 @@ func CollectionController() {
log.Fatal(err)
}
}
func Sync() {
}
......@@ -4,12 +4,6 @@
#include "utils/CommonUtil.h"
#include "config/ServerConfig.h"
namespace {
int64_t gen_channe_id(int64_t uid) {
// TODO: murmur3 hash from pulsar source code
return 0;
}
}
namespace milvus::message_client {
......@@ -142,7 +136,7 @@ milvus::grpc::QueryResult MsgClientV2::GetQueryResult(int64_t query_id) {
return Aggregation(total_results[query_id]);
}
Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request) {
Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request, uint64_t timestamp) {
// may have retry policy?
auto row_count = request.rows_data_size();
// TODO: Get the segment from master
......@@ -152,8 +146,10 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request) {
mut_msg.set_op(milvus::grpc::OpType::INSERT);
mut_msg.set_uid(GetUniqueQId());
mut_msg.set_client_id(client_id_);
auto channel_id = gen_channe_id(request.entity_id_array(i));
// TODO: add channel id
auto channel_id = 0;
mut_msg.set_channel_id(channel_id);
mut_msg.set_timestamp(timestamp);
mut_msg.set_collection_name(request.collection_name());
mut_msg.set_partition_tag(request.partition_tag());
mut_msg.set_segment_id(segment);
......@@ -169,7 +165,7 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::InsertParam &request) {
return Status::OK();
}
Status MsgClientV2::SendMutMessage(const milvus::grpc::DeleteByIDParam &request) {
Status MsgClientV2::SendMutMessage(const milvus::grpc::DeleteByIDParam &request, uint64_t timestamp) {
milvus::grpc::InsertOrDeleteMsg mut_msg;
for (auto id: request.id_array()) {
mut_msg.set_op(milvus::grpc::OpType::DELETE);
......@@ -177,6 +173,7 @@ Status MsgClientV2::SendMutMessage(const milvus::grpc::DeleteByIDParam &request)
mut_msg.set_client_id(client_id_);
mut_msg.set_uid(id);
mut_msg.set_collection_name(request.collection_name());
mut_msg.set_timestamp(timestamp);
auto result = insert_delete_producer_->send(mut_msg);
if (result != pulsar::ResultOk) {
......
......@@ -19,9 +19,9 @@ class MsgClientV2 {
const std::string &search_result);
// unpackage batch insert or delete request, and delivery message to pulsar per row
Status SendMutMessage(const milvus::grpc::InsertParam &request);
Status SendMutMessage(const milvus::grpc::InsertParam &request, uint64_t timestamp);
Status SendMutMessage(const milvus::grpc::DeleteByIDParam &request);
Status SendMutMessage(const milvus::grpc::DeleteByIDParam &request, uint64_t timestamp);
//
Status SendQueryMessage(const milvus::grpc::SearchParam &request);
......
......@@ -121,6 +121,11 @@ ReqScheduler::TakeToExecute(ReqQueuePtr req_queue) {
}
try {
if (req->type() == ReqType::kInsert || req->type() == ReqType::kDeleteEntityByID){
std::lock_guard lock(time_syc_mtx_);
sending_ = true;
req->SetTimestamp(TSOracle::GetInstance().GetTimeStamp());
}
auto status = req->Execute();
if (!status.ok()) {
LOG_SERVER_ERROR_ << "Req failed with code: " << status.ToString();
......@@ -135,9 +140,6 @@ Status
ReqScheduler::PutToQueue(const BaseReqPtr& req_ptr) {
std::lock_guard<std::mutex> lock(queue_mtx_);
auto &tso = TSOracle::GetInstance();
req_ptr->SetTimestamp(tso.GetTimeStamp());
std::string group_name = req_ptr->req_group();
if (req_groups_.count(group_name) > 0) {
req_groups_[group_name]->PutReq(req_ptr);
......@@ -156,12 +158,24 @@ ReqScheduler::PutToQueue(const BaseReqPtr& req_ptr) {
return Status::OK();
}
int64_t ReqScheduler::GetLatestReqDeliveredTime() {
return latest_req_time_.load();
int64_t ReqScheduler::GetLatestDeliveredReqTime() {
std::lock_guard lock(time_syc_mtx_);
if (sending_){
return latest_req_time_;
}
return TSOracle::GetInstance().GetTimeStamp();
}
void ReqScheduler::UpdateLatestDeliveredReqTime(int64_t time) {
latest_req_time_.store(time);
std::lock_guard lock(time_syc_mtx_);
// update pulsar synchronous time only if message has been sent to pulsar
assert(sending_);
sending_ = false;
latest_req_time_ = time;
}
uint64_t GetMessageTimeSyncTime(){
return ReqScheduler::GetInstance().GetLatestDeliveredReqTime();
}
} // namespace server
......
......@@ -47,8 +47,8 @@ class ReqScheduler {
void UpdateLatestDeliveredReqTime(int64_t time);
int64_t GetLatestReqDeliveredTime();
int64_t GetLatestDeliveredReqTime();
protected:
ReqScheduler();
......@@ -63,8 +63,11 @@ class ReqScheduler {
private:
mutable std::mutex queue_mtx_;
std::atomic<int64_t > latest_req_time_;
// for time synchronous
std::mutex time_syc_mtx_;
int64_t latest_req_time_;
bool sending_;
std::map<std::string, ReqQueuePtr> req_groups_;
......@@ -73,5 +76,7 @@ class ReqScheduler {
bool stopped_;
};
extern uint64_t GetMessageTimeSyncTime();
} // namespace server
} // namespace milvus
......@@ -42,7 +42,7 @@ DeleteEntityByIDReq::Create(const ContextPtr& context, const ::milvus::grpc::Del
Status
DeleteEntityByIDReq::OnExecute() {
auto &msg_client = message_client::MsgClientV2::GetInstance();
Status status = msg_client.SendMutMessage(*request_);
Status status = msg_client.SendMutMessage(*request_, timestamp_);
return status;
}
......
......@@ -37,9 +37,12 @@ class DeleteEntityByIDReq : public BaseReq {
Status
OnExecute() override;
Status
OnPostExecute() override;
private:
const ::milvus::grpc::DeleteByIDParam *request_;
Status OnPostExecute();
};
} // namespace server
......
......@@ -43,7 +43,7 @@ Status
InsertReq::OnExecute() {
LOG_SERVER_INFO_ << LogOut("[%s][%ld] ", "insert", 0) << "Execute InsertReq.";
auto &msg_client = message_client::MsgClientV2::GetInstance();
Status status = msg_client.SendMutMessage(*insert_param_);
Status status = msg_client.SendMutMessage(*insert_param_, timestamp_);
return status;
}
......
......@@ -30,6 +30,7 @@
#include <thread>
#include <utility>
#include <vector>
#include <src/server/delivery/ReqScheduler.h>
#include "GrpcRequestHandler.h"
#include "config/ServerConfig.h"
......@@ -39,6 +40,7 @@
#include "server/grpc_impl/interceptor/SpanInterceptor.h"
#include "utils/Log.h"
#include "message_client/ClientV2.h"
#include "server/timesync/TimeSync.h"
namespace milvus {
namespace server {
......@@ -122,7 +124,6 @@ GrpcServer::StartService() {
server_ptr_ = builder.BuildAndStart();
server_ptr_->Wait();
return Status::OK();
}
......
......@@ -24,8 +24,8 @@ uint64_t TSOracle::GetTimeStamp() {
}
uint64_t TSOracle::GetPhysical(const std::chrono::high_resolution_clock::time_point &t) {
auto nano_time = std::chrono::duration_cast<std::chrono::nanoseconds>(t.time_since_epoch());
return nano_time / std::chrono::microseconds(1);
auto nano_time = std::chrono::duration_cast<std::chrono::milliseconds>(t.time_since_epoch());
return nano_time.count();
}
uint64_t TSOracle::ComposeTs(uint64_t physical, uint64_t logical) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册