提交 23d9ddb4 编写于 作者: D dragondriver 提交者: yefu.chen

Add unittest to grpc service that Proxy provide

Signed-off-by: Ndragondriver <jiquan.long@zilliz.com>
上级 9f948d30
......@@ -178,12 +178,16 @@ func (p *Proxy) queryResultLoop() {
return
}
log.Print("Consume message from query result stream...")
log.Printf("message pack: %v", msgPack)
if msgPack == nil {
continue
}
tsMsg := msgPack.Msgs[0]
searchResultMsg, _ := (*tsMsg).(*msgstream.SearchResultMsg)
reqID := searchResultMsg.GetReqID()
log.Printf("ts msg: %v", tsMsg)
log.Printf("search result message: %v", searchResultMsg)
log.Printf("req id: %v", reqID)
_, ok = queryResultBuf[reqID]
if !ok {
queryResultBuf[reqID] = make([]*internalpb.SearchResult, 0)
......@@ -193,8 +197,10 @@ func (p *Proxy) queryResultLoop() {
// TODO: use the number of query node instead
t := p.taskSch.getTaskByReqID(reqID)
qt := t.(*QueryTask)
qt.resultBuf <- queryResultBuf[reqID]
delete(queryResultBuf, reqID)
if qt != nil {
qt.resultBuf <- queryResultBuf[reqID]
delete(queryResultBuf, reqID)
}
}
case <-p.proxyLoopCtx.Done():
log.Print("proxy server is closed ...")
......
......@@ -10,17 +10,16 @@ import (
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
gparams "github.com/zilliztech/milvus-distributed/internal/util/paramtableutil"
"go.uber.org/zap"
"google.golang.org/grpc"
"github.com/stretchr/testify/assert"
"github.com/zilliztech/milvus-distributed/internal/conf"
"github.com/zilliztech/milvus-distributed/internal/master"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
"go.uber.org/zap"
"google.golang.org/grpc"
gparams "github.com/zilliztech/milvus-distributed/internal/util/paramtableutil"
)
var ctx context.Context
......@@ -33,6 +32,8 @@ var proxyServer *Proxy
var masterServer *master.Master
var testNum = 10
func startMaster(ctx context.Context) {
etcdAddr := conf.Config.Etcd.Address
etcdAddr += ":"
......@@ -98,16 +99,18 @@ func shutdown() {
func TestProxy_CreateCollection(t *testing.T) {
var wg sync.WaitGroup
for i := 0; i < 1; i++ {
for i := 0; i < testNum; i++ {
i := i
cs := &schemapb.CollectionSchema{
Name: "CreateCollection" + strconv.FormatInt(int64(i), 10),
collectionName := "CreateCollection" + strconv.FormatInt(int64(i), 10)
req := &schemapb.CollectionSchema{
Name: collectionName,
Description: "no description",
AutoID: true,
Fields: make([]*schemapb.FieldSchema, 1),
}
cs.Fields[0] = &schemapb.FieldSchema{
Name: "Field" + strconv.FormatInt(int64(i), 10),
fieldName := "Field" + strconv.FormatInt(int64(i), 10)
req.Fields[0] = &schemapb.FieldSchema{
Name: fieldName,
Description: "no description",
DataType: schemapb.DataType_INT32,
}
......@@ -115,12 +118,261 @@ func TestProxy_CreateCollection(t *testing.T) {
wg.Add(1)
go func(group *sync.WaitGroup) {
defer group.Done()
resp, err := proxyClient.CreateCollection(ctx, cs)
bool, err := proxyClient.HasCollection(ctx, &servicepb.CollectionName{CollectionName: collectionName})
if err != nil {
t.Error(err)
}
msg := "Has Collection " + strconv.Itoa(i) + " should succeed!"
assert.Equal(t, bool.Status.ErrorCode, commonpb.ErrorCode_SUCCESS, msg)
if !bool.Value {
resp, err := proxyClient.CreateCollection(ctx, req)
if err != nil {
t.Error(err)
}
msg := "Create Collection " + strconv.Itoa(i) + " should succeed!"
assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_SUCCESS, msg)
}
}(&wg)
}
wg.Wait()
}
func TestProxy_HasCollection(t *testing.T) {
var wg sync.WaitGroup
for i := 0; i < testNum; i++ {
i := i
collectionName := "CreateCollection" + strconv.FormatInt(int64(i), 10)
wg.Add(1)
go func(group *sync.WaitGroup) {
defer group.Done()
bool, err := proxyClient.HasCollection(ctx, &servicepb.CollectionName{CollectionName: collectionName})
if err != nil {
t.Error(err)
}
msg := "Has Collection " + strconv.Itoa(i) + " should succeed!"
assert.Equal(t, bool.Status.ErrorCode, commonpb.ErrorCode_SUCCESS, msg)
t.Logf("Has Collection %v: %v", i, bool.Value)
}(&wg)
}
wg.Wait()
}
func TestProxy_DescribeCollection(t *testing.T) {
var wg sync.WaitGroup
for i := 0; i < testNum; i++ {
i := i
collectionName := "CreateCollection" + strconv.FormatInt(int64(i), 10)
wg.Add(1)
go func(group *sync.WaitGroup) {
defer group.Done()
bool, err := proxyClient.HasCollection(ctx, &servicepb.CollectionName{CollectionName: collectionName})
if err != nil {
t.Error(err)
}
msg := "Has Collection " + strconv.Itoa(i) + " should succeed!"
assert.Equal(t, bool.Status.ErrorCode, commonpb.ErrorCode_SUCCESS, msg)
if bool.Value {
resp, err := proxyClient.DescribeCollection(ctx, &servicepb.CollectionName{CollectionName: collectionName})
if err != nil {
t.Error(err)
}
msg := "Describe Collection " + strconv.Itoa(i) + " should succeed!"
assert.Equal(t, resp.Status.ErrorCode, commonpb.ErrorCode_SUCCESS, msg)
t.Logf("Describe Collection %v: %v", i, resp)
}
}(&wg)
}
wg.Wait()
}
func TestProxy_ShowCollections(t *testing.T) {
var wg sync.WaitGroup
for i := 0; i < testNum; i++ {
i := i
collectionName := "CreateCollection" + strconv.FormatInt(int64(i), 10)
wg.Add(1)
go func(group *sync.WaitGroup) {
defer group.Done()
bool, err := proxyClient.HasCollection(ctx, &servicepb.CollectionName{CollectionName: collectionName})
if err != nil {
t.Error(err)
}
msg := "Create Collection " + strconv.Itoa(i) + " should succeed!"
assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_SUCCESS, msg)
msg := "Has Collection " + strconv.Itoa(i) + " should succeed!"
assert.Equal(t, bool.Status.ErrorCode, commonpb.ErrorCode_SUCCESS, msg)
if bool.Value {
resp, err := proxyClient.ShowCollections(ctx, &commonpb.Empty{})
if err != nil {
t.Error(err)
}
msg := "Show collections " + strconv.Itoa(i) + " should succeed!"
assert.Equal(t, resp.Status.ErrorCode, commonpb.ErrorCode_SUCCESS, msg)
t.Logf("Show collections %v: %v", i, resp)
}
}(&wg)
}
wg.Wait()
}
func TestProxy_Insert(t *testing.T) {
var wg sync.WaitGroup
for i := 0; i < testNum; i++ {
i := i
collectionName := "CreateCollection" + strconv.FormatInt(int64(i), 10)
req := &servicepb.RowBatch{
CollectionName: collectionName,
PartitionTag: "",
RowData: make([]*commonpb.Blob, 0),
HashKeys: make([]int32, 0),
}
wg.Add(1)
go func(group *sync.WaitGroup) {
defer group.Done()
bool, err := proxyClient.HasCollection(ctx, &servicepb.CollectionName{CollectionName: collectionName})
if err != nil {
t.Error(err)
}
msg := "Has Collection " + strconv.Itoa(i) + " should succeed!"
assert.Equal(t, bool.Status.ErrorCode, commonpb.ErrorCode_SUCCESS, msg)
if bool.Value {
resp, err := proxyClient.Insert(ctx, req)
if err != nil {
t.Error(err)
}
msg := "Insert into Collection " + strconv.Itoa(i) + " should succeed!"
assert.Equal(t, resp.Status.ErrorCode, commonpb.ErrorCode_SUCCESS, msg)
}
}(&wg)
}
wg.Wait()
}
/*
func TestProxy_Search(t *testing.T) {
var wg sync.WaitGroup
//buf := make(chan int, testNum)
buf := make(chan int, 1)
wg.Add(1)
func(group *sync.WaitGroup) {
defer wg.Done()
queryResultChannels := []string{"QueryResult"}
bufSize := 1024
queryResultMsgStream := msgstream.NewPulsarMsgStream(ctx, int64(bufSize))
pulsarAddress := "pulsar://localhost:6650"
queryResultMsgStream.SetPulsarCient(pulsarAddress)
assert.NotEqual(t, queryResultMsgStream, nil, "query result message stream should not be nil!")
queryResultMsgStream.CreatePulsarProducers(queryResultChannels)
for {
select {
case <-ctx.Done():
t.Logf("query result message stream is closed ...")
queryResultMsgStream.Close()
case i := <- buf:
log.Printf("receive query request, reqID: %v", i)
for j := 0; j < 4; j++ {
searchResultMsg := &msgstream.SearchResultMsg{
BaseMsg: msgstream.BaseMsg{
HashValues: []int32{1},
},
SearchResult: internalpb.SearchResult{
MsgType: internalpb.MsgType_kSearchResult,
ReqID: int64(i),
},
}
msgPack := &msgstream.MsgPack{
Msgs: make([]*msgstream.TsMsg, 1),
}
var tsMsg msgstream.TsMsg = searchResultMsg
msgPack.Msgs[0] = &tsMsg
log.Printf("proxy_test, produce message...")
queryResultMsgStream.Produce(msgPack)
}
}
}
}(&wg)
for i := 0; i < testNum; i++ {
i := i
collectionName := "CreateCollection" + strconv.FormatInt(int64(i), 10)
req := &servicepb.Query{
CollectionName: collectionName,
}
wg.Add(1)
go func(group *sync.WaitGroup) {
defer group.Done()
bool, err := proxyClient.HasCollection(ctx, &servicepb.CollectionName{CollectionName: collectionName})
if err != nil {
t.Error(err)
}
msg := "Has Collection " + strconv.Itoa(i) + " should succeed!"
assert.Equal(t, bool.Status.ErrorCode, commonpb.ErrorCode_SUCCESS, msg)
if bool.Value {
log.Printf("Search: %v", collectionName)
fn := func() error {
buf <- i
resp, err := proxyClient.Search(ctx, req)
t.Logf("response of search collection %v: %v", i, resp)
return err
}
err := Retry(10, time.Millisecond, fn)
if err != nil {
t.Error(err)
}
}
}(&wg)
}
wg.Wait()
}
*/
func TestProxy_DropCollection(t *testing.T) {
var wg sync.WaitGroup
for i := 0; i < testNum; i++ {
i := i
collectionName := "CreateCollection" + strconv.FormatInt(int64(i), 10)
req := &servicepb.CollectionName{
CollectionName: collectionName,
}
wg.Add(1)
go func(group *sync.WaitGroup) {
defer group.Done()
bool, err := proxyClient.HasCollection(ctx, req)
if err != nil {
t.Error(err)
}
msg := "Has Collection " + strconv.Itoa(i) + " should succeed!"
assert.Equal(t, bool.Status.ErrorCode, commonpb.ErrorCode_SUCCESS, msg)
if bool.Value {
resp, err := proxyClient.DropCollection(ctx, req)
if err != nil {
t.Error(err)
}
msg := "Drop Collection " + strconv.Itoa(i) + " should succeed!"
assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_SUCCESS, msg)
t.Logf("response of insert collection %v: %v", i, resp)
}
}(&wg)
}
wg.Wait()
......
......@@ -69,7 +69,16 @@ func (it *InsertTask) Execute() error {
Msgs: make([]*msgstream.TsMsg, 1),
}
msgPack.Msgs[0] = &tsMsg
it.manipulationMsgStream.Produce(msgPack)
err := it.manipulationMsgStream.Produce(msgPack)
it.result = &servicepb.IntegerRangeResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
}
if err != nil {
it.result.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
it.result.Status.Reason = err.Error()
}
return nil
}
......@@ -78,7 +87,6 @@ func (it *InsertTask) PostExecute() error {
}
func (it *InsertTask) WaitToFinish() error {
defer it.cancel()
for {
select {
case err := <-it.done:
......@@ -146,7 +154,6 @@ func (cct *CreateCollectionTask) PostExecute() error {
}
func (cct *CreateCollectionTask) WaitToFinish() error {
defer cct.cancel()
for {
select {
case err := <-cct.done:
......@@ -214,7 +221,6 @@ func (dct *DropCollectionTask) PostExecute() error {
}
func (dct *DropCollectionTask) WaitToFinish() error {
defer dct.cancel()
for {
select {
case err := <-dct.done:
......@@ -287,7 +293,6 @@ func (qt *QueryTask) PostExecute() error {
}
func (qt *QueryTask) WaitToFinish() error {
defer qt.cancel()
for {
select {
case err := <-qt.done:
......@@ -300,7 +305,6 @@ func (qt *QueryTask) WaitToFinish() error {
}
func (qt *QueryTask) Notify(err error) {
defer qt.cancel()
defer func() {
qt.done <- err
}()
......@@ -414,7 +418,6 @@ func (hct *HasCollectionTask) PostExecute() error {
}
func (hct *HasCollectionTask) WaitToFinish() error {
defer hct.cancel()
for {
select {
case err := <-hct.done:
......@@ -484,7 +487,6 @@ func (dct *DescribeCollectionTask) PostExecute() error {
}
func (dct *DescribeCollectionTask) WaitToFinish() error {
defer dct.cancel()
for {
select {
case err := <-dct.done:
......@@ -554,7 +556,6 @@ func (sct *ShowCollectionsTask) PostExecute() error {
}
func (sct *ShowCollectionsTask) WaitToFinish() error {
defer sct.cancel()
for {
select {
case err := <-sct.done:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册