提交 eca1e254 编写于 作者: N neza2017 提交者: yefu.chen

Add msg id at master service

Signed-off-by: Nneza2017 <yefu.chen@zilliz.com>
上级 41e19756
......@@ -71,16 +71,16 @@ timeout(time: "${regressionTimeout}", unit: 'MINUTES') {
}
def etcdLabels = "app.kubernetes.io/instance=${env.HELM_RELEASE_NAME},app.kubernetes.io/name=etcd"
def minioLables = "release=${env.HELM_RELEASE_NAME},app=minio"
def pulsarLabels = "app.kubernetes.io/instance=${env.HELM_RELEASE_NAME},component=pulsar"
def pulsarLabels = "release=${env.HELM_RELEASE_NAME},app=pulsar"
sh "mkdir -p ${env.DEV_TEST_ARTIFACTS_PATH}"
sh "kubectl cp -n ${env.HELM_RELEASE_NAMESPACE} \$(kubectl get pod -n ${env.HELM_RELEASE_NAMESPACE} -l ${milvusLabels} -o jsonpath='{range.items[0]}{.metadata.name}'):logs ${env.DEV_TEST_ARTIFACTS_PATH}"
sh "kubectl logs -n ${env.HELM_RELEASE_NAMESPACE} \$(kubectl get pod -n ${env.HELM_RELEASE_NAMESPACE} -l ${etcdLabels} -o jsonpath='{range.items[*]}{.metadata.name} ') > ${env.DEV_TEST_ARTIFACTS_PATH}/etcd-${REGRESSION_SERVICE_TYPE}.log"
sh "kubectl logs -n ${env.HELM_RELEASE_NAMESPACE} \$(kubectl get pod -n ${env.HELM_RELEASE_NAMESPACE} -l ${minioLables} -o jsonpath='{range.items[*]}{.metadata.name} ') > ${env.DEV_TEST_ARTIFACTS_PATH}/minio-${REGRESSION_SERVICE_TYPE}.log"
if ("${REGRESSION_SERVICE_TYPE}" == "distributed") {
sh "kubectl logs -n ${env.HELM_RELEASE_NAMESPACE} \$(kubectl get pod -n ${env.HELM_RELEASE_NAMESPACE} -l ${pulsarLabels} -o jsonpath='{range.items[*]}{.metadata.name} ') > ${env.DEV_TEST_ARTIFACTS_PATH}/pulsar-${REGRESSION_SERVICE_TYPE}.log"
}
// if ("${REGRESSION_SERVICE_TYPE}" == "distributed") {
// sh "kubectl logs -n ${env.HELM_RELEASE_NAMESPACE} \$(kubectl get pod -n ${env.HELM_RELEASE_NAMESPACE} -l ${pulsarLabels} -o jsonpath='{range.items[*]}{.metadata.name} ') > ${env.DEV_TEST_ARTIFACTS_PATH}/pulsar-${REGRESSION_SERVICE_TYPE}.log"
// }
archiveArtifacts artifacts: "${env.DEV_TEST_ARTIFACTS_PATH}/**", allowEmptyArchive: true
}
}
......
......@@ -8,14 +8,14 @@ import (
"strings"
"syscall"
"github.com/zilliztech/milvus-distributed/internal/logutil"
"github.com/zilliztech/milvus-distributed/cmd/distributed/components"
ds "github.com/zilliztech/milvus-distributed/internal/dataservice"
"github.com/zilliztech/milvus-distributed/internal/logutil"
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
"github.com/zilliztech/milvus-distributed/internal/msgstream/rmqms"
"github.com/zilliztech/milvus-distributed/internal/util/rocksmq/server/rocksmq"
"github.com/zilliztech/milvus-distributed/internal/util/trace"
)
func newMsgFactory(localMsg bool) msgstream.Factory {
......@@ -57,6 +57,12 @@ func (mr *MilvusRoles) EnvValue(env string) bool {
}
func (mr *MilvusRoles) Run(localMsg bool) {
closer := trace.InitTracing("singleNode")
if closer != nil {
defer closer.Close()
}
if !mr.HasAnyRole() {
return
}
......
......@@ -118,9 +118,11 @@ func (s *Server) Run() error {
}
func (s *Server) Stop() error {
if s.closer != nil {
if err := s.closer.Close(); err != nil {
return err
}
}
s.cancel()
if s.grpcServer != nil {
s.grpcServer.GracefulStop()
......@@ -149,17 +151,13 @@ func (s *Server) init() error {
log.Debug("DataNode port", zap.Int("port", Params.Port))
tracer, closer, err := trace.InitTracing(fmt.Sprintf("data_node ip: %s, port: %d", Params.IP, Params.Port))
if err != nil {
log.Error("data_node", zap.String("init trace err", err.Error()))
}
opentracing.SetGlobalTracer(tracer)
closer := trace.InitTracing(fmt.Sprintf("data_node ip: %s, port: %d", Params.IP, Params.Port))
s.closer = closer
s.wg.Add(1)
go s.startGrpcLoop(Params.Port)
// wait for grpc server loop start
err = <-s.grpcErrChan
err := <-s.grpcErrChan
if err != nil {
return err
}
......
......@@ -66,11 +66,7 @@ func (s *Server) init() error {
Params.Init()
Params.LoadFromEnv()
tracer, closer, err := trace.InitTracing("data_service")
if err != nil {
log.Error("data_service", zap.String("init trace err", err.Error()))
}
opentracing.SetGlobalTracer(tracer)
closer := trace.InitTracing("data_service")
s.closer = closer
s.wg.Add(1)
......@@ -147,9 +143,11 @@ func (s *Server) start() error {
func (s *Server) Stop() error {
var err error
if s.closer != nil {
if err = s.closer.Close(); err != nil {
return err
}
}
s.cancel()
if s.grpcServer != nil {
......
......@@ -97,11 +97,7 @@ func (s *Server) init() error {
indexnode.Params.IP = Params.IP
indexnode.Params.Address = Params.Address
tracer, closer, err := trace.InitTracing(fmt.Sprintf("index_node_%d", indexnode.Params.NodeID))
if err != nil {
log.Error("index_node", zap.String("init trace err", err.Error()))
}
opentracing.SetGlobalTracer(tracer)
closer := trace.InitTracing(fmt.Sprintf("index_node_%d", indexnode.Params.NodeID))
s.closer = closer
Params.Address = Params.IP + ":" + strconv.FormatInt(int64(Params.Port), 10)
......
......@@ -56,11 +56,7 @@ func (s *Server) init() error {
Params.Init()
indexservice.Params.Init()
tracer, closer, err := trace.InitTracing("index_service")
if err != nil {
log.Error("index_service", zap.String("init trace err", err.Error()))
}
opentracing.SetGlobalTracer(tracer)
closer := trace.InitTracing("index_service")
s.closer = closer
s.loopWg.Add(1)
......@@ -86,9 +82,11 @@ func (s *Server) start() error {
}
func (s *Server) Stop() error {
if s.closer != nil {
if err := s.closer.Close(); err != nil {
return err
}
}
if s.indexservice != nil {
s.indexservice.Stop()
}
......
......@@ -96,16 +96,12 @@ func (s *Server) init() error {
ctx := context.Background()
tracer, closer, err := trace.InitTracing("master_service")
if err != nil {
log.Error("master_service", zap.String("init trace err", err.Error()))
}
opentracing.SetGlobalTracer(tracer)
closer := trace.InitTracing("master_service")
s.closer = closer
log.Debug("init params done")
err = s.startGrpc()
err := s.startGrpc()
if err != nil {
return err
}
......
......@@ -136,11 +136,7 @@ func (s *Server) init() error {
// for purpose of ID Allocator
proxynode.Params.MasterAddress = Params.MasterAddress
tracer, closer, err := trace.InitTracing(fmt.Sprintf("proxy_node ip: %s, port: %d", Params.IP, Params.Port))
if err != nil {
log.Error("proxy_node", zap.String("init trace err", err.Error()))
}
opentracing.SetGlobalTracer(tracer)
closer := trace.InitTracing(fmt.Sprintf("proxy_node ip: %s, port: %d", Params.IP, Params.Port))
s.closer = closer
log.Debug("proxynode", zap.String("proxy host", Params.IP))
......@@ -243,7 +239,11 @@ func (s *Server) start() error {
func (s *Server) Stop() error {
var err error
s.closer.Close()
if s.closer != nil {
if err = s.closer.Close(); err != nil {
return err
}
}
if s.grpcServer != nil {
s.grpcServer.GracefulStop()
......
......@@ -73,11 +73,7 @@ func (s *Server) init() error {
proxyservice.Params.Init()
log.Debug("init params done")
tracer, closer, err := trace.InitTracing("proxy_service")
if err != nil {
log.Error("proxy_service", zap.String("init trace err", err.Error()))
}
opentracing.SetGlobalTracer(tracer)
closer := trace.InitTracing("proxy_service")
s.closer = closer
s.wg.Add(1)
......@@ -137,9 +133,11 @@ func (s *Server) start() error {
}
func (s *Server) Stop() error {
if s.closer != nil {
if err := s.closer.Close(); err != nil {
return err
}
}
s.cancel()
s.closer.Close()
err := s.proxyservice.Stop()
......
......@@ -77,18 +77,14 @@ func (s *Server) init() error {
qn.Params.QueryNodePort = int64(Params.QueryNodePort)
qn.Params.QueryNodeID = Params.QueryNodeID
tracer, closer, err := trace.InitTracing(fmt.Sprintf("query_node ip: %s, port: %d", Params.QueryNodeIP, Params.QueryNodePort))
if err != nil {
log.Error("query_node", zap.String("init trace err", err.Error()))
}
opentracing.SetGlobalTracer(tracer)
closer := trace.InitTracing(fmt.Sprintf("query_node ip: %s, port: %d", Params.QueryNodeIP, Params.QueryNodePort))
s.closer = closer
log.Debug("QueryNode", zap.Int("port", Params.QueryNodePort))
s.wg.Add(1)
go s.startGrpcLoop(Params.QueryNodePort)
// wait for grpc server loop start
err = <-s.grpcErrChan
err := <-s.grpcErrChan
if err != nil {
return err
}
......@@ -257,9 +253,11 @@ func (s *Server) Run() error {
}
func (s *Server) Stop() error {
if s.closer != nil {
if err := s.closer.Close(); err != nil {
return err
}
}
s.cancel()
if s.grpcServer != nil {
......
......@@ -81,11 +81,7 @@ func (s *Server) init() error {
Params.Init()
qs.Params.Init()
tracer, closer, err := trace.InitTracing("query_service")
if err != nil {
log.Error("query_service", zap.String("init trace err", err.Error()))
}
opentracing.SetGlobalTracer(tracer)
closer := trace.InitTracing("query_service")
s.closer = closer
s.wg.Add(1)
......@@ -185,11 +181,12 @@ func (s *Server) start() error {
}
func (s *Server) Stop() error {
err := s.closer.Close()
if err != nil {
if s.closer != nil {
if err := s.closer.Close(); err != nil {
return err
}
err = s.queryservice.Stop()
}
err := s.queryservice.Stop()
s.loopCancel()
if s.grpcServer != nil {
s.grpcServer.GracefulStop()
......
......@@ -879,7 +879,7 @@ func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollecti
Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]),
}, nil
}
log.Debug("CreateCollection ", zap.String("name", in.CollectionName))
log.Debug("CreateCollection ", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
t := &CreateCollectionReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
......@@ -891,13 +891,13 @@ func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollecti
c.ddReqQueue <- t
err := t.WaitToFinish()
if err != nil {
log.Debug("CreateCollection failed", zap.String("name", in.CollectionName))
log.Debug("CreateCollection failed", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "Create collection failed: " + err.Error(),
}, nil
}
log.Debug("CreateCollection Success", zap.String("name", in.CollectionName))
log.Debug("CreateCollection Success", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
......@@ -912,7 +912,7 @@ func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRe
Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]),
}, nil
}
log.Debug("DropCollection", zap.String("name", in.CollectionName))
log.Debug("DropCollection", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
t := &DropCollectionReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
......@@ -924,13 +924,13 @@ func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRe
c.ddReqQueue <- t
err := t.WaitToFinish()
if err != nil {
log.Debug("DropCollection Failed", zap.String("name", in.CollectionName))
log.Debug("DropCollection Failed", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "Drop collection failed: " + err.Error(),
}, nil
}
log.Debug("DropCollection Success", zap.String("name", in.CollectionName))
log.Debug("DropCollection Success", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
......@@ -948,7 +948,7 @@ func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequ
Value: false,
}, nil
}
log.Debug("HasCollection", zap.String("name", in.CollectionName))
log.Debug("HasCollection", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
t := &HasCollectionReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
......@@ -961,7 +961,7 @@ func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequ
c.ddReqQueue <- t
err := t.WaitToFinish()
if err != nil {
log.Debug("HasCollection Failed", zap.String("name", in.CollectionName))
log.Debug("HasCollection Failed", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
return &milvuspb.BoolResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
......@@ -970,7 +970,7 @@ func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequ
Value: false,
}, nil
}
log.Debug("HasCollection Success", zap.String("name", in.CollectionName))
log.Debug("HasCollection Success", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
return &milvuspb.BoolResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
......@@ -992,7 +992,7 @@ func (c *Core) DescribeCollection(ctx context.Context, in *milvuspb.DescribeColl
CollectionID: 0,
}, nil
}
log.Debug("DescribeCollection", zap.String("name", in.CollectionName))
log.Debug("DescribeCollection", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
t := &DescribeCollectionReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
......@@ -1005,7 +1005,7 @@ func (c *Core) DescribeCollection(ctx context.Context, in *milvuspb.DescribeColl
c.ddReqQueue <- t
err := t.WaitToFinish()
if err != nil {
log.Debug("DescribeCollection Failed", zap.String("name", in.CollectionName), zap.Error(err))
log.Debug("DescribeCollection Failed", zap.String("name", in.CollectionName), zap.Error(err), zap.Int64("msgID", in.Base.MsgID))
return &milvuspb.DescribeCollectionResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
......@@ -1014,7 +1014,7 @@ func (c *Core) DescribeCollection(ctx context.Context, in *milvuspb.DescribeColl
Schema: nil,
}, nil
}
log.Debug("DescribeCollection Success", zap.String("name", in.CollectionName))
log.Debug("DescribeCollection Success", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID))
t.Rsp.Status = &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
......@@ -1033,7 +1033,7 @@ func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollections
CollectionNames: nil,
}, nil
}
log.Debug("ShowCollections", zap.String("dbname", in.DbName))
log.Debug("ShowCollections", zap.String("dbname", in.DbName), zap.Int64("msgID", in.Base.MsgID))
t := &ShowCollectionReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
......@@ -1048,7 +1048,7 @@ func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollections
c.ddReqQueue <- t
err := t.WaitToFinish()
if err != nil {
log.Debug("ShowCollections failed", zap.String("dbname", in.DbName))
log.Debug("ShowCollections failed", zap.String("dbname", in.DbName), zap.Int64("msgID", in.Base.MsgID))
return &milvuspb.ShowCollectionsResponse{
CollectionNames: nil,
Status: &commonpb.Status{
......@@ -1057,7 +1057,7 @@ func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollections
},
}, nil
}
log.Debug("ShowCollections Success", zap.String("dbname", in.DbName), zap.Strings("collection names", t.Rsp.CollectionNames))
log.Debug("ShowCollections Success", zap.String("dbname", in.DbName), zap.Strings("collection names", t.Rsp.CollectionNames), zap.Int64("msgID", in.Base.MsgID))
t.Rsp.Status = &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
......@@ -1073,7 +1073,7 @@ func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartition
Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]),
}, nil
}
log.Debug("CreatePartition", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName))
log.Debug("CreatePartition", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID))
t := &CreatePartitionReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
......@@ -1085,13 +1085,13 @@ func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartition
c.ddReqQueue <- t
err := t.WaitToFinish()
if err != nil {
log.Debug("CreatePartition Failed", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName))
log.Debug("CreatePartition Failed", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "create partition failed: " + err.Error(),
}, nil
}
log.Debug("CreatePartition Success", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName))
log.Debug("CreatePartition Success", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
......@@ -1106,7 +1106,7 @@ func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequ
Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]),
}, nil
}
log.Debug("DropPartition", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName))
log.Debug("DropPartition", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID))
t := &DropPartitionReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
......@@ -1118,13 +1118,13 @@ func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequ
c.ddReqQueue <- t
err := t.WaitToFinish()
if err != nil {
log.Debug("DropPartition Failed", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName))
log.Debug("DropPartition Failed", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "DropPartition failed: " + err.Error(),
}, nil
}
log.Debug("DropPartition Success", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName))
log.Debug("DropPartition Success", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
......@@ -1142,7 +1142,7 @@ func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionReques
Value: false,
}, nil
}
log.Debug("HasPartition", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName))
log.Debug("HasPartition", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID))
t := &HasPartitionReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
......@@ -1155,7 +1155,7 @@ func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionReques
c.ddReqQueue <- t
err := t.WaitToFinish()
if err != nil {
log.Debug("HasPartition Failed", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName))
log.Debug("HasPartition Failed", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID))
return &milvuspb.BoolResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
......@@ -1164,7 +1164,7 @@ func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionReques
Value: false,
}, nil
}
log.Debug("HasPartition Success", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName))
log.Debug("HasPartition Success", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID))
return &milvuspb.BoolResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
......@@ -1232,7 +1232,7 @@ func (c *Core) CreateIndex(ctx context.Context, in *milvuspb.CreateIndexRequest)
Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]),
}, nil
}
log.Debug("CreateIndex", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName))
log.Debug("CreateIndex", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.Int64("msgID", in.Base.MsgID))
t := &CreateIndexReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
......@@ -1244,13 +1244,13 @@ func (c *Core) CreateIndex(ctx context.Context, in *milvuspb.CreateIndexRequest)
c.ddReqQueue <- t
err := t.WaitToFinish()
if err != nil {
log.Debug("CreateIndex Failed", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName))
log.Debug("CreateIndex Failed", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.Int64("msgID", in.Base.MsgID))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "CreateIndex failed, error = " + err.Error(),
}, nil
}
log.Debug("CreateIndex Success", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName))
log.Debug("CreateIndex Success", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.Int64("msgID", in.Base.MsgID))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
......@@ -1268,7 +1268,7 @@ func (c *Core) DescribeIndex(ctx context.Context, in *milvuspb.DescribeIndexRequ
IndexDescriptions: nil,
}, nil
}
log.Debug("DescribeIndex", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName))
log.Debug("DescribeIndex", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.Int64("msgID", in.Base.MsgID))
t := &DescribeIndexReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
......@@ -1284,6 +1284,7 @@ func (c *Core) DescribeIndex(ctx context.Context, in *milvuspb.DescribeIndexRequ
c.ddReqQueue <- t
err := t.WaitToFinish()
if err != nil {
log.Debug("DescribeIndex Failed", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.Int64("msgID", in.Base.MsgID))
return &milvuspb.DescribeIndexResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
......@@ -1296,7 +1297,7 @@ func (c *Core) DescribeIndex(ctx context.Context, in *milvuspb.DescribeIndexRequ
for _, i := range t.Rsp.IndexDescriptions {
idxNames = append(idxNames, i.IndexName)
}
log.Debug("DescribeIndex Success", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.Strings("index names", idxNames))
log.Debug("DescribeIndex Success", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.Strings("index names", idxNames), zap.Int64("msgID", in.Base.MsgID))
if len(t.Rsp.IndexDescriptions) == 0 {
t.Rsp.Status = &commonpb.Status{
ErrorCode: commonpb.ErrorCode_IndexNotExist,
......@@ -1319,7 +1320,7 @@ func (c *Core) DropIndex(ctx context.Context, in *milvuspb.DropIndexRequest) (*c
Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]),
}, nil
}
log.Debug("DropIndex", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.String("index name", in.IndexName))
log.Debug("DropIndex", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.String("index name", in.IndexName), zap.Int64("msgID", in.Base.MsgID))
t := &DropIndexReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
......@@ -1331,13 +1332,13 @@ func (c *Core) DropIndex(ctx context.Context, in *milvuspb.DropIndexRequest) (*c
c.ddReqQueue <- t
err := t.WaitToFinish()
if err != nil {
log.Debug("DropIndex Failed", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.String("index name", in.IndexName))
log.Debug("DropIndex Failed", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.String("index name", in.IndexName), zap.Int64("msgID", in.Base.MsgID))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "DropIndex failed, error = " + err.Error(),
}, nil
}
log.Debug("DropIndex Success", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.String("index name", in.IndexName))
log.Debug("DropIndex Success", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.String("index name", in.IndexName), zap.Int64("msgID", in.Base.MsgID))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
......@@ -1355,7 +1356,7 @@ func (c *Core) DescribeSegment(ctx context.Context, in *milvuspb.DescribeSegment
IndexID: 0,
}, nil
}
log.Debug("DescribeSegment", zap.Int64("collection id", in.CollectionID), zap.Int64("segment id", in.SegmentID))
log.Debug("DescribeSegment", zap.Int64("collection id", in.CollectionID), zap.Int64("segment id", in.SegmentID), zap.Int64("msgID", in.Base.MsgID))
t := &DescribeSegmentReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
......@@ -1371,7 +1372,7 @@ func (c *Core) DescribeSegment(ctx context.Context, in *milvuspb.DescribeSegment
c.ddReqQueue <- t
err := t.WaitToFinish()
if err != nil {
log.Debug("DescribeSegment Failed", zap.Int64("collection id", in.CollectionID), zap.Int64("segment id", in.SegmentID))
log.Debug("DescribeSegment Failed", zap.Int64("collection id", in.CollectionID), zap.Int64("segment id", in.SegmentID), zap.Int64("msgID", in.Base.MsgID))
return &milvuspb.DescribeSegmentResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
......@@ -1380,7 +1381,7 @@ func (c *Core) DescribeSegment(ctx context.Context, in *milvuspb.DescribeSegment
IndexID: 0,
}, nil
}
log.Debug("DescribeSegment Success", zap.Int64("collection id", in.CollectionID), zap.Int64("segment id", in.SegmentID))
log.Debug("DescribeSegment Success", zap.Int64("collection id", in.CollectionID), zap.Int64("segment id", in.SegmentID), zap.Int64("msgID", in.Base.MsgID))
t.Rsp.Status = &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
......@@ -1399,7 +1400,7 @@ func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsReques
SegmentIDs: nil,
}, nil
}
log.Debug("ShowSegments", zap.Int64("collection id", in.CollectionID), zap.Int64("partition id", in.PartitionID))
log.Debug("ShowSegments", zap.Int64("collection id", in.CollectionID), zap.Int64("partition id", in.PartitionID), zap.Int64("msgID", in.Base.MsgID))
t := &ShowSegmentReqTask{
baseReqTask: baseReqTask{
ctx: ctx,
......@@ -1415,6 +1416,7 @@ func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsReques
c.ddReqQueue <- t
err := t.WaitToFinish()
if err != nil {
log.Debug("ShowSegments Failed", zap.Int64("collection id", in.CollectionID), zap.Int64("partition id", in.PartitionID), zap.Int64("msgID", in.Base.MsgID))
return &milvuspb.ShowSegmentsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
......@@ -1423,7 +1425,7 @@ func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsReques
SegmentIDs: nil,
}, nil
}
log.Debug("ShowSegments Success", zap.Int64("collection id", in.CollectionID), zap.Int64("partition id", in.PartitionID), zap.Int64s("segments ids", t.Rsp.SegmentIDs))
log.Debug("ShowSegments Success", zap.Int64("collection id", in.CollectionID), zap.Int64("partition id", in.PartitionID), zap.Int64s("segments ids", t.Rsp.SegmentIDs), zap.Int64("msgID", in.Base.MsgID))
t.Rsp.Status = &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
......
......@@ -88,8 +88,8 @@ func (s *searchService) consumeSearch() {
if !ok {
continue
}
sp, ctx := trace.StartSpanFromContext(sm.BaseMsg.Ctx)
sm.BaseMsg.Ctx = ctx
sp, ctx := trace.StartSpanFromContext(sm.TraceCtx())
sm.SetTraceCtx(ctx)
err := s.collectionCheck(sm.CollectionID)
if err != nil {
s.emptySearchCollection.emptySearch(sm)
......
......@@ -17,23 +17,35 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
)
func InitTracing(serviceName string) (opentracing.Tracer, io.Closer, error) {
func InitTracing(serviceName string) io.Closer {
if opentracing.IsGlobalTracerRegistered() {
return nil
}
var cfg *config.Configuration
var err error
if true {
cfg, err := config.FromEnv()
cfg, err = config.FromEnv()
if err != nil {
return nil, nil, errors.New("trace from env error")
log.Error(err)
return nil
}
cfg.ServiceName = serviceName
return cfg.NewTracer()
}
cfg := &config.Configuration{
} else {
cfg = &config.Configuration{
ServiceName: serviceName,
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
},
}
return cfg.NewTracer()
}
tracer, closer, err := cfg.NewTracer()
if err != nil {
log.Error(err)
return nil
}
opentracing.SetGlobalTracer(tracer)
return closer
}
func StartSpanFromContext(ctx context.Context, opts ...opentracing.StartSpanOption) (opentracing.Span, context.Context) {
......
......@@ -7,7 +7,6 @@ import (
"errors"
"github.com/opentracing/opentracing-go"
oplog "github.com/opentracing/opentracing-go/log"
)
......@@ -18,8 +17,7 @@ type simpleStruct struct {
func TestTracing(t *testing.T) {
//Already Init in each framework, this can be ignored in debug
tracer, closer, _ := InitTracing("test")
opentracing.SetGlobalTracer(tracer)
closer := InitTracing("test")
defer closer.Close()
// context normally can be propagated through func params
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册