From 62065f75d18327acb7abb7cbc272d8496c77845d Mon Sep 17 00:00:00 2001 From: Yaron Schneider Date: Tue, 13 Jun 2023 23:06:25 -0700 Subject: [PATCH] Add direction for bindings (#6509) * add direction for bindings Signed-off-by: yaron2 * linter Signed-off-by: yaron2 * fix test field Signed-off-by: yaron2 * use consts Signed-off-by: yaron2 * use correct format to read from component yamls Signed-off-by: yaron2 --------- Signed-off-by: yaron2 --- pkg/runtime/runtime.go | 30 +++++++ pkg/runtime/runtime_test.go | 100 +++++++++++++++++++++ tests/config/dapr_cron_binding.yaml | 2 + tests/config/dapr_kafka_bindings_grpc.yaml | 2 + 4 files changed, 134 insertions(+) diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index 7689c751..b06457cf 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -122,6 +122,9 @@ const ( bindingsConcurrencyParallel = "parallel" bindingsConcurrencySequential = "sequential" pubsubName = "pubsubName" + bindingDirection = "direction" + inputBinding = "input" + outputBinding = "output" // hot reloading is currently unsupported, but // setting this environment variable restores the @@ -1705,7 +1708,30 @@ func (a *DaprRuntime) isAppSubscribedToBinding(binding string) (bool, error) { return false, nil } +func isBindingOfDirection(direction string, metadata []componentsV1alpha1.MetadataItem) bool { + directionFound := false + + for _, m := range metadata { + if strings.EqualFold(m.Name, bindingDirection) { + directionFound = true + + directions := strings.Split(m.Value.String(), ",") + for _, d := range directions { + if strings.TrimSpace(strings.ToLower(d)) == direction { + return true + } + } + } + } + + return !directionFound +} + func (a *DaprRuntime) initInputBinding(c componentsV1alpha1.Component) error { + if !isBindingOfDirection(inputBinding, c.Spec.Metadata) { + return nil + } + fName := c.LogName() binding, err := a.bindingsRegistry.CreateInputBinding(c.Spec.Type, c.Spec.Version, fName) if err != nil { @@ -1732,6 +1758,10 @@ func (a *DaprRuntime) initInputBinding(c componentsV1alpha1.Component) error { } func (a *DaprRuntime) initOutputBinding(c componentsV1alpha1.Component) error { + if !isBindingOfDirection(outputBinding, c.Spec.Metadata) { + return nil + } + fName := c.LogName() binding, err := a.bindingsRegistry.CreateOutputBinding(c.Spec.Type, c.Spec.Version, fName) if err != nil { diff --git a/pkg/runtime/runtime_test.go b/pkg/runtime/runtime_test.go index 4b63caeb..6543a4a7 100644 --- a/pkg/runtime/runtime_test.go +++ b/pkg/runtime/runtime_test.go @@ -6137,3 +6137,103 @@ func TestHTTPEndpointsUpdate(t *testing.T) { _, exists = rt.compStore.GetHTTPEndpoint(endpoint3.Name) assert.True(t, exists, fmt.Sprintf("expect http endpoint with name: %s", endpoint3.Name)) } + +func TestIsBindingOfDirection(t *testing.T) { + t.Run("no direction in metadata for input binding", func(t *testing.T) { + m := []componentsV1alpha1.MetadataItem{} + r := isBindingOfDirection("input", m) + + assert.True(t, r) + }) + + t.Run("no direction in metadata for output binding", func(t *testing.T) { + m := []componentsV1alpha1.MetadataItem{} + r := isBindingOfDirection("output", m) + + assert.True(t, r) + }) + + t.Run("input direction in metadata", func(t *testing.T) { + m := []componentsV1alpha1.MetadataItem{ + { + Name: "direction", + Value: componentsV1alpha1.DynamicValue{ + JSON: v1.JSON{ + Raw: []byte("input"), + }, + }, + }, + } + r := isBindingOfDirection("input", m) + f := isBindingOfDirection("output", m) + + assert.True(t, r) + assert.False(t, f) + }) + + t.Run("output direction in metadata", func(t *testing.T) { + m := []componentsV1alpha1.MetadataItem{ + { + Name: "direction", + Value: componentsV1alpha1.DynamicValue{ + JSON: v1.JSON{ + Raw: []byte("output"), + }, + }, + }, + } + r := isBindingOfDirection("output", m) + f := isBindingOfDirection("input", m) + + assert.True(t, r) + assert.False(t, f) + }) + + t.Run("input and output direction in metadata", func(t *testing.T) { + m := []componentsV1alpha1.MetadataItem{ + { + Name: "direction", + Value: componentsV1alpha1.DynamicValue{ + JSON: v1.JSON{ + Raw: []byte("input, output"), + }, + }, + }, + } + r := isBindingOfDirection("output", m) + f := isBindingOfDirection("input", m) + + assert.True(t, r) + assert.True(t, f) + }) + + t.Run("invalid direction for input binding", func(t *testing.T) { + m := []componentsV1alpha1.MetadataItem{ + { + Name: "direction", + Value: componentsV1alpha1.DynamicValue{ + JSON: v1.JSON{ + Raw: []byte("aaa"), + }, + }, + }, + } + f := isBindingOfDirection("input", m) + assert.False(t, f) + }) + + t.Run("invalid direction for output binding", func(t *testing.T) { + m := []componentsV1alpha1.MetadataItem{ + { + Name: "direction", + Value: componentsV1alpha1.DynamicValue{ + JSON: v1.JSON{ + Raw: []byte("aaa"), + }, + }, + }, + } + f := isBindingOfDirection("output", m) + assert.False(t, f) + }) +} diff --git a/tests/config/dapr_cron_binding.yaml b/tests/config/dapr_cron_binding.yaml index eb5e5c98..2fcb5f2c 100644 --- a/tests/config/dapr_cron_binding.yaml +++ b/tests/config/dapr_cron_binding.yaml @@ -21,3 +21,5 @@ spec: metadata: - name: schedule value: "@every 1s" + - name: direction + value: input diff --git a/tests/config/dapr_kafka_bindings_grpc.yaml b/tests/config/dapr_kafka_bindings_grpc.yaml index ebb7f0d1..0e0c673e 100644 --- a/tests/config/dapr_kafka_bindings_grpc.yaml +++ b/tests/config/dapr_kafka_bindings_grpc.yaml @@ -35,3 +35,5 @@ spec: value: "false" - name: initialOffset value: oldest + - name: direction + value: input, output -- GitLab