提交 2e2e059f 编写于 作者: Z zhenshan.cao 提交者: yefu.chen

Add msgheader and change ReqType to MsgType

Signed-off-by: Nzhenshan.cao <zhenshan.cao@zilliz.com>
上级 1bd7938d
...@@ -44,12 +44,12 @@ type showCollectionsTask struct { ...@@ -44,12 +44,12 @@ type showCollectionsTask struct {
} }
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////
func (t *createCollectionTask) Type() internalpb.ReqType { func (t *createCollectionTask) Type() internalpb.MsgType {
if t.req == nil { if t.req == nil {
log.Printf("null request") log.Printf("null request")
return 0 return 0
} }
return t.req.ReqType return t.req.MsgType
} }
func (t *createCollectionTask) Ts() (Timestamp, error) { func (t *createCollectionTask) Ts() (Timestamp, error) {
...@@ -106,12 +106,12 @@ func (t *createCollectionTask) Execute() error { ...@@ -106,12 +106,12 @@ func (t *createCollectionTask) Execute() error {
} }
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////
func (t *dropCollectionTask) Type() internalpb.ReqType { func (t *dropCollectionTask) Type() internalpb.MsgType {
if t.req == nil { if t.req == nil {
log.Printf("null request") log.Printf("null request")
return 0 return 0
} }
return t.req.ReqType return t.req.MsgType
} }
func (t *dropCollectionTask) Ts() (Timestamp, error) { func (t *dropCollectionTask) Ts() (Timestamp, error) {
...@@ -149,12 +149,12 @@ func (t *dropCollectionTask) Execute() error { ...@@ -149,12 +149,12 @@ func (t *dropCollectionTask) Execute() error {
} }
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////
func (t *hasCollectionTask) Type() internalpb.ReqType { func (t *hasCollectionTask) Type() internalpb.MsgType {
if t.req == nil { if t.req == nil {
log.Printf("null request") log.Printf("null request")
return 0 return 0
} }
return t.req.ReqType return t.req.MsgType
} }
func (t *hasCollectionTask) Ts() (Timestamp, error) { func (t *hasCollectionTask) Ts() (Timestamp, error) {
...@@ -181,12 +181,12 @@ func (t *hasCollectionTask) Execute() error { ...@@ -181,12 +181,12 @@ func (t *hasCollectionTask) Execute() error {
} }
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////
func (t *describeCollectionTask) Type() internalpb.ReqType { func (t *describeCollectionTask) Type() internalpb.MsgType {
if t.req == nil { if t.req == nil {
log.Printf("null request") log.Printf("null request")
return 0 return 0
} }
return t.req.ReqType return t.req.MsgType
} }
func (t *describeCollectionTask) Ts() (Timestamp, error) { func (t *describeCollectionTask) Ts() (Timestamp, error) {
...@@ -223,12 +223,12 @@ func (t *describeCollectionTask) Execute() error { ...@@ -223,12 +223,12 @@ func (t *describeCollectionTask) Execute() error {
} }
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////
func (t *showCollectionsTask) Type() internalpb.ReqType { func (t *showCollectionsTask) Type() internalpb.MsgType {
if t.req == nil { if t.req == nil {
log.Printf("null request") log.Printf("null request")
return 0 return 0
} }
return t.req.ReqType return t.req.MsgType
} }
func (t *showCollectionsTask) Ts() (Timestamp, error) { func (t *showCollectionsTask) Ts() (Timestamp, error) {
......
...@@ -42,12 +42,12 @@ type showPartitionTask struct { ...@@ -42,12 +42,12 @@ type showPartitionTask struct {
} }
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////
func (t *createPartitionTask) Type() internalpb.ReqType { func (t *createPartitionTask) Type() internalpb.MsgType {
if t.req == nil { if t.req == nil {
log.Printf("null request") log.Printf("null request")
return 0 return 0
} }
return t.req.ReqType return t.req.MsgType
} }
func (t *createPartitionTask) Ts() (Timestamp, error) { func (t *createPartitionTask) Ts() (Timestamp, error) {
...@@ -91,12 +91,12 @@ func (t *createPartitionTask) Execute() error { ...@@ -91,12 +91,12 @@ func (t *createPartitionTask) Execute() error {
} }
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////
func (t *dropPartitionTask) Type() internalpb.ReqType { func (t *dropPartitionTask) Type() internalpb.MsgType {
if t.req == nil { if t.req == nil {
log.Printf("null request") log.Printf("null request")
return 0 return 0
} }
return t.req.ReqType return t.req.MsgType
} }
func (t *dropPartitionTask) Ts() (Timestamp, error) { func (t *dropPartitionTask) Ts() (Timestamp, error) {
...@@ -143,12 +143,12 @@ func (t *dropPartitionTask) Execute() error { ...@@ -143,12 +143,12 @@ func (t *dropPartitionTask) Execute() error {
} }
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////
func (t *hasPartitionTask) Type() internalpb.ReqType { func (t *hasPartitionTask) Type() internalpb.MsgType {
if t.req == nil { if t.req == nil {
log.Printf("null request") log.Printf("null request")
return 0 return 0
} }
return t.req.ReqType return t.req.MsgType
} }
func (t *hasPartitionTask) Ts() (Timestamp, error) { func (t *hasPartitionTask) Ts() (Timestamp, error) {
...@@ -173,12 +173,12 @@ func (t *hasPartitionTask) Execute() error { ...@@ -173,12 +173,12 @@ func (t *hasPartitionTask) Execute() error {
} }
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////
func (t *describePartitionTask) Type() internalpb.ReqType { func (t *describePartitionTask) Type() internalpb.MsgType {
if t.req == nil { if t.req == nil {
log.Printf("null request") log.Printf("null request")
return 0 return 0
} }
return t.req.ReqType return t.req.MsgType
} }
func (t *describePartitionTask) Ts() (Timestamp, error) { func (t *describePartitionTask) Ts() (Timestamp, error) {
...@@ -210,12 +210,12 @@ func (t *describePartitionTask) Execute() error { ...@@ -210,12 +210,12 @@ func (t *describePartitionTask) Execute() error {
} }
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////
func (t *showPartitionTask) Type() internalpb.ReqType { func (t *showPartitionTask) Type() internalpb.MsgType {
if t.req == nil { if t.req == nil {
log.Printf("null request") log.Printf("null request")
return 0 return 0
} }
return t.req.ReqType return t.req.MsgType
} }
func (t *showPartitionTask) Ts() (Timestamp, error) { func (t *showPartitionTask) Ts() (Timestamp, error) {
......
...@@ -18,7 +18,7 @@ type baseTask struct { ...@@ -18,7 +18,7 @@ type baseTask struct {
} }
type task interface { type task interface {
Type() internalpb.ReqType Type() internalpb.MsgType
Ts() (Timestamp, error) Ts() (Timestamp, error)
Execute() error Execute() error
WaitToFinish(ctx context.Context) error WaitToFinish(ctx context.Context) error
......
...@@ -29,7 +29,7 @@ func getTsMsg(msgType MsgType, reqId int64, hashValue int32) *TsMsg { ...@@ -29,7 +29,7 @@ func getTsMsg(msgType MsgType, reqId int64, hashValue int32) *TsMsg {
switch msgType { switch msgType {
case KInsert: case KInsert:
insertRequest := internalPb.InsertRequest{ insertRequest := internalPb.InsertRequest{
ReqType: internalPb.ReqType_kInsert, MsgType: internalPb.MsgType_kInsert,
ReqId: reqId, ReqId: reqId,
CollectionName: "Collection", CollectionName: "Collection",
PartitionTag: "Partition", PartitionTag: "Partition",
...@@ -45,7 +45,7 @@ func getTsMsg(msgType MsgType, reqId int64, hashValue int32) *TsMsg { ...@@ -45,7 +45,7 @@ func getTsMsg(msgType MsgType, reqId int64, hashValue int32) *TsMsg {
tsMsg = insertMsg tsMsg = insertMsg
case KDelete: case KDelete:
deleteRequest := internalPb.DeleteRequest{ deleteRequest := internalPb.DeleteRequest{
ReqType: internalPb.ReqType_kDelete, MsgType: internalPb.MsgType_kDelete,
ReqId: reqId, ReqId: reqId,
CollectionName: "Collection", CollectionName: "Collection",
ChannelId: 1, ChannelId: 1,
...@@ -60,7 +60,7 @@ func getTsMsg(msgType MsgType, reqId int64, hashValue int32) *TsMsg { ...@@ -60,7 +60,7 @@ func getTsMsg(msgType MsgType, reqId int64, hashValue int32) *TsMsg {
tsMsg = deleteMsg tsMsg = deleteMsg
case KSearch: case KSearch:
searchRequest := internalPb.SearchRequest{ searchRequest := internalPb.SearchRequest{
ReqType: internalPb.ReqType_kSearch, MsgType: internalPb.MsgType_kSearch,
ReqId: reqId, ReqId: reqId,
ProxyId: 1, ProxyId: 1,
Timestamp: 1, Timestamp: 1,
...@@ -97,7 +97,7 @@ func getTsMsg(msgType MsgType, reqId int64, hashValue int32) *TsMsg { ...@@ -97,7 +97,7 @@ func getTsMsg(msgType MsgType, reqId int64, hashValue int32) *TsMsg {
tsMsg = timeSyncMsg tsMsg = timeSyncMsg
case KTimeTick: case KTimeTick:
insertRequest := internalPb.InsertRequest{ insertRequest := internalPb.InsertRequest{
ReqType: internalPb.ReqType_kTimeTick, MsgType: internalPb.MsgType_kTimeTick,
ReqId: reqId, ReqId: reqId,
CollectionName: "Collection", CollectionName: "Collection",
PartitionTag: "Partition", PartitionTag: "Partition",
......
...@@ -207,7 +207,7 @@ func TestNewStream_Insert_TimeTick(t *testing.T) { ...@@ -207,7 +207,7 @@ func TestNewStream_Insert_TimeTick(t *testing.T) {
msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KInsert, 1, 1)) msgPack.Msgs = append(msgPack.Msgs, getTsMsg(KInsert, 1, 1))
insertRequest := internalPb.InsertRequest{ insertRequest := internalPb.InsertRequest{
ReqType: internalPb.ReqType_kTimeTick, MsgType: internalPb.MsgType_kTimeTick,
ReqId: 2, ReqId: 2,
CollectionName: "Collection", CollectionName: "Collection",
PartitionTag: "Partition", PartitionTag: "Partition",
......
...@@ -67,7 +67,7 @@ func (it InsertTask) EndTs() Timestamp { ...@@ -67,7 +67,7 @@ func (it InsertTask) EndTs() Timestamp {
} }
func (it InsertTask) Type() MsgType { func (it InsertTask) Type() MsgType {
if it.ReqType == internalPb.ReqType_kTimeTick { if it.MsgType == internalPb.MsgType_kTimeTick {
return KTimeSync return KTimeSync
} }
return KInsert return KInsert
...@@ -118,7 +118,7 @@ func (dt DeleteTask) EndTs() Timestamp { ...@@ -118,7 +118,7 @@ func (dt DeleteTask) EndTs() Timestamp {
} }
func (dt DeleteTask) Type() MsgType { func (dt DeleteTask) Type() MsgType {
if dt.ReqType == internalPb.ReqType_kTimeTick { if dt.MsgType == internalPb.MsgType_kTimeTick {
return KTimeSync return KTimeSync
} }
return KDelete return KDelete
...@@ -147,7 +147,7 @@ func (st SearchTask) EndTs() Timestamp { ...@@ -147,7 +147,7 @@ func (st SearchTask) EndTs() Timestamp {
} }
func (st SearchTask) Type() MsgType { func (st SearchTask) Type() MsgType {
if st.ReqType == internalPb.ReqType_kTimeTick { if st.MsgType == internalPb.MsgType_kTimeTick {
return KTimeSync return KTimeSync
} }
return KSearch return KSearch
......
...@@ -6,7 +6,7 @@ import "common.proto"; ...@@ -6,7 +6,7 @@ import "common.proto";
import "service_msg.proto"; import "service_msg.proto";
enum ReqType { enum MsgType {
kNone = 0; kNone = 0;
/* Definition Requests: collection */ /* Definition Requests: collection */
kCreateCollection = 100; kCreateCollection = 100;
...@@ -72,7 +72,7 @@ message TsoResponse { ...@@ -72,7 +72,7 @@ message TsoResponse {
message CreateCollectionRequest { message CreateCollectionRequest {
ReqType req_type = 1; MsgType msg_type = 1;
int64 req_id = 2; int64 req_id = 2;
uint64 timestamp = 3; uint64 timestamp = 3;
int64 proxy_id = 4; int64 proxy_id = 4;
...@@ -81,7 +81,7 @@ message CreateCollectionRequest { ...@@ -81,7 +81,7 @@ message CreateCollectionRequest {
message DropCollectionRequest { message DropCollectionRequest {
ReqType req_type = 1; MsgType msg_type = 1;
int64 req_id = 2; int64 req_id = 2;
uint64 timestamp = 3; uint64 timestamp = 3;
int64 proxy_id = 4; int64 proxy_id = 4;
...@@ -90,7 +90,7 @@ message DropCollectionRequest { ...@@ -90,7 +90,7 @@ message DropCollectionRequest {
message HasCollectionRequest { message HasCollectionRequest {
ReqType req_type = 1; MsgType msg_type = 1;
int64 req_id = 2; int64 req_id = 2;
uint64 timestamp = 3; uint64 timestamp = 3;
int64 proxy_id = 4; int64 proxy_id = 4;
...@@ -99,7 +99,7 @@ message HasCollectionRequest { ...@@ -99,7 +99,7 @@ message HasCollectionRequest {
message DescribeCollectionRequest { message DescribeCollectionRequest {
ReqType req_type = 1; MsgType msg_type = 1;
int64 req_id = 2; int64 req_id = 2;
uint64 timestamp = 3; uint64 timestamp = 3;
int64 proxy_id = 4; int64 proxy_id = 4;
...@@ -108,7 +108,7 @@ message DescribeCollectionRequest { ...@@ -108,7 +108,7 @@ message DescribeCollectionRequest {
message ShowCollectionRequest { message ShowCollectionRequest {
ReqType req_type = 1; MsgType msg_type = 1;
int64 req_id = 2; int64 req_id = 2;
uint64 timestamp = 3; uint64 timestamp = 3;
int64 proxy_id = 4; int64 proxy_id = 4;
...@@ -116,7 +116,7 @@ message ShowCollectionRequest { ...@@ -116,7 +116,7 @@ message ShowCollectionRequest {
message CreatePartitionRequest { message CreatePartitionRequest {
ReqType req_type = 1; MsgType msg_type = 1;
int64 req_id = 2; int64 req_id = 2;
uint64 timestamp = 3; uint64 timestamp = 3;
int64 proxy_id = 4; int64 proxy_id = 4;
...@@ -125,7 +125,7 @@ message CreatePartitionRequest { ...@@ -125,7 +125,7 @@ message CreatePartitionRequest {
message DropPartitionRequest { message DropPartitionRequest {
ReqType req_type = 1; MsgType msg_type = 1;
int64 req_id = 2; int64 req_id = 2;
uint64 timestamp = 3; uint64 timestamp = 3;
int64 proxy_id = 4; int64 proxy_id = 4;
...@@ -134,7 +134,7 @@ message DropPartitionRequest { ...@@ -134,7 +134,7 @@ message DropPartitionRequest {
message HasPartitionRequest { message HasPartitionRequest {
ReqType req_type = 1; MsgType msg_type = 1;
int64 req_id = 2; int64 req_id = 2;
uint64 timestamp = 3; uint64 timestamp = 3;
int64 proxy_id = 4; int64 proxy_id = 4;
...@@ -143,7 +143,7 @@ message HasPartitionRequest { ...@@ -143,7 +143,7 @@ message HasPartitionRequest {
message DescribePartitionRequest { message DescribePartitionRequest {
ReqType req_type = 1; MsgType msg_type = 1;
int64 req_id = 2; int64 req_id = 2;
uint64 timestamp = 3; uint64 timestamp = 3;
int64 proxy_id = 4; int64 proxy_id = 4;
...@@ -152,7 +152,7 @@ message DescribePartitionRequest { ...@@ -152,7 +152,7 @@ message DescribePartitionRequest {
message ShowPartitionRequest { message ShowPartitionRequest {
ReqType req_type = 1; MsgType msg_type = 1;
int64 req_id = 2; int64 req_id = 2;
uint64 timestamp = 3; uint64 timestamp = 3;
int64 proxy_id = 4; int64 proxy_id = 4;
...@@ -161,7 +161,7 @@ message ShowPartitionRequest { ...@@ -161,7 +161,7 @@ message ShowPartitionRequest {
message InsertRequest { message InsertRequest {
ReqType req_type = 1; MsgType msg_type = 1;
int64 req_id = 2; int64 req_id = 2;
string collection_name = 3; string collection_name = 3;
string partition_tag = 4; string partition_tag = 4;
...@@ -175,7 +175,7 @@ message InsertRequest { ...@@ -175,7 +175,7 @@ message InsertRequest {
message DeleteRequest { message DeleteRequest {
ReqType req_type = 1; MsgType msg_type = 1;
int64 req_id = 2; int64 req_id = 2;
string collection_name = 3; string collection_name = 3;
int64 channel_id = 4; int64 channel_id = 4;
...@@ -186,7 +186,7 @@ message DeleteRequest { ...@@ -186,7 +186,7 @@ message DeleteRequest {
message SearchRequest { message SearchRequest {
ReqType req_type = 1; MsgType msg_type = 1;
int64 req_id = 2; int64 req_id = 2;
int64 proxy_id = 3; int64 proxy_id = 3;
uint64 timestamp = 4; uint64 timestamp = 4;
...@@ -205,7 +205,6 @@ message SearchResult { ...@@ -205,7 +205,6 @@ message SearchResult {
repeated service.Hits hits = 7; repeated service.Hits hits = 7;
} }
message TimeTickMsg { message TimeTickMsg {
int64 peer_id = 1; int64 peer_id = 1;
uint64 timestamp = 2; uint64 timestamp = 2;
......
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: msg_header.proto
package internalpb
import (
fmt "fmt"
proto "github.com/golang/protobuf/proto"
any "github.com/golang/protobuf/ptypes/any"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
type MsgHeader struct {
MsgType MsgType `protobuf:"varint,1,opt,name=msg_type,json=msgType,proto3,enum=milvus.proto.internal.MsgType" json:"msg_type,omitempty"`
Message *any.Any `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *MsgHeader) Reset() { *m = MsgHeader{} }
func (m *MsgHeader) String() string { return proto.CompactTextString(m) }
func (*MsgHeader) ProtoMessage() {}
func (*MsgHeader) Descriptor() ([]byte, []int) {
return fileDescriptor_4712536c36da8833, []int{0}
}
func (m *MsgHeader) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_MsgHeader.Unmarshal(m, b)
}
func (m *MsgHeader) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_MsgHeader.Marshal(b, m, deterministic)
}
func (m *MsgHeader) XXX_Merge(src proto.Message) {
xxx_messageInfo_MsgHeader.Merge(m, src)
}
func (m *MsgHeader) XXX_Size() int {
return xxx_messageInfo_MsgHeader.Size(m)
}
func (m *MsgHeader) XXX_DiscardUnknown() {
xxx_messageInfo_MsgHeader.DiscardUnknown(m)
}
var xxx_messageInfo_MsgHeader proto.InternalMessageInfo
func (m *MsgHeader) GetMsgType() MsgType {
if m != nil {
return m.MsgType
}
return MsgType_kNone
}
func (m *MsgHeader) GetMessage() *any.Any {
if m != nil {
return m.Message
}
return nil
}
func init() {
proto.RegisterType((*MsgHeader)(nil), "milvus.proto.internal.MsgHeader")
}
func init() { proto.RegisterFile("msg_header.proto", fileDescriptor_4712536c36da8833) }
var fileDescriptor_4712536c36da8833 = []byte{
// 222 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x8f, 0x31, 0x4b, 0xc4, 0x40,
0x10, 0x85, 0xc9, 0x15, 0x9e, 0x46, 0x10, 0x09, 0x0a, 0xe7, 0x15, 0x72, 0x58, 0xa5, 0x71, 0x16,
0x62, 0x65, 0x69, 0xb0, 0xb0, 0xb9, 0xe6, 0xb0, 0xb2, 0x39, 0xb2, 0x97, 0x71, 0xb2, 0xb0, 0x9b,
0x0d, 0x3b, 0xbb, 0xc2, 0xe6, 0xd7, 0x8b, 0x59, 0xd6, 0xea, 0xba, 0xf7, 0xcd, 0x1b, 0xe6, 0x63,
0xca, 0x5b, 0xc3, 0x74, 0x1c, 0xb0, 0xeb, 0xd1, 0xc1, 0xe4, 0xac, 0xb7, 0xd5, 0xbd, 0x51, 0xfa,
0x27, 0x70, 0x22, 0x50, 0xa3, 0x47, 0x37, 0x76, 0x7a, 0x5b, 0xe5, 0x74, 0x34, 0x4c, 0xa9, 0xdc,
0x3e, 0x90, 0xb5, 0xa4, 0x51, 0x2c, 0x24, 0xc3, 0xb7, 0xe8, 0xc6, 0x98, 0xaa, 0xa7, 0xb9, 0xbc,
0xda, 0x33, 0x7d, 0x2c, 0x87, 0xab, 0xd7, 0xf2, 0xf2, 0x4f, 0xe3, 0xe3, 0x84, 0x9b, 0x62, 0x57,
0xd4, 0x37, 0xcd, 0x23, 0x9c, 0xb5, 0xc0, 0x9e, 0xe9, 0x33, 0x4e, 0x78, 0x58, 0x9b, 0x14, 0xaa,
0xa6, 0x5c, 0x1b, 0x64, 0xee, 0x08, 0x37, 0xab, 0x5d, 0x51, 0x5f, 0x37, 0x77, 0x90, 0xa4, 0x90,
0xa5, 0xf0, 0x36, 0xc6, 0x76, 0x55, 0x17, 0x87, 0xbc, 0xd8, 0xbe, 0x7f, 0xb5, 0xa4, 0xfc, 0x10,
0x24, 0x9c, 0xac, 0x11, 0xb3, 0xd2, 0x5a, 0xcd, 0x1e, 0x4f, 0x83, 0x48, 0xce, 0xe7, 0x5e, 0xb1,
0x77, 0x4a, 0x06, 0x8f, 0xbd, 0xc8, 0xe6, 0xf4, 0xc3, 0x3f, 0x4e, 0x52, 0x5e, 0x2c, 0x93, 0x97,
0xdf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x01, 0x87, 0xfd, 0x10, 0x22, 0x01, 0x00, 0x00,
}
syntax = "proto3";
package milvus.proto.internal;
option go_package="github.com/zilliztech/milvus-distributed/internal/proto/internalpb";
import "internal_msg.proto";
import "google/protobuf/any.proto";
message MsgHeader {
MsgType msg_type = 1;
google.protobuf.Any message = 2[lazy=true];
}
...@@ -22,7 +22,7 @@ func (it *insertTask) PreExecute() error { ...@@ -22,7 +22,7 @@ func (it *insertTask) PreExecute() error {
func (it *insertTask) Execute() error { func (it *insertTask) Execute() error {
ts := it.GetTs() ts := it.GetTs()
insertRequest := internalpb.InsertRequest{ insertRequest := internalpb.InsertRequest{
ReqType: internalpb.ReqType_kInsert, MsgType: internalpb.MsgType_kInsert,
ReqId: it.ReqId, ReqId: it.ReqId,
CollectionName: it.rowBatch.CollectionName, CollectionName: it.rowBatch.CollectionName,
PartitionTag: it.rowBatch.PartitionTag, PartitionTag: it.rowBatch.PartitionTag,
......
...@@ -52,7 +52,7 @@ func (ins *proxyInstance) restartSchedulerRoutine(bufSize int) error { ...@@ -52,7 +52,7 @@ func (ins *proxyInstance) restartSchedulerRoutine(bufSize int) error {
select { select {
case t := <-ins.taskChan: case t := <-ins.taskChan:
switch (*t).Type() { switch (*t).Type() {
case internalpb.ReqType_kInsert: case internalpb.MsgType_kInsert:
ins.taskSch.DmQueue.Enqueue(t) ins.taskSch.DmQueue.Enqueue(t)
default: default:
return return
......
...@@ -15,7 +15,7 @@ import ( ...@@ -15,7 +15,7 @@ import (
) )
type BaseRequest interface { type BaseRequest interface {
Type() internalpb.ReqType Type() internalpb.MsgType
PreExecute() commonpb.Status PreExecute() commonpb.Status
Execute() commonpb.Status Execute() commonpb.Status
PostExecute() commonpb.Status PostExecute() commonpb.Status
......
...@@ -19,8 +19,8 @@ type queryReq struct { ...@@ -19,8 +19,8 @@ type queryReq struct {
} }
// BaseRequest interfaces // BaseRequest interfaces
func (req *queryReq) Type() internalpb.ReqType { func (req *queryReq) Type() internalpb.MsgType {
return req.ReqType return req.MsgType
} }
func (req *queryReq) PreExecute() commonpb.Status { func (req *queryReq) PreExecute() commonpb.Status {
......
...@@ -145,38 +145,6 @@ func (s *proxyServer) ShowPartitions(ctx context.Context, req *servicepb.Collect ...@@ -145,38 +145,6 @@ func (s *proxyServer) ShowPartitions(ctx context.Context, req *servicepb.Collect
}, nil }, nil
} }
func (s *proxyServer) DeleteByID(ctx context.Context, req *pb.DeleteByIDParam) (*commonpb.Status, error) {
log.Printf("delete entites, total = %d", len(req.IdArray))
mReqMsg := pb.ManipulationReqMsg{
CollectionName: req.CollectionName,
ReqType: pb.ReqType_kDeleteEntityByID,
ProxyId: s.proxyId,
}
for _, id := range req.IdArray {
mReqMsg.PrimaryKeys = append(mReqMsg.PrimaryKeys, id)
}
if len(mReqMsg.PrimaryKeys) > 1 {
mReq := &manipulationReq{
stats: make([]commonpb.Status, 1),
msgs: append([]*pb.ManipulationReqMsg{}, &mReqMsg),
proxy: s,
}
if st := mReq.PreExecute(); st.ErrorCode != commonpb.ErrorCode_SUCCESS {
return &st, nil
}
if st := mReq.Execute(); st.ErrorCode != commonpb.ErrorCode_SUCCESS {
return &st, nil
}
if st := mReq.PostExecute(); st.ErrorCode != commonpb.ErrorCode_SUCCESS {
return &st, nil
}
if st := mReq.WaitToFinish(); st.ErrorCode != commonpb.ErrorCode_SUCCESS {
return &st, nil
}
}
return &commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS}, nil
}
func (s *proxyServer) Insert(ctx context.Context, req *servicepb.RowBatch) (*servicepb.IntegerRangeResponse, error) { func (s *proxyServer) Insert(ctx context.Context, req *servicepb.RowBatch) (*servicepb.IntegerRangeResponse, error) {
log.Printf("Insert Entities, total = %d", len(req.RowData)) log.Printf("Insert Entities, total = %d", len(req.RowData))
msgMap := make(map[uint32]*pb.ManipulationReqMsg) msgMap := make(map[uint32]*pb.ManipulationReqMsg)
...@@ -262,7 +230,7 @@ func (s *proxyServer) Insert(ctx context.Context, req *servicepb.RowBatch) (*ser ...@@ -262,7 +230,7 @@ func (s *proxyServer) Insert(ctx context.Context, req *servicepb.RowBatch) (*ser
func (s *proxyServer) Search(ctx context.Context, req *servicepb.Query) (*servicepb.QueryResult, error) { func (s *proxyServer) Search(ctx context.Context, req *servicepb.Query) (*servicepb.QueryResult, error) {
qm := &queryReq{ qm := &queryReq{
SearchRequest: internalpb.SearchRequest{ SearchRequest: internalpb.SearchRequest{
ReqType: internalpb.ReqType_kSearch, MsgType: internalpb.MsgType_kSearch,
ProxyId: s.proxyId, ProxyId: s.proxyId,
ReqId: s.queryId.Add(1), ReqId: s.queryId.Add(1),
Timestamp: 0, Timestamp: 0,
......
...@@ -9,7 +9,7 @@ import ( ...@@ -9,7 +9,7 @@ import (
type task interface { type task interface {
Id() int64 // return ReqId Id() int64 // return ReqId
Type() internalpb.ReqType Type() internalpb.MsgType
GetTs() typeutil.Timestamp GetTs() typeutil.Timestamp
SetTs(ts typeutil.Timestamp) SetTs(ts typeutil.Timestamp)
PreExecute() error PreExecute() error
...@@ -20,7 +20,7 @@ type task interface { ...@@ -20,7 +20,7 @@ type task interface {
} }
type baseTask struct { type baseTask struct {
ReqType internalpb.ReqType ReqType internalpb.MsgType
ReqId int64 ReqId int64
Ts typeutil.Timestamp Ts typeutil.Timestamp
ProxyId int64 ProxyId int64
...@@ -30,7 +30,7 @@ func (bt *baseTask) Id() int64 { ...@@ -30,7 +30,7 @@ func (bt *baseTask) Id() int64 {
return bt.ReqId return bt.ReqId
} }
func (bt *baseTask) Type() internalpb.ReqType { func (bt *baseTask) Type() internalpb.MsgType {
return bt.ReqType return bt.ReqType
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册