未验证 提交 62065f75 编写于 作者: Y Yaron Schneider 提交者: GitHub

Add direction for bindings (#6509)

* add direction for bindings
Signed-off-by: Nyaron2 <schneider.yaron@live.com>

* linter
Signed-off-by: Nyaron2 <schneider.yaron@live.com>

* fix test field
Signed-off-by: Nyaron2 <schneider.yaron@live.com>

* use consts
Signed-off-by: Nyaron2 <schneider.yaron@live.com>

* use correct format to read from component yamls
Signed-off-by: Nyaron2 <schneider.yaron@live.com>

---------
Signed-off-by: Nyaron2 <schneider.yaron@live.com>
上级 715457a0
......@@ -122,6 +122,9 @@ const (
bindingsConcurrencyParallel = "parallel"
bindingsConcurrencySequential = "sequential"
pubsubName = "pubsubName"
bindingDirection = "direction"
inputBinding = "input"
outputBinding = "output"
// hot reloading is currently unsupported, but
// setting this environment variable restores the
......@@ -1705,7 +1708,30 @@ func (a *DaprRuntime) isAppSubscribedToBinding(binding string) (bool, error) {
return false, nil
}
func isBindingOfDirection(direction string, metadata []componentsV1alpha1.MetadataItem) bool {
directionFound := false
for _, m := range metadata {
if strings.EqualFold(m.Name, bindingDirection) {
directionFound = true
directions := strings.Split(m.Value.String(), ",")
for _, d := range directions {
if strings.TrimSpace(strings.ToLower(d)) == direction {
return true
}
}
}
}
return !directionFound
}
func (a *DaprRuntime) initInputBinding(c componentsV1alpha1.Component) error {
if !isBindingOfDirection(inputBinding, c.Spec.Metadata) {
return nil
}
fName := c.LogName()
binding, err := a.bindingsRegistry.CreateInputBinding(c.Spec.Type, c.Spec.Version, fName)
if err != nil {
......@@ -1732,6 +1758,10 @@ func (a *DaprRuntime) initInputBinding(c componentsV1alpha1.Component) error {
}
func (a *DaprRuntime) initOutputBinding(c componentsV1alpha1.Component) error {
if !isBindingOfDirection(outputBinding, c.Spec.Metadata) {
return nil
}
fName := c.LogName()
binding, err := a.bindingsRegistry.CreateOutputBinding(c.Spec.Type, c.Spec.Version, fName)
if err != nil {
......
......@@ -6137,3 +6137,103 @@ func TestHTTPEndpointsUpdate(t *testing.T) {
_, exists = rt.compStore.GetHTTPEndpoint(endpoint3.Name)
assert.True(t, exists, fmt.Sprintf("expect http endpoint with name: %s", endpoint3.Name))
}
func TestIsBindingOfDirection(t *testing.T) {
t.Run("no direction in metadata for input binding", func(t *testing.T) {
m := []componentsV1alpha1.MetadataItem{}
r := isBindingOfDirection("input", m)
assert.True(t, r)
})
t.Run("no direction in metadata for output binding", func(t *testing.T) {
m := []componentsV1alpha1.MetadataItem{}
r := isBindingOfDirection("output", m)
assert.True(t, r)
})
t.Run("input direction in metadata", func(t *testing.T) {
m := []componentsV1alpha1.MetadataItem{
{
Name: "direction",
Value: componentsV1alpha1.DynamicValue{
JSON: v1.JSON{
Raw: []byte("input"),
},
},
},
}
r := isBindingOfDirection("input", m)
f := isBindingOfDirection("output", m)
assert.True(t, r)
assert.False(t, f)
})
t.Run("output direction in metadata", func(t *testing.T) {
m := []componentsV1alpha1.MetadataItem{
{
Name: "direction",
Value: componentsV1alpha1.DynamicValue{
JSON: v1.JSON{
Raw: []byte("output"),
},
},
},
}
r := isBindingOfDirection("output", m)
f := isBindingOfDirection("input", m)
assert.True(t, r)
assert.False(t, f)
})
t.Run("input and output direction in metadata", func(t *testing.T) {
m := []componentsV1alpha1.MetadataItem{
{
Name: "direction",
Value: componentsV1alpha1.DynamicValue{
JSON: v1.JSON{
Raw: []byte("input, output"),
},
},
},
}
r := isBindingOfDirection("output", m)
f := isBindingOfDirection("input", m)
assert.True(t, r)
assert.True(t, f)
})
t.Run("invalid direction for input binding", func(t *testing.T) {
m := []componentsV1alpha1.MetadataItem{
{
Name: "direction",
Value: componentsV1alpha1.DynamicValue{
JSON: v1.JSON{
Raw: []byte("aaa"),
},
},
},
}
f := isBindingOfDirection("input", m)
assert.False(t, f)
})
t.Run("invalid direction for output binding", func(t *testing.T) {
m := []componentsV1alpha1.MetadataItem{
{
Name: "direction",
Value: componentsV1alpha1.DynamicValue{
JSON: v1.JSON{
Raw: []byte("aaa"),
},
},
},
}
f := isBindingOfDirection("output", m)
assert.False(t, f)
})
}
......@@ -21,3 +21,5 @@ spec:
metadata:
- name: schedule
value: "@every 1s"
- name: direction
value: input
......@@ -35,3 +35,5 @@ spec:
value: "false"
- name: initialOffset
value: oldest
- name: direction
value: input, output
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册