From e0cbacba59efd66bec47da6e1500906933175144 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Thu, 2 Jun 2022 10:12:03 +0800 Subject: [PATCH] Check ignorable error to prevent unnecessary panic (#17317) Signed-off-by: bigsheeper --- internal/common/error.go | 36 +++++++++++++++++++ internal/common/error_test.go | 31 ++++++++++++++++ internal/datanode/flow_graph_dd_node.go | 5 ++- .../mqwrapper/kafka/kafka_producer.go | 9 ++--- 4 files changed, 76 insertions(+), 5 deletions(-) create mode 100644 internal/common/error.go create mode 100644 internal/common/error_test.go diff --git a/internal/common/error.go b/internal/common/error.go new file mode 100644 index 000000000..9e0982346 --- /dev/null +++ b/internal/common/error.go @@ -0,0 +1,36 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package common + +type IgnorableError struct { + msg string +} + +func (i *IgnorableError) Error() string { + return i.msg +} + +func NewIgnorableError(err error) error { + return &IgnorableError{ + msg: err.Error(), + } +} + +func IsIgnorableError(err error) bool { + _, ok := err.(*IgnorableError) + return ok +} diff --git a/internal/common/error_test.go b/internal/common/error_test.go new file mode 100644 index 000000000..3cc4650bc --- /dev/null +++ b/internal/common/error_test.go @@ -0,0 +1,31 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package common + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestIgnorableError(t *testing.T) { + err := fmt.Errorf("test err") + iErr := NewIgnorableError(err) + assert.True(t, IsIgnorableError(iErr)) + assert.False(t, IsIgnorableError(err)) +} diff --git a/internal/datanode/flow_graph_dd_node.go b/internal/datanode/flow_graph_dd_node.go index 7819746a0..d99457920 100644 --- a/internal/datanode/flow_graph_dd_node.go +++ b/internal/datanode/flow_graph_dd_node.go @@ -26,6 +26,7 @@ import ( "github.com/opentracing/opentracing-go" "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/mq/msgstream" @@ -177,7 +178,9 @@ func (ddn *ddNode) Operate(in []Msg) []Msg { if err != nil { err = fmt.Errorf("DDNode forward delete msg failed, vChannel = %s, err = %s", ddn.vchannelName, err) log.Error(err.Error()) - panic(err) + if !common.IsIgnorableError(err) { + panic(err) + } } fgMsg.startPositions = append(fgMsg.startPositions, msMsg.StartPositions()...) diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_producer.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_producer.go index f40e13107..08eee9807 100644 --- a/internal/mq/msgstream/mqwrapper/kafka/kafka_producer.go +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_producer.go @@ -2,14 +2,15 @@ package kafka import ( "context" - "errors" + "fmt" "sync" "time" - "github.com/milvus-io/milvus/internal/log" + "github.com/confluentinc/confluent-kafka-go/kafka" "go.uber.org/zap" - "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/milvus-io/milvus/internal/common" + "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" ) @@ -37,7 +38,7 @@ func (kp *kafkaProducer) Send(ctx context.Context, message *mqwrapper.ProducerMe e, ok := <-kp.deliveryChan if !ok { log.Error("kafka produce message fail because of delivery chan is closed", zap.String("topic", kp.topic)) - return nil, errors.New("delivery chan of kafka producer is closed") + return nil, common.NewIgnorableError(fmt.Errorf("delivery chan of kafka producer is closed")) } m := e.(*kafka.Message) -- GitLab