未验证 提交 90b3b2f9 编写于 作者: Y Yaron Schneider 提交者: GitHub

refactor (#113)

* refactor

* fix warn format, fix controller runtime name

* fix dep

* added versioned  deploy yaml, fixed default config

* fixed typo, fixed default config load

* added lock comments per feedback
上级 6e1d6ac1
...@@ -260,22 +260,6 @@ ...@@ -260,22 +260,6 @@
revision = "0ca9ea5df5451ffdf184b4428c902747c2c11cd7" revision = "0ca9ea5df5451ffdf184b4428c902747c2c11cd7"
version = "v1.0.0" version = "v1.0.0"
[[projects]]
digest = "1:33082c63746b464db3d1c2c07a1396d860484d97fe857ef9e8668a9b406db09f"
name = "github.com/go-redis/redis"
packages = [
".",
"internal",
"internal/consistenthash",
"internal/hashtag",
"internal/pool",
"internal/proto",
"internal/util",
]
pruneopts = "UT"
revision = "d22fde8721cc915a55aeb6b00944a76a92bfeb6e"
version = "v6.15.2"
[[projects]] [[projects]]
digest = "1:4d02824a56d268f74a6b6fdd944b20b58a77c3d70e81008b3ee0c4f1a6777340" digest = "1:4d02824a56d268f74a6b6fdd944b20b58a77c3d70e81008b3ee0c4f1a6777340"
name = "github.com/gogo/protobuf" name = "github.com/gogo/protobuf"
...@@ -536,14 +520,6 @@ ...@@ -536,14 +520,6 @@
revision = "ba968bfe8b2f7e042a574c888954fccecfa385b4" revision = "ba968bfe8b2f7e042a574c888954fccecfa385b4"
version = "v0.8.1" version = "v0.8.1"
[[projects]]
digest = "1:0028cb19b2e4c3112225cd871870f2d9cf49b9b4276531f03438a88e94be86fe"
name = "github.com/pmezard/go-difflib"
packages = ["difflib"]
pruneopts = "UT"
revision = "792786c7400a136282c1664665ae0a8db921c6c2"
version = "v1.0.0"
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:d6efa822e9b1e35da4653e912128933143d37d2358eebe2b35c1a5f60e028bd0" digest = "1:d6efa822e9b1e35da4653e912128933143d37d2358eebe2b35c1a5f60e028bd0"
...@@ -584,14 +560,6 @@ ...@@ -584,14 +560,6 @@
pruneopts = "UT" pruneopts = "UT"
revision = "75d898a42a940fbc854dfd1a4199eabdc00cf024" revision = "75d898a42a940fbc854dfd1a4199eabdc00cf024"
[[projects]]
digest = "1:972c2427413d41a1e06ca4897e8528e5a1622894050e2f527b38ddf0f343f759"
name = "github.com/stretchr/testify"
packages = ["assert"]
pruneopts = "UT"
revision = "ffdc059bfe9ce6a4e144ba849dbedead332c6053"
version = "v1.3.0"
[[projects]] [[projects]]
digest = "1:c468422f334a6b46a19448ad59aaffdfc0a36b08fdcc1c749a0b29b6453d7e59" digest = "1:c468422f334a6b46a19448ad59aaffdfc0a36b08fdcc1c749a0b29b6453d7e59"
name = "github.com/valyala/bytebufferpool" name = "github.com/valyala/bytebufferpool"
...@@ -1084,7 +1052,6 @@ ...@@ -1084,7 +1052,6 @@
"github.com/aws/aws-sdk-go/service/sqs", "github.com/aws/aws-sdk-go/service/sqs",
"github.com/eclipse/paho.mqtt.golang", "github.com/eclipse/paho.mqtt.golang",
"github.com/ghodss/yaml", "github.com/ghodss/yaml",
"github.com/go-redis/redis",
"github.com/golang/protobuf/proto", "github.com/golang/protobuf/proto",
"github.com/golang/protobuf/ptypes/any", "github.com/golang/protobuf/ptypes/any",
"github.com/golang/protobuf/ptypes/empty", "github.com/golang/protobuf/ptypes/empty",
...@@ -1100,12 +1067,12 @@ ...@@ -1100,12 +1067,12 @@
"github.com/qiangxue/fasthttp-routing", "github.com/qiangxue/fasthttp-routing",
"github.com/satori/go.uuid", "github.com/satori/go.uuid",
"github.com/streadway/amqp", "github.com/streadway/amqp",
"github.com/stretchr/testify/assert",
"github.com/valyala/fasthttp", "github.com/valyala/fasthttp",
"go.opencensus.io/trace", "go.opencensus.io/trace",
"google.golang.org/api/option", "google.golang.org/api/option",
"google.golang.org/grpc", "google.golang.org/grpc",
"google.golang.org/grpc/metadata", "google.golang.org/grpc/metadata",
"gopkg.in/yaml.v2",
"k8s.io/api/apps/v1", "k8s.io/api/apps/v1",
"k8s.io/api/core/v1", "k8s.io/api/core/v1",
"k8s.io/apimachinery/pkg/api/errors", "k8s.io/apimachinery/pkg/api/errors",
......
...@@ -46,10 +46,6 @@ ignored = ["github.com/a8m/documentdb-go"] ...@@ -46,10 +46,6 @@ ignored = ["github.com/a8m/documentdb-go"]
name = "github.com/ghodss/yaml" name = "github.com/ghodss/yaml"
version = "1.0.0" version = "1.0.0"
[[constraint]]
name = "github.com/go-redis/redis"
version = "6.15.1"
[[constraint]] [[constraint]]
name = "github.com/gorilla/mux" name = "github.com/gorilla/mux"
version = "1.7.0" version = "1.7.0"
......
...@@ -7,7 +7,7 @@ GIT_VERSION = $(shell git describe --always --abbrev=7 --dirty) ...@@ -7,7 +7,7 @@ GIT_VERSION = $(shell git describe --always --abbrev=7 --dirty)
TARGETS ?= darwin linux windows TARGETS ?= darwin linux windows
ARCH ?= amd64 ARCH ?= amd64
CGO ?= 0 CGO ?= 0
BINARIES ?= action placement controller BINARIES ?= actionsrt placement controller
ifdef REL_VERSION ifdef REL_VERSION
ACTIONS_VERSION := $(REL_VERSION) ACTIONS_VERSION := $(REL_VERSION)
......
...@@ -16,7 +16,7 @@ jobs: ...@@ -16,7 +16,7 @@ jobs:
poolImage: macOS-latest poolImage: macOS-latest
targetOS: darwin targetOS: darwin
targetArch: amd64 targetArch: amd64
binaryName: action binaryName: actionsrt
- template: 'build-binary-template.yml' - template: 'build-binary-template.yml'
parameters: parameters:
poolImage: macOS-latest poolImage: macOS-latest
...@@ -34,7 +34,7 @@ jobs: ...@@ -34,7 +34,7 @@ jobs:
poolImage: macOS-latest poolImage: macOS-latest
targetOS: linux targetOS: linux
targetArch: arm targetArch: arm
binaryName: action binaryName: actionsrt
- template: 'build-binary-template.yml' - template: 'build-binary-template.yml'
parameters: parameters:
poolImage: macOS-latest poolImage: macOS-latest
...@@ -52,7 +52,7 @@ jobs: ...@@ -52,7 +52,7 @@ jobs:
poolImage: ubuntu-latest poolImage: ubuntu-latest
targetOS: linux targetOS: linux
targetArch: amd64 targetArch: amd64
binaryName: action binaryName: actionsrt
- template: 'build-binary-template.yml' - template: 'build-binary-template.yml'
parameters: parameters:
poolImage: ubuntu-latest poolImage: ubuntu-latest
...@@ -70,7 +70,7 @@ jobs: ...@@ -70,7 +70,7 @@ jobs:
poolImage: windows-2019 poolImage: windows-2019
targetOS: windows targetOS: windows
targetArch: amd64 targetArch: amd64
binaryName: action binaryName: actionsrt
- template: 'build-binary-template.yml' - template: 'build-binary-template.yml'
parameters: parameters:
poolImage: windows-2019 poolImage: windows-2019
...@@ -87,31 +87,31 @@ jobs: ...@@ -87,31 +87,31 @@ jobs:
pool: pool:
vmImage: 'windows-2019' vmImage: 'windows-2019'
dependsOn: dependsOn:
- build_darwin_amd64_action - build_darwin_amd64_actionsrt
- build_darwin_amd64_placement - build_darwin_amd64_placement
- build_darwin_amd64_controller - build_darwin_amd64_controller
- build_linux_amd64_action - build_linux_amd64_actionsrt
- build_linux_amd64_placement - build_linux_amd64_placement
- build_linux_amd64_controller - build_linux_amd64_controller
- build_linux_arm_action - build_linux_arm_actionsrt
- build_linux_arm_placement - build_linux_arm_placement
- build_linux_arm_controller - build_linux_arm_controller
- build_windows_amd64_action - build_windows_amd64_actionsrt
- build_windows_amd64_placement - build_windows_amd64_placement
- build_windows_amd64_controller - build_windows_amd64_controller
condition: | condition: |
and and
( (
eq(dependencies.build_darwin_amd64_action.result, 'Succeeded'), eq(dependencies.build_darwin_amd64_actionsrt.result, 'Succeeded'),
eq(dependencies.build_darwin_amd64_placement.result, 'Succeeded'), eq(dependencies.build_darwin_amd64_placement.result, 'Succeeded'),
eq(dependencies.build_darwin_amd64_controller.result, 'Succeeded'), eq(dependencies.build_darwin_amd64_controller.result, 'Succeeded'),
eq(dependencies.build_linux_amd64_action.result, 'Succeeded'), eq(dependencies.build_linux_amd64_actionsrt.result, 'Succeeded'),
eq(dependencies.build_linux_amd64_placement.result, 'Succeeded'), eq(dependencies.build_linux_amd64_placement.result, 'Succeeded'),
eq(dependencies.build_linux_amd64_controller.result, 'Succeeded'), eq(dependencies.build_linux_amd64_controller.result, 'Succeeded'),
eq(dependencies.build_linux_arm_action.result, 'Succeeded'), eq(dependencies.build_linux_arm_actionsrt.result, 'Succeeded'),
eq(dependencies.build_linux_arm_placement.result, 'Succeeded'), eq(dependencies.build_linux_arm_placement.result, 'Succeeded'),
eq(dependencies.build_linux_arm_controller.result, 'Succeeded'), eq(dependencies.build_linux_arm_controller.result, 'Succeeded'),
eq(dependencies.build_windows_amd64_action.result, 'Succeeded'), eq(dependencies.build_windows_amd64_actionsrt.result, 'Succeeded'),
eq(dependencies.build_windows_amd64_placement.result, 'Succeeded'), eq(dependencies.build_windows_amd64_placement.result, 'Succeeded'),
eq(dependencies.build_windows_amd64_controller.result, 'Succeeded') eq(dependencies.build_windows_amd64_controller.result, 'Succeeded')
) )
......
...@@ -2,7 +2,7 @@ parameters: ...@@ -2,7 +2,7 @@ parameters:
poolImage: 'macOS-latest' poolImage: 'macOS-latest'
targetOS: 'darwin' targetOS: 'darwin'
targetArch: 'amd64' targetArch: 'amd64'
binaryName: 'action' binaryName: 'actionsrt'
jobs: jobs:
- job: build_${{ parameters.targetOS }}_${{ parameters.targetArch }}_${{ parameters.binaryName }} - job: build_${{ parameters.targetOS }}_${{ parameters.targetArch }}_${{ parameters.binaryName }}
......
package main
import (
"flag"
"os"
"os/signal"
"strconv"
log "github.com/Sirupsen/logrus"
"github.com/actionscore/actions/pkg/action"
"github.com/actionscore/actions/pkg/version"
)
func main() {
log.Infof("Starting Actions Runtime -- version %s -- commit %s", version.Version(), version.Commit())
mode := flag.String("mode", "standalone", "")
actionHTTPPort := flag.String("action-http-port", "3500", "")
actionGRPCPort := flag.String("action-grpc-port", "50001", "")
appPort := flag.String("app-port", "", "")
appProtocol := flag.String("protocol", "http", "")
eventSourcesPath := flag.String("event-sources-path", "./eventsources", "")
configurationName := flag.String("configuration-name", "", "")
actionID := flag.String("action-id", "", "")
apiAddress := flag.String("api-address", "", "")
placementServiceAddresss := flag.String("placement-address", "", "")
allowedOrigins := flag.String("allowed-origins", "*", "")
flag.Parse()
stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt)
actionHTTP, _ := strconv.Atoi(*actionHTTPPort)
actionGRPC, _ := strconv.Atoi(*actionGRPCPort)
i := action.NewAction(*actionID, *appPort, *mode, *appProtocol, *eventSourcesPath, *configurationName, *apiAddress, *placementServiceAddresss, *allowedOrigins)
i.Run(actionHTTP, actionGRPC)
<-stop
}
package main
import (
"flag"
"fmt"
"os"
"os/signal"
"strconv"
"syscall"
"time"
log "github.com/Sirupsen/logrus"
global_config "github.com/actionscore/actions/pkg/config"
"github.com/actionscore/actions/pkg/modes"
actionsrt "github.com/actionscore/actions/pkg/runtime"
"github.com/actionscore/actions/pkg/version"
)
func main() {
log.Infof("starting Actions Runtime -- version %s -- commit %s", version.Version(), version.Commit())
mode := flag.String("mode", string(modes.StandaloneMode), "Runtime mode for Actions")
actionHTTPPort := flag.String("actions-http-port", fmt.Sprintf("%v", actionsrt.DefaultActionsHTTPPort), "HTTP port for Actions to listen on")
actionGRPCPort := flag.String("actions-grpc-port", fmt.Sprintf("%v", actionsrt.DefaultActionsGRPCPort), "gRPC port for Actions to listen on")
appPort := flag.String("app-port", "", "The port the application is listening on")
appProtocol := flag.String("protocol", string(actionsrt.HTTPProtocol), "Protocol for the application: gRPC or http")
componentsPath := flag.String("components-path", actionsrt.DefaultComponentsPath, "Path for components directory. Standalone mode only")
config := flag.String("config", "", "Path to config file, or name of a configuration object")
actionsID := flag.String("actions-id", "", "A unique ID for Actions. Used for Service Discovery and state")
controlPlaneAddress := flag.String("control-plane-address", "", "Address for an Actions control plane")
placementServiceAddresss := flag.String("placement-address", "", "Address for the Actions placement service")
allowedOrigins := flag.String("allowed-origins", actionsrt.DefaultAllowedOrigins, "Allowed HTTP origins")
flag.Parse()
actionHTTP, err := strconv.Atoi(*actionHTTPPort)
if err != nil {
log.Fatalf("error parsing actions-http-port flag: %s", err)
}
actionGRPC, err := strconv.Atoi(*actionGRPCPort)
if err != nil {
log.Fatalf("error parsing actions-grpc-port flag: %s", err)
}
applicationPort := 0
if *appPort != "" {
applicationPort, err = strconv.Atoi(*appPort)
if err != nil {
log.Fatalf("error parsing app-port: %s", err)
}
}
runtimeConfig := actionsrt.NewRuntimeConfig(*actionsID, *placementServiceAddresss, *controlPlaneAddress, *allowedOrigins, *config, *componentsPath,
*appProtocol, *mode, actionHTTP, actionGRPC, applicationPort)
var globalConfig *global_config.Configuration
if *config != "" {
switch modes.ActionsMode(*mode) {
case modes.KubernetesMode:
globalConfig, err = global_config.LoadKubernetesConfiguration(*config, *controlPlaneAddress)
case modes.StandaloneMode:
globalConfig, err = global_config.LoadStandaloneConfiguration(*config)
}
} else {
globalConfig = global_config.LoadDefaultConfiguration()
}
if err != nil {
log.Warnf("error loading config: %s. loading default config", err)
}
stop := make(chan os.Signal, 1)
signal.Notify(stop, syscall.SIGTERM, os.Interrupt)
rt := actionsrt.NewActionsRuntime(runtimeConfig, globalConfig)
err = rt.Run()
if err != nil {
log.Fatalf("error initializing Actions Runtime: %s", err)
}
<-stop
gracefulShutdownDuration := 5 * time.Second
log.Info("actions shutting down. Waiting 5 seconds to finish outstanding operations")
rt.Stop()
<-time.After(gracefulShutdownDuration)
}
...@@ -11,7 +11,7 @@ import ( ...@@ -11,7 +11,7 @@ import (
) )
func main() { func main() {
log.Infof("Starting Actions Placement Service -- version %s -- commit %s", version.Version(), version.Commit()) log.Infof("starting Actions Placement Service -- version %s -- commit %s", version.Version(), version.Commit())
port := flag.String("port", "50005", "") port := flag.String("port", "50005", "")
flag.Parse() flag.Parse()
...@@ -22,6 +22,6 @@ func main() { ...@@ -22,6 +22,6 @@ func main() {
p := placement.NewPlacementService() p := placement.NewPlacementService()
go p.Run(*port) go p.Run(*port)
log.Infof("Placement Service started on port %s", *port) log.Infof("placement Service started on port %s", *port)
<-stop <-stop
} }
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: components.actions.io
spec:
group: actions.io
version: v1alpha1
names:
kind: Component
plural: components
singular: components
categories:
- all
- actions
scope: Namespaced
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: actions-controller
namespace: default
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: actions-controller
subjects:
- kind: ServiceAccount
name: actions-controller
namespace: default
roleRef:
kind: ClusterRole
name: cluster-admin
apiGroup: rbac.authorization.k8s.io
---
kind: Service
apiVersion: v1
metadata:
name: actions-api
spec:
selector:
app: actions-controller
ports:
- protocol: TCP
port: 80
targetPort: 6500
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: actions-controller
labels:
app: actions-controller
spec:
replicas: 1
selector:
matchLabels:
app: actions-controller
template:
metadata:
labels:
app: actions-controller
spec:
containers:
- name: actions-controller
image: actionscore.azurecr.io/actions:merge
env:
- name: RUNTIME_IMAGE
value: actionscore.azurecr.io/actions:merge
imagePullPolicy: Always
ports:
- containerPort: 6500
command: ["./controller"]
imagePullSecrets:
- name: actions-auth
serviceAccountName: actions-controller
---
kind: Service
apiVersion: v1
metadata:
name: actions-placement
spec:
selector:
app: actions-placement
ports:
- protocol: TCP
port: 80
targetPort: 50005
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: actions-placement
labels:
app: actions-placement
spec:
replicas: 1
selector:
matchLabels:
app: actions-placement
template:
metadata:
labels:
app: actions-placement
spec:
containers:
- name: actions-placement
image: actionscore.azurecr.io/actions:merge
imagePullPolicy: Always
ports:
- containerPort: 50005
command: ["./placement"]
imagePullSecrets:
- name: actions-auth
serviceAccountName: actions-controller
此差异已折叠。
package action
type ActionSource interface {
Init(eventSourceSpec EventSourceSpec) error
ReadAsync(metadata interface{}, callback func([]byte) error) error
Read(metadata interface{}) (interface{}, error)
Write(data interface{}) error
}
package action
import (
"net"
"os"
"strconv"
"testing"
"time"
"github.com/actionscore/actions/e2e/mocks"
"github.com/stretchr/testify/assert"
"github.com/valyala/fasthttp"
)
func TestInvokeActionGETWithOneParameter(t *testing.T) {
is := assert.New(t)
req := fasthttp.AcquireRequest()
req.SetRequestURI("http://localhost:3505/action/test-action/echo?param=1")
req.Header.Set("actions.action-address", "localhost:60006")
req.Header.SetMethod("GET")
resp := fasthttp.AcquireResponse()
client := &fasthttp.Client{}
client.Do(req, resp)
bodyBytes := resp.Body()
is.Equal(string(bodyBytes), "param=1;")
}
func TestInvokeActionGETWithTwoParameters(t *testing.T) {
is := assert.New(t)
req := fasthttp.AcquireRequest()
req.SetRequestURI("http://localhost:3505/action/test-action/echo?param1=1&param2=2")
req.Header.Set("actions.action-address", "localhost:60006")
req.Header.SetMethod("GET")
resp := fasthttp.AcquireResponse()
client := &fasthttp.Client{}
client.Do(req, resp)
bodyBytes := resp.Body()
body := string(bodyBytes)
is.True(body == "param1=1;param2=2;" || body == "param2=2;param1=1;")
}
func TestInvokeActionGETWithEncodedParameter(t *testing.T) {
is := assert.New(t)
req := fasthttp.AcquireRequest()
req.SetRequestURI("http://localhost:3505/action/test-action/echo?param=%24%26%2b%2c%2f%3a%3b%3d%3f%40")
req.Header.Set("actions.action-address", getIP()+":60006")
req.Header.SetMethod("GET")
resp := fasthttp.AcquireResponse()
client := &fasthttp.Client{}
client.Do(req, resp)
bodyBytes := resp.Body()
is.Equal(string(bodyBytes), "param=$&+,/:;=?@;")
}
func TestInvokeActionGETWithNoParameters(t *testing.T) {
is := assert.New(t)
req := fasthttp.AcquireRequest()
req.SetRequestURI("http://localhost:3505/action/test-action/echo")
req.Header.Set("actions.action-address", "localhost:60006")
req.Header.SetMethod("GET")
resp := fasthttp.AcquireResponse()
client := &fasthttp.Client{}
client.Do(req, resp)
bodyBytes := resp.Body()
is.Equal(string(bodyBytes), "")
}
func TestMain(m *testing.M) {
launchAndWait(func(port int, appPort string, grpcPort int) {
app := mocks.NewMockApp(true, 100, true)
app.Run(port)
}, 8089, "", 0)
launchAndWait(func(port int, appPort string, grpcPort int) {
i := NewAction("test-action", appPort, "standalone", "http", "", "", "", "", "")
i.Run(port, grpcPort)
}, 3505, "8089", 60006)
os.Exit(m.Run())
}
type launcher func(int, string, int)
func launchAndWait(fn launcher, port int, appPort string, grpcPort int) {
go func(port int, appPort string, grpcPort int) {
fn(port, appPort, grpcPort)
}(port, appPort, grpcPort)
timeout := time.Duration(2) * time.Second
for {
conn, _ := net.DialTimeout("tcp", net.JoinHostPort("", strconv.Itoa(port)), timeout)
if conn != nil {
conn.Close()
break
}
}
}
func getIP() string {
ifaces, _ := net.Interfaces()
for _, i := range ifaces {
addrs, _ := i.Addrs()
for _, a := range addrs {
if ipnet, ok := a.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
if ipnet.IP.To4() != nil {
return ipnet.IP.String()
}
}
}
}
return "127.0.0.1"
}
package action
type Configuration struct {
Spec ConfigurationSpec `json:"spec,omitempty"`
}
type ConfigurationSpec struct {
TracingSpec TracingSpec `json:"tracing,omitempty"`
}
type TracingSpec struct {
Enabled bool `json:"enabled"`
ExporterType string `json:"exporterType"`
ExporterAddress string `json:"exporterAddress"`
IncludeEvent bool `json:"includeEvent"`
IncludeEventBody bool `json:"includeEventBody"`
}
package action
type ContextActivation struct {
State []byte `json:"state"`
ID string `json:"id"`
}
package action
import "time"
type Event struct {
EventName string `json:"eventName,omitempty"`
To []string `json:"to,omitempty"`
Concurrency string `json:"concurrency,omitempty"`
CreatedAt time.Time `json:"createdAt,omitempty"`
State []KeyValState `json:"state,omitempty"`
Data interface{} `json:"data,omitempty"`
}
package action
import (
eventing_v1alpha1 "github.com/actionscore/actions/pkg/apis/eventing/v1alpha1"
)
type EventSource struct {
Name string `json:"name"`
Spec EventSourceSpec `json:"spec"`
Sender Sender
}
type EventSourceSpec struct {
Type string `json:"type"`
ConnectionInfo interface{} `json:"connectionInfo"`
SenderOptions eventing_v1alpha1.SenderOptions `json:"senderOptions,omitempty"`
}
package action
type MemoryStateStore struct {
State map[string]interface{}
}
func NewMemoryStateStore() *MemoryStateStore {
return &MemoryStateStore{}
}
func (m *MemoryStateStore) Init(eventSourceSpec EventSourceSpec) error {
m.State = make(map[string]interface{})
return nil
}
func (m *MemoryStateStore) ReadAsync(metadata interface{}, callback func([]byte) error) error {
return nil
}
func (m *MemoryStateStore) Read(metadata interface{}) (interface{}, error) {
key := metadata.(string)
return m.State[key], nil
}
func (m *MemoryStateStore) Write(data interface{}) error {
state := data.(KeyValState)
m.State[state.Key] = state.Value
return nil
}
package action
type ActionMetadata struct {
ID string `json:"id"`
Actors []ActorMetadata `json:"actors"`
Protocol string `json:"protocol"`
StateStore string `json:"stateStore"`
StateItemsCount int `json:"stateItemsCount"`
AppAddress string `json:"appAddress"`
Healthy bool `json:"healthy"`
}
type ActorMetadata struct {
ActorType string `json:"actorType"`
ActivatedContexts []string `json:"activatedContexts"`
}
package action
import (
"encoding/json"
"fmt"
"strconv"
"time"
)
type MockEventSource struct {
Spec EventSourceSpec
}
type MockMetadata struct {
MessageCount string `json:"messageCount,omitempty"`
Interval string `json:"interval,omitempty"`
}
func NewMockEventSource() *MockEventSource {
return &MockEventSource{}
}
func (a *MockEventSource) Init(eventSourceSpec EventSourceSpec) error {
a.Spec = eventSourceSpec
return nil
}
func (a *MockEventSource) Write(data interface{}) error {
_, err := json.Marshal(a.Spec.ConnectionInfo)
if err != nil {
return err
}
_, err = json.Marshal(data)
if err != nil {
return err
}
return nil
}
func (a *MockEventSource) Read(metadata interface{}) (interface{}, error) {
return nil, nil
}
func (a *MockEventSource) ReadAsync(metadata interface{}, callback func([]byte) error) error {
b, err := json.Marshal(a.Spec.ConnectionInfo)
if err != nil {
return err
}
var mockmeta MockMetadata
err = json.Unmarshal(b, &mockmeta)
if err != nil {
return err
}
count := 0
numInterval, _ := strconv.Atoi(mockmeta.Interval)
numMessageCount, _ := strconv.Atoi(mockmeta.MessageCount)
if numInterval == 0 {
numInterval = 50
}
for {
body := fmt.Sprintf("{\"time\":\"%s\"}", time.Now())
fmt.Println(body + "#" + time.Now().Format(time.RFC3339))
callback([]byte(body))
time.Sleep(time.Duration(numInterval) * time.Millisecond)
count++
if numMessageCount > 0 && count >= numMessageCount {
break
}
}
return nil
}
package action
import (
"context"
"encoding/json"
"fmt"
"time"
log "github.com/Sirupsen/logrus"
eventing_v1alpha1 "github.com/actionscore/actions/pkg/apis/eventing/v1alpha1"
"github.com/go-redis/redis"
)
type RedisSender struct {
InboxQueue string
Client *redis.Client
Options eventing_v1alpha1.SenderOptions
BatchBuffer []Event
LastSentBatch time.Time
}
func (s *RedisSender) MarshalEvents(events []Event) ([]byte, error) {
b, err := json.Marshal(events)
if err != nil {
return nil, err
}
return b, nil
}
func (s *RedisSender) MarshalEvent(event Event) ([]byte, error) {
return s.MarshalEvents([]Event{event})
}
func (s *RedisSender) UnMarshalEvents(value string) ([]Event, error) {
var events []Event
if err := json.Unmarshal([]byte(value), &events); err != nil {
return nil, err
}
return events, nil
}
func (s *RedisSender) Enqueue(event Event) error {
s.BatchBuffer = append(s.BatchBuffer, event)
if len(s.BatchBuffer) >= s.Options.SendBufferSize {
b, err := s.MarshalEvents(s.BatchBuffer)
if err != nil {
return err
}
s.BatchBuffer = nil
s.LastSentBatch = time.Now()
return s.Client.LPush(s.InboxQueue, b).Err()
}
return nil
}
func (s *RedisSender) Dequeue(workerIndex int, events []Event) error {
queueName := fmt.Sprintf("%s%s_%d", s.InboxQueue, "_processing", workerIndex)
b, err := s.MarshalEvents(events)
if err != nil {
return err
}
_, err = s.Client.LRem(queueName, -1, b).Result()
return err
}
func (s *RedisSender) PeekLock(workerIndex int) (*[]Event, error) {
queueName := fmt.Sprintf("%s%s_%d", s.InboxQueue, "_processing", workerIndex)
_, err := s.Client.RPopLPush(s.InboxQueue, queueName).Result()
v, err := s.Client.LRange(queueName, -1, -1).Result()
if err != nil {
return nil, err
}
if len(v) == 1 {
var events []Event
events, err = s.UnMarshalEvents(v[0])
if err != nil {
return nil, err
}
return &events, nil
} else {
return nil, nil
}
}
func (s *RedisSender) Init(spec EventSourceSpec) error {
s.Options = spec.SenderOptions
s.InboxQueue = spec.SenderOptions.QueueName
connInfo := spec.ConnectionInfo
b, err := json.Marshal(connInfo)
if err != nil {
return err
}
var redisCreds RedisCredentials
err = json.Unmarshal(b, &redisCreds)
if err != nil {
return err
}
s.Client = redis.NewClient(&redis.Options{
Addr: redisCreds.Host,
Password: redisCreds.Password,
DB: 0,
})
_, err = s.Client.Ping().Result()
if err != nil {
return err
}
return nil
}
func (b *RedisSender) StartLoop(postFunc func(events *[]Event, ctx1 context.Context) error, ctx context.Context) {
for i := 0; i < b.Options.NumWorkers; i++ {
go func(index int, ctx context.Context) {
for {
evts, err := b.PeekLock(index)
if err != nil {
log.Errorf("Error locking event - %s", err)
}
if evts != nil {
err := postFunc(evts, ctx)
if err == nil {
err = b.Dequeue(index, *evts)
if err != nil {
log.Errorf("Error dequeuing event - %s", err)
}
} else {
log.Errorf("Error posting event - %s", err)
}
}
time.Sleep(time.Millisecond * 50)
}
}(i, ctx)
}
}
func (m *RedisSender) ReadAsync(metadata interface{}, callback func([]byte) error) error {
return nil
}
func (m *RedisSender) Read(metadata interface{}) (interface{}, error) {
return nil, nil
}
func (m *RedisSender) Write(data interface{}) error {
return nil
}
package action
import (
"context"
"time"
eventing_v1alpha1 "github.com/actionscore/actions/pkg/apis/eventing/v1alpha1"
)
type Sender interface {
//Ensure makes sure a queue name exists
Init(spec EventSourceSpec) error
//Enqueue an event to the sending queue
Enqueue(event Event) error
//Dequeue an event from the sending queue
Dequeue(workerIndex int, events []Event) error
//Get a lock on queue header and return the header element
PeekLock(workerIndex int) (*[]Event, error)
//Start message sending loop
StartLoop(postFunc func(event *[]Event, context context.Context) error, context context.Context)
ReadAsync(metadata interface{}, callback func([]byte) error) error
Read(metadata interface{}) (interface{}, error)
Write(data interface{}) error
}
//batch modes
const WindowedBatch = "windowed"
const FixedSizeBatch = "fixsize"
const NoBatch = "no"
//dedup modes
const NoDedup = "no"
const WindowedDedup = "windowed"
//order modes
const NoOrder = "no"
const FIFO = "fifo"
const Sorted = "sorted"
func NewRedisSender() Sender {
return &RedisSender{
LastSentBatch: time.Time{},
}
}
func DefaultSenderOptions() eventing_v1alpha1.SenderOptions {
return eventing_v1alpha1.SenderOptions{
NumWorkers: 10,
BatchOptions: eventing_v1alpha1.BatchOptions{
BatchMode: FixedSizeBatch,
BatchSize: 10,
},
}
}
package action
type StateStore interface {
GetAll(keyMatch string) ([]KeyValState, error)
}
type KeyValState struct {
Key string `json:"key"`
Value interface{} `json:"value"`
}
package actors
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"
"github.com/actionscore/actions/pkg/channel/http"
"github.com/golang/protobuf/ptypes/any"
log "github.com/Sirupsen/logrus"
"github.com/actionscore/actions/pkg/channel"
"github.com/actionscore/actions/pkg/components/state"
"github.com/actionscore/actions/pkg/placement"
pb "github.com/actionscore/actions/pkg/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
type Actors interface {
Call(req *CallRequest) (*CallResponse, error)
Init() error
GetState(req *GetStateRequest) (*StateResponse, error)
SaveState(req *SaveStateRequest) error
}
type actors struct {
appChannel channel.AppChannel
store state.StateStore
activeActors map[string]string
actorLock *sync.RWMutex
placementTableLock *sync.RWMutex
placementTables *placement.PlacementTables
placementSignal chan struct{}
placementBlock bool
operationUpdateLock *sync.Mutex
grpcConnectionFn func(address string) (*grpc.ClientConn, error)
config Config
}
const (
idHeader = "id"
lockOperation = "lock"
unlockOperation = "unlock"
updateOperation = "update"
)
func NewActors(stateStore state.StateStore, appChannel channel.AppChannel, grpcConnectionFn func(address string) (*grpc.ClientConn, error), config Config) Actors {
return &actors{
appChannel: appChannel,
config: config,
store: stateStore,
actorLock: &sync.RWMutex{},
activeActors: map[string]string{},
placementTableLock: &sync.RWMutex{},
placementTables: &placement.PlacementTables{Entries: make(map[string]*placement.Consistent)},
operationUpdateLock: &sync.Mutex{},
grpcConnectionFn: grpcConnectionFn,
}
}
func (a *actors) Init() error {
if a.config.PlacementServiceAddress == "" {
return errors.New("couldn't connect to placement service: address is empty")
}
go a.connectToPlacementService(a.config.PlacementServiceAddress, a.config.HostAddress, a.config.HeartbeatInterval)
return nil
}
func (a *actors) Call(req *CallRequest) (*CallResponse, error) {
if a.placementBlock {
<-a.placementSignal
}
targetActorAddress := a.lookupActorAddress(req.ActorType, req.ActorID)
if targetActorAddress == "" {
return nil, fmt.Errorf("error finding address for actor type %s with id %s", req.ActorType, req.ActorID)
}
var resp []byte
var err error
if a.isActorLocal(targetActorAddress, a.config.HostAddress, a.config.Port) {
err := a.tryActivateActor(req.ActorType, req.ActorID)
if err != nil {
return nil, err
}
resp, err = a.callLocalActor(req.ActorType, req.ActorID, req.Method, req.Data)
} else {
resp, err = a.callRemoteActor(targetActorAddress, req.ActorType, req.ActorID, req.Method, req.Data)
}
if err != nil {
return nil, err
}
return &CallResponse{
Data: resp,
}, nil
}
func (a *actors) callLocalActor(actorType, actorID, actorMethod string, data []byte) ([]byte, error) {
method := fmt.Sprintf("actors/%s/%s/%s", actorType, actorID, actorMethod)
req := channel.InvokeRequest{
Method: method,
Payload: data,
Metadata: map[string]string{http.HTTPVerb: http.Put},
}
resp, err := a.appChannel.InvokeMethod(&req)
if err != nil {
return nil, err
}
return resp.Data, nil
}
func (a *actors) callRemoteActor(targetAddress, actorType, actorID, actorMethod string, data []byte) ([]byte, error) {
req := pb.CallActorEnvelope{
ActorType: actorType,
ActorID: actorID,
Method: actorMethod,
Data: &any.Any{Value: data},
}
conn, err := a.grpcConnectionFn(targetAddress)
if err != nil {
return nil, err
}
client := pb.NewActionsClient(conn)
resp, err := client.CallActor(context.Background(), &req)
if err != nil {
return nil, err
}
return resp.Data.Value, nil
}
func (a *actors) tryActivateActor(actorType, actorID string) error {
// read lock actor for read confirmation
a.actorLock.RLock()
_, exists := a.activeActors[actorID]
a.actorLock.RUnlock()
if !exists {
// lock for actor activation
a.actorLock.Lock()
defer a.actorLock.Unlock()
key := a.constructActorStateKey(actorType, actorID)
resp, err := a.store.Get(&state.GetRequest{
Key: key,
})
req := channel.InvokeRequest{
Method: fmt.Sprintf("actors/%s/%s", actorType, actorID),
Metadata: map[string]string{http.HTTPVerb: http.Post},
Payload: resp.Data,
}
_, err = a.appChannel.InvokeMethod(&req)
if err != nil {
return fmt.Errorf("error activating actor type %s with id %s: %s", actorType, actorID, err)
}
a.activeActors[actorID] = actorType
}
return nil
}
func (a *actors) isActorLocal(targetActorAddress, hostAddress string, grpcPort int) bool {
return strings.Contains(targetActorAddress, "localhost") || strings.Contains(targetActorAddress, "127.0.0.1") ||
targetActorAddress == fmt.Sprintf("%s:%v", hostAddress, grpcPort)
}
func (a *actors) GetState(req *GetStateRequest) (*StateResponse, error) {
key := a.constructActorStateKey(req.ActorType, req.ActorID)
resp, err := a.store.Get(&state.GetRequest{
Key: key,
})
if err != nil {
return nil, err
}
return &StateResponse{
Data: resp.Data,
}, nil
}
func (a *actors) SaveState(req *SaveStateRequest) error {
key := a.constructActorStateKey(req.ActorType, req.ActorID)
err := a.store.Set(&state.SetRequest{
Value: req.Data,
Key: key,
})
return err
}
func (a *actors) constructActorStateKey(actorType, actorID string) string {
return fmt.Sprintf("%s-%s-%s", a.config.ActionsID, actorType, actorID)
}
func (a *actors) connectToPlacementService(placementAddress, hostAddress string, heartbeatInterval time.Duration) {
log.Infof("actors: starting connection attempt to placement service at %s", placementAddress)
stream := a.getPlacementClientPersistently(placementAddress, hostAddress)
log.Infof("actors: established connection to placement service at %s", placementAddress)
go func() {
for {
host := pb.Host{
Name: hostAddress,
Load: 1,
Entities: a.config.HostedActorTypes,
Port: int64(a.config.Port),
}
if stream != nil {
if err := stream.Send(&host); err != nil {
log.Error("actors: connection failure to placement service: retrying")
stream = a.getPlacementClientPersistently(placementAddress, hostAddress)
}
}
time.Sleep(heartbeatInterval)
}
}()
go func() {
for {
resp, err := stream.Recv()
if err != nil {
log.Error("actors: connection failure to placement service: retrying")
stream = a.getPlacementClientPersistently(placementAddress, hostAddress)
}
if resp != nil {
a.onPlacementOrder(resp)
}
}
}()
}
func (a *actors) getPlacementClientPersistently(placementAddress, hostAddress string) pb.PlacementService_ReportActionStatusClient {
for {
retryInterval := time.Millisecond * 250
conn, err := grpc.Dial(placementAddress, grpc.WithInsecure())
if err != nil {
time.Sleep(retryInterval)
continue
}
header := metadata.New(map[string]string{idHeader: hostAddress})
ctx := metadata.NewOutgoingContext(context.Background(), header)
client := pb.NewPlacementServiceClient(conn)
stream, err := client.ReportActionStatus(ctx)
if err != nil {
time.Sleep(retryInterval)
continue
}
return stream
}
}
func (a *actors) onPlacementOrder(in *pb.PlacementOrder) {
log.Infof("actors: placement order received: %s", in.Operation)
// lock all incoming calls when an updated table arrives
a.operationUpdateLock.Lock()
defer a.operationUpdateLock.Unlock()
switch in.Operation {
case lockOperation:
{
a.blockPlacements()
go func() {
time.Sleep(time.Second * 5)
a.unblockPlacements()
}()
}
case unlockOperation:
{
a.unblockPlacements()
}
case updateOperation:
{
a.updatePlacements(in.Tables)
}
}
}
func (a *actors) blockPlacements() {
a.placementSignal = make(chan struct{})
a.placementBlock = true
}
func (a *actors) unblockPlacements() {
if a.placementBlock {
a.placementBlock = false
close(a.placementSignal)
}
}
func (a *actors) updatePlacements(in *pb.PlacementTables) {
if in.Version != a.placementTables.Version {
for k, v := range in.Entries {
loadMap := map[string]*placement.Host{}
for lk, lv := range v.LoadMap {
loadMap[lk] = placement.NewHost(lv.Name, lv.Load, lv.Port)
}
c := placement.NewFromExisting(v.Hosts, v.SortedSet, loadMap)
a.placementTables.Entries[k] = c
}
a.placementTables.Version = in.Version
log.Info("actors: placement tables updated")
}
}
func (a *actors) lookupActorAddress(actorType, actorID string) string {
// read lock for table map
a.placementTableLock.RLock()
defer a.placementTableLock.RUnlock()
t := a.placementTables.Entries[actorType]
if t == nil {
return ""
}
host, _ := t.GetHost(actorID)
return fmt.Sprintf("%s:%v", host.Name, host.Port)
}
package actors
type CallRequest struct {
ActorType string `json:"actorType"`
ActorID string `json:"actorId"`
Method string `json:"method"`
Data []byte `json:"data"`
Metadata map[string]string `json:"metadata"`
}
package actors
type CallResponse struct {
Data []byte `json:"data"`
Metadata map[string]string `json:"metadata"`
}
package actors
import "time"
type Config struct {
HostAddress string
ActionsID string
PlacementServiceAddress string
HostedActorTypes []string
Port int
HeartbeatInterval time.Duration
}
package actors
type GetStateRequest struct {
ActorID string `json:"actorId"`
ActorType string `json:"actorType"`
}
package actors
type SaveStateRequest struct {
ActorID string `json:"actorId"`
ActorType string `json:"actorType"`
Data []byte `json:"data"`
}
package actors
type StateResponse struct {
Data []byte `json:"data"`
}
package eventing package components
const ( const (
GroupName = "actions.io" GroupName = "actions.io"
......
// +k8s:deepcopy-gen=package // +k8s:deepcopy-gen=package
// +groupName=eventing.actions.io // +groupName=components.actions.io
package v1alpha1 package v1alpha1
package v1alpha1 package v1alpha1
import ( import (
"github.com/actionscore/actions/pkg/apis/eventing" "github.com/actionscore/actions/pkg/apis/components"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
) )
// SchemeGroupVersion is group version used to register these objects // SchemeGroupVersion is group version used to register these objects
var SchemeGroupVersion = schema.GroupVersion{Group: eventing.GroupName, Version: "v1alpha1"} var SchemeGroupVersion = schema.GroupVersion{Group: components.GroupName, Version: "v1alpha1"}
// Kind takes an unqualified kind and returns back a Group qualified GroupKind // Kind takes an unqualified kind and returns back a Group qualified GroupKind
func Kind(kind string) schema.GroupKind { func Kind(kind string) schema.GroupKind {
...@@ -29,8 +29,8 @@ var ( ...@@ -29,8 +29,8 @@ var (
func addKnownTypes(scheme *runtime.Scheme) error { func addKnownTypes(scheme *runtime.Scheme) error {
scheme.AddKnownTypes( scheme.AddKnownTypes(
SchemeGroupVersion, SchemeGroupVersion,
&EventSource{}, &Component{},
&EventSourceList{}, &ComponentList{},
) )
scheme.AddKnownTypes(SchemeGroupVersion) scheme.AddKnownTypes(SchemeGroupVersion)
metav1.AddToGroupVersion(scheme, SchemeGroupVersion) metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
......
package v1alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// +genclient
// +genclient:noStatus
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// Component describes an Actions component type
type Component struct {
metav1.TypeMeta `json:",inline"`
// +optional
metav1.ObjectMeta `json:"metadata,omitempty"`
// +optional
Spec ComponentSpec `json:"spec,omitempty"`
}
// ComponentSpec is the spec for a component
type ComponentSpec struct {
Type string `json:"type"`
ConnectionInfo map[string]string `json:"connectionInfo"`
Properties map[string]string `json:"properties"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// ComponentList is a list of Actions components
type ComponentList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata"`
Items []Component `json:"items"`
}
...@@ -25,39 +25,7 @@ import ( ...@@ -25,39 +25,7 @@ import (
) )
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *BatchOptions) DeepCopyInto(out *BatchOptions) { func (in *Component) DeepCopyInto(out *Component) {
*out = *in
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BatchOptions.
func (in *BatchOptions) DeepCopy() *BatchOptions {
if in == nil {
return nil
}
out := new(BatchOptions)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *DedupOptions) DeepCopyInto(out *DedupOptions) {
*out = *in
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DedupOptions.
func (in *DedupOptions) DeepCopy() *DedupOptions {
if in == nil {
return nil
}
out := new(DedupOptions)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *EventSource) DeepCopyInto(out *EventSource) {
*out = *in *out = *in
out.TypeMeta = in.TypeMeta out.TypeMeta = in.TypeMeta
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
...@@ -65,18 +33,18 @@ func (in *EventSource) DeepCopyInto(out *EventSource) { ...@@ -65,18 +33,18 @@ func (in *EventSource) DeepCopyInto(out *EventSource) {
return return
} }
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new EventSource. // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Component.
func (in *EventSource) DeepCopy() *EventSource { func (in *Component) DeepCopy() *Component {
if in == nil { if in == nil {
return nil return nil
} }
out := new(EventSource) out := new(Component)
in.DeepCopyInto(out) in.DeepCopyInto(out)
return out return out
} }
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. // DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *EventSource) DeepCopyObject() runtime.Object { func (in *Component) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil { if c := in.DeepCopy(); c != nil {
return c return c
} }
...@@ -84,13 +52,13 @@ func (in *EventSource) DeepCopyObject() runtime.Object { ...@@ -84,13 +52,13 @@ func (in *EventSource) DeepCopyObject() runtime.Object {
} }
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *EventSourceList) DeepCopyInto(out *EventSourceList) { func (in *ComponentList) DeepCopyInto(out *ComponentList) {
*out = *in *out = *in
out.TypeMeta = in.TypeMeta out.TypeMeta = in.TypeMeta
out.ListMeta = in.ListMeta out.ListMeta = in.ListMeta
if in.Items != nil { if in.Items != nil {
in, out := &in.Items, &out.Items in, out := &in.Items, &out.Items
*out = make([]EventSource, len(*in)) *out = make([]Component, len(*in))
for i := range *in { for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i]) (*in)[i].DeepCopyInto(&(*out)[i])
} }
...@@ -98,18 +66,18 @@ func (in *EventSourceList) DeepCopyInto(out *EventSourceList) { ...@@ -98,18 +66,18 @@ func (in *EventSourceList) DeepCopyInto(out *EventSourceList) {
return return
} }
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new EventSourceList. // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ComponentList.
func (in *EventSourceList) DeepCopy() *EventSourceList { func (in *ComponentList) DeepCopy() *ComponentList {
if in == nil { if in == nil {
return nil return nil
} }
out := new(EventSourceList) out := new(ComponentList)
in.DeepCopyInto(out) in.DeepCopyInto(out)
return out return out
} }
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. // DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *EventSourceList) DeepCopyObject() runtime.Object { func (in *ComponentList) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil { if c := in.DeepCopy(); c != nil {
return c return c
} }
...@@ -117,7 +85,7 @@ func (in *EventSourceList) DeepCopyObject() runtime.Object { ...@@ -117,7 +85,7 @@ func (in *EventSourceList) DeepCopyObject() runtime.Object {
} }
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *EventSourceSpec) DeepCopyInto(out *EventSourceSpec) { func (in *ComponentSpec) DeepCopyInto(out *ComponentSpec) {
*out = *in *out = *in
if in.ConnectionInfo != nil { if in.ConnectionInfo != nil {
in, out := &in.ConnectionInfo, &out.ConnectionInfo in, out := &in.ConnectionInfo, &out.ConnectionInfo
...@@ -126,50 +94,22 @@ func (in *EventSourceSpec) DeepCopyInto(out *EventSourceSpec) { ...@@ -126,50 +94,22 @@ func (in *EventSourceSpec) DeepCopyInto(out *EventSourceSpec) {
(*out)[key] = val (*out)[key] = val
} }
} }
out.SenderOptions = in.SenderOptions if in.Properties != nil {
return in, out := &in.Properties, &out.Properties
} *out = make(map[string]string, len(*in))
for key, val := range *in {
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new EventSourceSpec. (*out)[key] = val
func (in *EventSourceSpec) DeepCopy() *EventSourceSpec { }
if in == nil {
return nil
}
out := new(EventSourceSpec)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *OrderOptions) DeepCopyInto(out *OrderOptions) {
*out = *in
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OrderOptions.
func (in *OrderOptions) DeepCopy() *OrderOptions {
if in == nil {
return nil
} }
out := new(OrderOptions)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *SenderOptions) DeepCopyInto(out *SenderOptions) {
*out = *in
out.BatchOptions = in.BatchOptions
out.DedupOptions = in.DedupOptions
return return
} }
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SenderOptions. // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ComponentSpec.
func (in *SenderOptions) DeepCopy() *SenderOptions { func (in *ComponentSpec) DeepCopy() *ComponentSpec {
if in == nil { if in == nil {
return nil return nil
} }
out := new(SenderOptions) out := new(ComponentSpec)
in.DeepCopyInto(out) in.DeepCopyInto(out)
return out return out
} }
package v1alpha1
import (
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// +genclient
// +genclient:noStatus
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// EventSource describes an Action event source
type EventSource struct {
metav1.TypeMeta `json:",inline"`
// +optional
metav1.ObjectMeta `json:"metadata,omitempty"`
// +optional
Spec EventSourceSpec `json:"spec,omitempty"`
}
// EventSourceSpec is the spec for an EventSource
type EventSourceSpec struct {
Type string `json:"type"`
// +k8s:deepcopy-gen=false
ConnectionInfo map[string]string `json:"connectionInfo"`
SenderOptions SenderOptions `json:"senderOptions,omitempty"`
}
type SenderOptions struct {
BatchOptions BatchOptions `json:"batchOptions,omitempty"`
DedupOptions DedupOptions `json:"dedupOptions,omitempty"`
NumWorkers int `json:"numWorkers,omitempty"`
SendBufferSize int `json:"sendBufferSize,omitempty"`
QueueName string `json:"queueName,omitempty"`
}
type BatchOptions struct {
BatchMode string `json:"mode"`
BatchSize int `json:"size,omitempty"`
WindowSize time.Duration `json:"window,omitempty"`
}
type DedupOptions struct {
DedupMode string `json:"mode"`
DedupWindow time.Duration `json:"window,omitempty"`
DedupField string `json:"field,omitempty"`
}
type OrderOptions struct {
OrderMode string `json:"mode"`
SortWindow time.Duration `json:"window,omitempty"`
SortField string `json:"field,omitempty"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// EventSourceList is a list of Action event sources
type EventSourceList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata"`
Items []EventSource `json:"items"`
}
package bindings
import (
"github.com/actionscore/actions/pkg/bindings/cosmosdb"
"github.com/actionscore/actions/pkg/bindings/dynamodb"
"github.com/actionscore/actions/pkg/bindings/eventhubs"
"github.com/actionscore/actions/pkg/bindings/gcpbucket"
"github.com/actionscore/actions/pkg/bindings/http"
"github.com/actionscore/actions/pkg/bindings/kafka"
"github.com/actionscore/actions/pkg/bindings/mqtt"
"github.com/actionscore/actions/pkg/bindings/rabbitmq"
"github.com/actionscore/actions/pkg/bindings/redis"
"github.com/actionscore/actions/pkg/bindings/sns"
"github.com/actionscore/actions/pkg/bindings/sqs"
"github.com/actionscore/actions/pkg/components/bindings"
)
// Load input/output bindings
func Load() {
bindings.RegisterInputBinding("aws.sqs", sqs.NewAWSSQS())
bindings.RegisterOutputBinding("aws.sqs", sqs.NewAWSSQS())
bindings.RegisterOutputBinding("aws.sns", sns.NewAWSSns())
bindings.RegisterInputBinding("azure.eventhubs", eventhubs.NewAzureEventHubs())
bindings.RegisterOutputBinding("azure.eventhubs", eventhubs.NewAzureEventHubs())
bindings.RegisterOutputBinding("aws.dynamodb", dynamodb.NewDynamoDB())
bindings.RegisterOutputBinding("azure.cosmosdb", cosmosdb.NewCosmosDB())
bindings.RegisterOutputBinding("gcp.storage", gcpbucket.NewGCPStorage())
bindings.RegisterInputBinding("http", http.NewHTTP())
bindings.RegisterOutputBinding("http", http.NewHTTP())
bindings.RegisterInputBinding("kafka", kafka.NewKafka())
bindings.RegisterOutputBinding("kafka", kafka.NewKafka())
bindings.RegisterInputBinding("mqtt", mqtt.NewMQTT())
bindings.RegisterOutputBinding("mqtt", mqtt.NewMQTT())
bindings.RegisterInputBinding("rabbitmq", rabbitmq.NewRabbitMQ())
bindings.RegisterOutputBinding("rabbitmq", rabbitmq.NewRabbitMQ())
bindings.RegisterOutputBinding("redis", redis.NewRedis())
}
package action package cosmosdb
import ( import (
"encoding/json" "encoding/json"
...@@ -6,6 +6,7 @@ import ( ...@@ -6,6 +6,7 @@ import (
_ "github.com/a8m/documentdb" // documentdb go pkg fix _ "github.com/a8m/documentdb" // documentdb go pkg fix
documentdb "github.com/a8m/documentdb-go" documentdb "github.com/a8m/documentdb-go"
"github.com/actionscore/actions/pkg/components/bindings"
"github.com/google/uuid" "github.com/google/uuid"
) )
...@@ -15,12 +16,25 @@ type CosmosDB struct { ...@@ -15,12 +16,25 @@ type CosmosDB struct {
db *documentdb.Database db *documentdb.Database
} }
type CosmosDBCredentials struct {
URL string `json:"url"`
MasterKey string `json:"masterKey"`
Database string `json:"database"`
Collection string `json:"collection"`
}
type CosmosItem struct {
documentdb.Document
ID string `json:"id"`
Value interface{} `json:"value"`
}
func NewCosmosDB() *CosmosDB { func NewCosmosDB() *CosmosDB {
return &CosmosDB{} return &CosmosDB{}
} }
func (c *CosmosDB) Init(eventSourceSpec EventSourceSpec) error { func (c *CosmosDB) Init(metadata bindings.Metadata) error {
connInfo := eventSourceSpec.ConnectionInfo connInfo := metadata.ConnectionInfo
b, err := json.Marshal(connInfo) b, err := json.Marshal(connInfo)
if err != nil { if err != nil {
return err return err
...@@ -63,23 +77,21 @@ func (c *CosmosDB) Init(eventSourceSpec EventSourceSpec) error { ...@@ -63,23 +77,21 @@ func (c *CosmosDB) Init(eventSourceSpec EventSourceSpec) error {
return nil return nil
} }
func (c *CosmosDB) Read(metadata interface{}) (interface{}, error) { func (c *CosmosDB) Write(req *bindings.WriteRequest) error {
return nil, nil var obj interface{}
} err := json.Unmarshal(req.Data, &obj)
if err != nil {
func (c *CosmosDB) ReadAsync(metadata interface{}, callback func([]byte) error) error { return err
return nil }
}
func (c *CosmosDB) Write(data interface{}) error {
key := uuid.New() key := uuid.New()
i := CosmosItem{ i := CosmosItem{
ID: key.String(), ID: key.String(),
Value: data, Value: obj,
} }
_, err := c.client.CreateDocument(c.collection.Self, i, documentdb.PartitionKey(key)) _, err = c.client.CreateDocument(c.collection.Self, i, documentdb.PartitionKey(key))
if err != nil { if err != nil {
return err return err
} }
......
package action package dynamodb
import ( import (
"encoding/json" "encoding/json"
"os" "os"
"github.com/actionscore/actions/pkg/components/bindings"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/dynamodb"
...@@ -32,8 +33,8 @@ func NewDynamoDB() *DynamoDB { ...@@ -32,8 +33,8 @@ func NewDynamoDB() *DynamoDB {
return &DynamoDB{} return &DynamoDB{}
} }
func (d *DynamoDB) Init(eventSourceSpec EventSourceSpec) error { func (d *DynamoDB) Init(metadata bindings.Metadata) error {
meta, err := d.GetDynamoDBMetadata(eventSourceSpec) meta, err := d.GetDynamoDBMetadata(metadata)
if err != nil { if err != nil {
return err return err
} }
...@@ -49,10 +50,16 @@ func (d *DynamoDB) Init(eventSourceSpec EventSourceSpec) error { ...@@ -49,10 +50,16 @@ func (d *DynamoDB) Init(eventSourceSpec EventSourceSpec) error {
return nil return nil
} }
func (d *DynamoDB) Write(data interface{}) error { func (d *DynamoDB) Write(req *bindings.WriteRequest) error {
var obj interface{}
err := json.Unmarshal(req.Data, &obj)
if err != nil {
return err
}
i := DynamoItem{ i := DynamoItem{
ID: uuid.New().String(), ID: uuid.New().String(),
Value: data, Value: obj,
} }
item, err := dynamodbattribute.MarshalMap(i) item, err := dynamodbattribute.MarshalMap(i)
...@@ -73,11 +80,7 @@ func (d *DynamoDB) Write(data interface{}) error { ...@@ -73,11 +80,7 @@ func (d *DynamoDB) Write(data interface{}) error {
return nil return nil
} }
func (d *DynamoDB) Read(metadata interface{}) (interface{}, error) { func (d *DynamoDB) GetDynamoDBMetadata(spec bindings.Metadata) (*DynamoDBMetadata, error) {
return nil, nil
}
func (d *DynamoDB) GetDynamoDBMetadata(spec EventSourceSpec) (*DynamoDBMetadata, error) {
b, err := json.Marshal(spec.ConnectionInfo) b, err := json.Marshal(spec.ConnectionInfo)
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -102,7 +105,3 @@ func (d *DynamoDB) getClient(awsMeta *DynamoDBMetadata) (*dynamodb.DynamoDB, err ...@@ -102,7 +105,3 @@ func (d *DynamoDB) getClient(awsMeta *DynamoDBMetadata) (*dynamodb.DynamoDB, err
return c, nil return c, nil
} }
func (d *DynamoDB) ReadAsync(metadata interface{}, callback func([]byte) error) error {
return nil
}
package action package eventhubs
import ( import (
"bytes" "bytes"
...@@ -12,10 +12,11 @@ import ( ...@@ -12,10 +12,11 @@ import (
eventhub "github.com/Azure/azure-event-hubs-go" eventhub "github.com/Azure/azure-event-hubs-go"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
"github.com/actionscore/actions/pkg/components/bindings"
) )
type AzureEventHubs struct { type AzureEventHubs struct {
Spec EventSourceSpec Spec bindings.Metadata
} }
type AzureEventHubsMetadata struct { type AzureEventHubsMetadata struct {
...@@ -26,8 +27,8 @@ func NewAzureEventHubs() *AzureEventHubs { ...@@ -26,8 +27,8 @@ func NewAzureEventHubs() *AzureEventHubs {
return &AzureEventHubs{} return &AzureEventHubs{}
} }
func (a *AzureEventHubs) Init(eventSourceSpec EventSourceSpec) error { func (a *AzureEventHubs) Init(metadata bindings.Metadata) error {
a.Spec = eventSourceSpec a.Spec = metadata
return nil return nil
} }
...@@ -41,7 +42,7 @@ func GetBytes(key interface{}) ([]byte, error) { ...@@ -41,7 +42,7 @@ func GetBytes(key interface{}) ([]byte, error) {
return buf.Bytes(), nil return buf.Bytes(), nil
} }
func (a *AzureEventHubs) Write(data interface{}) error { func (a *AzureEventHubs) Write(req *bindings.WriteRequest) error {
http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true} http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
b, err := json.Marshal(a.Spec.ConnectionInfo) b, err := json.Marshal(a.Spec.ConnectionInfo)
...@@ -62,13 +63,8 @@ func (a *AzureEventHubs) Write(data interface{}) error { ...@@ -62,13 +63,8 @@ func (a *AzureEventHubs) Write(data interface{}) error {
return err return err
} }
dataBytes, err := json.Marshal(data)
if err != nil {
return err
}
err = hub.Send(context.Background(), &eventhub.Event{ err = hub.Send(context.Background(), &eventhub.Event{
Data: dataBytes, Data: req.Data,
}) })
if err != nil { if err != nil {
return err return err
...@@ -78,11 +74,7 @@ func (a *AzureEventHubs) Write(data interface{}) error { ...@@ -78,11 +74,7 @@ func (a *AzureEventHubs) Write(data interface{}) error {
return nil return nil
} }
func (a *AzureEventHubs) Read(metadata interface{}) (interface{}, error) { func (a *AzureEventHubs) Read(handler func(*bindings.ReadResponse) error) error {
return nil, nil
}
func (a *AzureEventHubs) ReadAsync(metadata interface{}, callback func([]byte) error) error {
http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true} http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
b, err := json.Marshal(a.Spec.ConnectionInfo) b, err := json.Marshal(a.Spec.ConnectionInfo)
...@@ -103,8 +95,14 @@ func (a *AzureEventHubs) ReadAsync(metadata interface{}, callback func([]byte) e ...@@ -103,8 +95,14 @@ func (a *AzureEventHubs) ReadAsync(metadata interface{}, callback func([]byte) e
return err return err
} }
handler := func(c context.Context, event *eventhub.Event) error { callback := func(c context.Context, event *eventhub.Event) error {
return callback(event.Data) if event != nil {
handler(&bindings.ReadResponse{
Data: event.Data,
})
}
return nil
} }
ctx := context.Background() ctx := context.Background()
...@@ -114,7 +112,7 @@ func (a *AzureEventHubs) ReadAsync(metadata interface{}, callback func([]byte) e ...@@ -114,7 +112,7 @@ func (a *AzureEventHubs) ReadAsync(metadata interface{}, callback func([]byte) e
} }
for _, partitionID := range runtimeInfo.PartitionIDs { for _, partitionID := range runtimeInfo.PartitionIDs {
_, err := hub.Receive(ctx, partitionID, handler, eventhub.ReceiveWithLatestOffset()) _, err := hub.Receive(ctx, partitionID, callback, eventhub.ReceiveWithLatestOffset())
if err != nil { if err != nil {
return err return err
} }
......
package action package gcpbucket
import ( import (
"context" "context"
...@@ -8,12 +8,13 @@ import ( ...@@ -8,12 +8,13 @@ import (
"cloud.google.com/go/storage" "cloud.google.com/go/storage"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
"github.com/actionscore/actions/pkg/components/bindings"
uuid "github.com/satori/go.uuid" uuid "github.com/satori/go.uuid"
"google.golang.org/api/option" "google.golang.org/api/option"
) )
type GCPStorage struct { type GCPStorage struct {
Spec EventSourceSpec Spec bindings.Metadata
} }
type GCPMetadata struct { type GCPMetadata struct {
...@@ -34,20 +35,12 @@ func NewGCPStorage() *GCPStorage { ...@@ -34,20 +35,12 @@ func NewGCPStorage() *GCPStorage {
return &GCPStorage{} return &GCPStorage{}
} }
func (g *GCPStorage) Init(eventSourceSpec EventSourceSpec) error { func (g *GCPStorage) Init(metadata bindings.Metadata) error {
g.Spec = eventSourceSpec g.Spec = metadata
return nil return nil
} }
func (g *GCPStorage) Read(metadata interface{}) (interface{}, error) { func (g *GCPStorage) Write(req *bindings.WriteRequest) error {
return nil, nil
}
func (g *GCPStorage) ReadAsync(metadata interface{}, callback func([]byte) error) error {
return nil
}
func (g *GCPStorage) Write(data interface{}) error {
http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true} http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
b, err := json.Marshal(g.Spec.ConnectionInfo) b, err := json.Marshal(g.Spec.ConnectionInfo)
...@@ -72,12 +65,7 @@ func (g *GCPStorage) Write(data interface{}) error { ...@@ -72,12 +65,7 @@ func (g *GCPStorage) Write(data interface{}) error {
name := uuid.NewV4() name := uuid.NewV4()
wc := client.Bucket(metadata.Bucket).Object(name.String()).NewWriter(ctx) wc := client.Bucket(metadata.Bucket).Object(name.String()).NewWriter(ctx)
dataBytes, err := json.Marshal(data) if _, err := wc.Write(req.Data); err != nil {
if err != nil {
return err
}
if _, err := wc.Write(dataBytes); err != nil {
return err return err
} }
...@@ -86,7 +74,5 @@ func (g *GCPStorage) Write(data interface{}) error { ...@@ -86,7 +74,5 @@ func (g *GCPStorage) Write(data interface{}) error {
return err return err
} }
log.Info("Written data to GCP Storage successfully")
return nil return nil
} }
package action package http
import ( import (
"bytes" "bytes"
...@@ -6,6 +6,8 @@ import ( ...@@ -6,6 +6,8 @@ import (
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"time" "time"
"github.com/actionscore/actions/pkg/components/bindings"
) )
type HttpSource struct { type HttpSource struct {
...@@ -17,12 +19,12 @@ type HttpMetadata struct { ...@@ -17,12 +19,12 @@ type HttpMetadata struct {
Method string `json:"method"` Method string `json:"method"`
} }
func NewHttpSource() *HttpSource { func NewHTTP() *HttpSource {
return &HttpSource{} return &HttpSource{}
} }
func (h *HttpSource) Init(eventSourceSpec EventSourceSpec) error { func (h *HttpSource) Init(metadata bindings.Metadata) error {
b, err := json.Marshal(eventSourceSpec.ConnectionInfo) b, err := json.Marshal(metadata.ConnectionInfo)
if err != nil { if err != nil {
return err return err
} }
...@@ -57,36 +59,22 @@ func (h *HttpSource) HttpGet(url string) ([]byte, error) { ...@@ -57,36 +59,22 @@ func (h *HttpSource) HttpGet(url string) ([]byte, error) {
return b, nil return b, nil
} }
func (h *HttpSource) Read(metadata interface{}) (interface{}, error) { func (h *HttpSource) Read(handler func(*bindings.ReadResponse) error) error {
var data interface{}
b, err := h.HttpGet(h.Spec.URL) b, err := h.HttpGet(h.Spec.URL)
if err != nil {
return nil, err
}
err = json.Unmarshal(b, &data)
if err != nil {
return nil, err
}
return data, nil
}
func (h *HttpSource) ReadAsync(metadata interface{}, callback func([]byte) error) error {
data, err := h.HttpGet(h.Spec.URL)
if err != nil { if err != nil {
return err return err
} }
return callback(data) handler(&bindings.ReadResponse{
} Data: b,
})
func (h *HttpSource) Write(data interface{}) error { return nil
b := new(bytes.Buffer) }
json.NewEncoder(b).Encode(data)
func (h *HttpSource) Write(req *bindings.WriteRequest) error {
client := http.Client{Timeout: time.Second * 5} client := http.Client{Timeout: time.Second * 5}
resp, err := client.Post(h.Spec.URL, "application/json; charset=utf-8", b) resp, err := client.Post(h.Spec.URL, "application/json; charset=utf-8", bytes.NewBuffer(req.Data))
if err != nil { if err != nil {
return err return err
} }
......
package action package kafka
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"os" "os"
"os/signal" "os/signal"
"syscall" "syscall"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/actionscore/actions/pkg/components/bindings"
) )
type Kafka struct { type Kafka struct {
...@@ -28,19 +28,19 @@ type KafkaMetadata struct { ...@@ -28,19 +28,19 @@ type KafkaMetadata struct {
type consumer struct { type consumer struct {
ready chan bool ready chan bool
callback func([]byte) error callback func(*bindings.ReadResponse) error
}
func (consumer *consumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
} }
func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { func (consumer *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() { for message := range claim.Messages() {
if consumer.callback != nil { if consumer.callback != nil {
consumer.callback(message.Value) err := consumer.callback(&bindings.ReadResponse{
Data: message.Value,
})
if err == nil {
session.MarkMessage(message, "")
}
} }
session.MarkMessage(message, "")
} }
return nil return nil
...@@ -55,8 +55,8 @@ func NewKafka() *Kafka { ...@@ -55,8 +55,8 @@ func NewKafka() *Kafka {
return &Kafka{} return &Kafka{}
} }
func (k *Kafka) Init(eventSourceSpec EventSourceSpec) error { func (k *Kafka) Init(metadata bindings.Metadata) error {
meta, err := k.GetKafkaMetadata(eventSourceSpec) meta, err := k.GetKafkaMetadata(metadata)
if err != nil { if err != nil {
return err return err
} }
...@@ -74,10 +74,10 @@ func (k *Kafka) Init(eventSourceSpec EventSourceSpec) error { ...@@ -74,10 +74,10 @@ func (k *Kafka) Init(eventSourceSpec EventSourceSpec) error {
return nil return nil
} }
func (k *Kafka) Write(data interface{}) error { func (k *Kafka) Write(req *bindings.WriteRequest) error {
_, _, err := k.producer.SendMessage(&sarama.ProducerMessage{ _, _, err := k.producer.SendMessage(&sarama.ProducerMessage{
Topic: k.publishTopic, Topic: k.publishTopic,
Value: sarama.StringEncoder(fmt.Sprintf("%v", data)), Value: sarama.ByteEncoder(req.Data),
}) })
if err != nil { if err != nil {
return err return err
...@@ -86,12 +86,8 @@ func (k *Kafka) Write(data interface{}) error { ...@@ -86,12 +86,8 @@ func (k *Kafka) Write(data interface{}) error {
return nil return nil
} }
func (k *Kafka) Read(metadata interface{}) (interface{}, error) { func (k *Kafka) GetKafkaMetadata(metadata bindings.Metadata) (*KafkaMetadata, error) {
return nil, nil b, err := json.Marshal(metadata.ConnectionInfo)
}
func (k *Kafka) GetKafkaMetadata(spec EventSourceSpec) (*KafkaMetadata, error) {
b, err := json.Marshal(spec.ConnectionInfo)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -119,12 +115,12 @@ func (k *Kafka) getSyncProducer(meta *KafkaMetadata) (sarama.SyncProducer, error ...@@ -119,12 +115,12 @@ func (k *Kafka) getSyncProducer(meta *KafkaMetadata) (sarama.SyncProducer, error
return producer, nil return producer, nil
} }
func (k *Kafka) ReadAsync(metadata interface{}, callback func([]byte) error) error { func (k *Kafka) Read(handler func(*bindings.ReadResponse) error) error {
config := sarama.NewConfig() config := sarama.NewConfig()
config.Version = sarama.V1_0_0_0 config.Version = sarama.V1_0_0_0
consumer := consumer{ consumer := consumer{
callback: callback, callback: handler,
} }
ctx := context.Background() ctx := context.Background()
...@@ -153,3 +149,7 @@ func (k *Kafka) ReadAsync(metadata interface{}, callback func([]byte) error) err ...@@ -153,3 +149,7 @@ func (k *Kafka) ReadAsync(metadata interface{}, callback func([]byte) error) err
return nil return nil
} }
func (consumer *consumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
package action package mqtt
import ( import (
"crypto/tls" "crypto/tls"
...@@ -12,6 +12,8 @@ import ( ...@@ -12,6 +12,8 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/actionscore/actions/pkg/components/bindings"
mqtt "github.com/eclipse/paho.mqtt.golang" mqtt "github.com/eclipse/paho.mqtt.golang"
) )
...@@ -29,15 +31,15 @@ func NewMQTT() *MQTT { ...@@ -29,15 +31,15 @@ func NewMQTT() *MQTT {
return &MQTT{} return &MQTT{}
} }
func (m *MQTT) Init(eventSourceSpec EventSourceSpec) error { func (m *MQTT) Init(metadata bindings.Metadata) error {
http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true} http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
metadata, err := m.GetMQTTMetadata(eventSourceSpec) mqttMeta, err := m.GetMQTTMetadata(metadata)
if err != nil { if err != nil {
return err return err
} }
m.Metadata = metadata m.Metadata = mqttMeta
if m.Metadata.URL == "" { if m.Metadata.URL == "" {
return errors.New("MQTT Error: URL required") return errors.New("MQTT Error: URL required")
...@@ -50,8 +52,8 @@ func (m *MQTT) Init(eventSourceSpec EventSourceSpec) error { ...@@ -50,8 +52,8 @@ func (m *MQTT) Init(eventSourceSpec EventSourceSpec) error {
return nil return nil
} }
func (m *MQTT) GetMQTTMetadata(eventSourceSpec EventSourceSpec) (*MQTTMetadata, error) { func (m *MQTT) GetMQTTMetadata(metadata bindings.Metadata) (*MQTTMetadata, error) {
b, err := json.Marshal(eventSourceSpec.ConnectionInfo) b, err := json.Marshal(metadata.ConnectionInfo)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -65,7 +67,7 @@ func (m *MQTT) GetMQTTMetadata(eventSourceSpec EventSourceSpec) (*MQTTMetadata, ...@@ -65,7 +67,7 @@ func (m *MQTT) GetMQTTMetadata(eventSourceSpec EventSourceSpec) (*MQTTMetadata,
return &mqttMetadata, nil return &mqttMetadata, nil
} }
func (m *MQTT) Write(data interface{}) error { func (m *MQTT) Write(req *bindings.WriteRequest) error {
uri, err := url.Parse(m.Metadata.URL) uri, err := url.Parse(m.Metadata.URL)
if err != nil { if err != nil {
return err return err
...@@ -76,17 +78,13 @@ func (m *MQTT) Write(data interface{}) error { ...@@ -76,17 +78,13 @@ func (m *MQTT) Write(data interface{}) error {
return err return err
} }
client.Publish(m.Metadata.Topic, 0, false, fmt.Sprintf("%s", data)) client.Publish(m.Metadata.Topic, 0, false, string(req.Data))
client.Disconnect(0) client.Disconnect(0)
return nil return nil
} }
func (m *MQTT) Read(metadata interface{}) (interface{}, error) { func (m *MQTT) Read(handler func(*bindings.ReadResponse) error) error {
return nil, nil
}
func (m *MQTT) ReadAsync(metadata interface{}, callback func([]byte) error) error {
uri, err := url.Parse(m.Metadata.URL) uri, err := url.Parse(m.Metadata.URL)
if err != nil { if err != nil {
return err return err
...@@ -102,7 +100,9 @@ func (m *MQTT) ReadAsync(metadata interface{}, callback func([]byte) error) erro ...@@ -102,7 +100,9 @@ func (m *MQTT) ReadAsync(metadata interface{}, callback func([]byte) error) erro
client.Subscribe(m.Metadata.Topic, 0, func(client mqtt.Client, msg mqtt.Message) { client.Subscribe(m.Metadata.Topic, 0, func(client mqtt.Client, msg mqtt.Message) {
if len(msg.Payload()) > 0 { if len(msg.Payload()) > 0 {
callback(msg.Payload()) handler(&bindings.ReadResponse{
Data: msg.Payload(),
})
} }
}) })
......
package action package rabbitmq
import ( import (
"encoding/json" "encoding/json"
"github.com/actionscore/actions/pkg/components/bindings"
"github.com/streadway/amqp" "github.com/streadway/amqp"
) )
...@@ -23,8 +24,8 @@ func NewRabbitMQ() *RabbitMQ { ...@@ -23,8 +24,8 @@ func NewRabbitMQ() *RabbitMQ {
return &RabbitMQ{} return &RabbitMQ{}
} }
func (r *RabbitMQ) Init(eventSourceSpec EventSourceSpec) error { func (r *RabbitMQ) Init(metadata bindings.Metadata) error {
meta, err := r.GetRabbitMQMetadata(eventSourceSpec) meta, err := r.GetRabbitMQMetadata(metadata)
if err != nil { if err != nil {
return err return err
} }
...@@ -47,20 +48,15 @@ func (r *RabbitMQ) Init(eventSourceSpec EventSourceSpec) error { ...@@ -47,20 +48,15 @@ func (r *RabbitMQ) Init(eventSourceSpec EventSourceSpec) error {
return nil return nil
} }
func (r *RabbitMQ) Write(data interface{}) error { func (r *RabbitMQ) Write(req *bindings.WriteRequest) error {
q, err := r.channel.QueueDeclare(r.metadata.QueueName, r.metadata.Durable, r.metadata.DeleteWhenUnused, false, false, nil) q, err := r.channel.QueueDeclare(r.metadata.QueueName, r.metadata.Durable, r.metadata.DeleteWhenUnused, false, false, nil)
if err != nil { if err != nil {
return err return err
} }
b, err := json.Marshal(data)
if err != nil {
return err
}
err = r.channel.Publish("", q.Name, false, false, amqp.Publishing{ err = r.channel.Publish("", q.Name, false, false, amqp.Publishing{
ContentType: "text/plain", ContentType: "text/plain",
Body: b, Body: req.Data,
}) })
if err != nil { if err != nil {
return err return err
...@@ -69,26 +65,22 @@ func (r *RabbitMQ) Write(data interface{}) error { ...@@ -69,26 +65,22 @@ func (r *RabbitMQ) Write(data interface{}) error {
return nil return nil
} }
func (r *RabbitMQ) Read(metadata interface{}) (interface{}, error) { func (r *RabbitMQ) GetRabbitMQMetadata(metadata bindings.Metadata) (*RabbitMQMetadata, error) {
return nil, nil b, err := json.Marshal(metadata.ConnectionInfo)
}
func (r *RabbitMQ) GetRabbitMQMetadata(eventSourceSpec EventSourceSpec) (*RabbitMQMetadata, error) {
b, err := json.Marshal(eventSourceSpec.ConnectionInfo)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var metadata RabbitMQMetadata var rabbitMQMeta RabbitMQMetadata
err = json.Unmarshal(b, &metadata) err = json.Unmarshal(b, &rabbitMQMeta)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &metadata, nil return &rabbitMQMeta, nil
} }
func (r *RabbitMQ) ReadAsync(metadata interface{}, callback func([]byte) error) error { func (r *RabbitMQ) Read(handler func(*bindings.ReadResponse) error) error {
q, err := r.channel.QueueDeclare(r.metadata.QueueName, r.metadata.Durable, r.metadata.DeleteWhenUnused, false, false, nil) q, err := r.channel.QueueDeclare(r.metadata.QueueName, r.metadata.Durable, r.metadata.DeleteWhenUnused, false, false, nil)
if err != nil { if err != nil {
return err return err
...@@ -111,7 +103,14 @@ func (r *RabbitMQ) ReadAsync(metadata interface{}, callback func([]byte) error) ...@@ -111,7 +103,14 @@ func (r *RabbitMQ) ReadAsync(metadata interface{}, callback func([]byte) error)
go func() { go func() {
for d := range msgs { for d := range msgs {
callback(d.Body) if len(d.Body) > 0 {
err := handler(&bindings.ReadResponse{
Data: d.Body,
})
if err == nil {
r.channel.Ack(d.DeliveryTag, false)
}
}
} }
}() }()
......
package action package redis
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/actionscore/actions/pkg/components/bindings"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/joomcode/redispipe/redis" "github.com/joomcode/redispipe/redis"
"github.com/joomcode/redispipe/redisconn" "github.com/joomcode/redispipe/redisconn"
...@@ -14,12 +16,17 @@ type Redis struct { ...@@ -14,12 +16,17 @@ type Redis struct {
client *redis.SyncCtx client *redis.SyncCtx
} }
type RedisCredentials struct {
Host string `json:"redisHost"`
Password string `json:"redisPassword"`
}
func NewRedis() *Redis { func NewRedis() *Redis {
return &Redis{} return &Redis{}
} }
func (r *Redis) Init(eventSourceSpec EventSourceSpec) error { func (r *Redis) Init(metadata bindings.Metadata) error {
connInfo := eventSourceSpec.ConnectionInfo connInfo := metadata.ConnectionInfo
b, err := json.Marshal(connInfo) b, err := json.Marshal(connInfo)
if err != nil { if err != nil {
return err return err
...@@ -48,22 +55,12 @@ func (r *Redis) Init(eventSourceSpec EventSourceSpec) error { ...@@ -48,22 +55,12 @@ func (r *Redis) Init(eventSourceSpec EventSourceSpec) error {
return nil return nil
} }
func (r *Redis) Write(data interface{}) error { func (r *Redis) Write(req *bindings.WriteRequest) error {
key := fmt.Sprintf("es_%s", uuid.New().String()) key := fmt.Sprintf("es_%s", uuid.New().String())
value := fmt.Sprintf("%v", data) res := r.client.Do(context.Background(), "SET", key, req.Data)
res := r.client.Do(context.Background(), "SET", key, value)
if err := redis.AsError(res); err != nil { if err := redis.AsError(res); err != nil {
return err return err
} }
return nil return nil
} }
func (r *Redis) Read(metadata interface{}) (interface{}, error) {
return nil, nil
}
func (r *Redis) ReadAsync(metadata interface{}, callback func([]byte) error) error {
return nil
}
package action package sns
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"os" "os"
"github.com/actionscore/actions/pkg/components/bindings"
"github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sns" "github.com/aws/aws-sdk-go/service/sns"
) )
type AWSSns struct { type AWSSns struct {
Spec EventSourceSpec Spec bindings.Metadata
} }
type AWSSnsMetadata struct { type AWSSnsMetadata struct {
...@@ -29,12 +30,12 @@ func NewAWSSns() *AWSSns { ...@@ -29,12 +30,12 @@ func NewAWSSns() *AWSSns {
return &AWSSns{} return &AWSSns{}
} }
func (a *AWSSns) Init(eventSourceSpec EventSourceSpec) error { func (a *AWSSns) Init(metadata bindings.Metadata) error {
a.Spec = eventSourceSpec a.Spec = metadata
return nil return nil
} }
func (a *AWSSns) Write(data interface{}) error { func (a *AWSSns) Write(req *bindings.WriteRequest) error {
b, err := json.Marshal(a.Spec.ConnectionInfo) b, err := json.Marshal(a.Spec.ConnectionInfo)
if err != nil { if err != nil {
return err return err
...@@ -53,13 +54,8 @@ func (a *AWSSns) Write(data interface{}) error { ...@@ -53,13 +54,8 @@ func (a *AWSSns) Write(data interface{}) error {
s := session.Must(session.NewSession()) s := session.Must(session.NewSession())
c := sns.New(s) c := sns.New(s)
b, err = json.Marshal(data)
if err != nil {
return err
}
var payload SNSDataPayload var payload SNSDataPayload
err = json.Unmarshal(b, &payload) err = json.Unmarshal(req.Data, &payload)
if err != nil { if err != nil {
return err return err
} }
...@@ -80,11 +76,3 @@ func (a *AWSSns) Write(data interface{}) error { ...@@ -80,11 +76,3 @@ func (a *AWSSns) Write(data interface{}) error {
return nil return nil
} }
func (a *AWSSns) Read(metadata interface{}) (interface{}, error) {
return nil, nil
}
func (a *AWSSns) ReadAsync(metadata interface{}, callback func([]byte) error) error {
return nil
}
package action package sqs
import ( import (
"crypto/tls" "crypto/tls"
...@@ -8,13 +8,14 @@ import ( ...@@ -8,13 +8,14 @@ import (
"time" "time"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
"github.com/actionscore/actions/pkg/components/bindings"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs" "github.com/aws/aws-sdk-go/service/sqs"
) )
type AWSSQS struct { type AWSSQS struct {
Spec EventSourceSpec Spec bindings.Metadata
Client *sqs.SQS Client *sqs.SQS
QueueURL *string QueueURL *string
} }
...@@ -35,10 +36,9 @@ func NewAWSSQS() *AWSSQS { ...@@ -35,10 +36,9 @@ func NewAWSSQS() *AWSSQS {
return &AWSSQS{} return &AWSSQS{}
} }
func (a *AWSSQS) Init(eventSourceSpec EventSourceSpec) error { func (a *AWSSQS) Init(metadata bindings.Metadata) error {
http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true} http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
a.Spec = metadata
a.Spec = eventSourceSpec
awsMeta, err := a.GetAWSMetadata() awsMeta, err := a.GetAWSMetadata()
if err != nil { if err != nil {
...@@ -65,60 +65,18 @@ func (a *AWSSQS) Init(eventSourceSpec EventSourceSpec) error { ...@@ -65,60 +65,18 @@ func (a *AWSSQS) Init(eventSourceSpec EventSourceSpec) error {
return nil return nil
} }
func (a *AWSSQS) Write(data interface{}) error { func (a *AWSSQS) Write(req *bindings.WriteRequest) error {
http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true} http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
dataBytes, err := json.Marshal(data) msgBody := string(req.Data)
if err != nil { _, err := a.Client.SendMessage(&sqs.SendMessageInput{
return err
}
msgBody := string(dataBytes)
_, err = a.Client.SendMessage(&sqs.SendMessageInput{
MessageBody: &msgBody, MessageBody: &msgBody,
QueueUrl: a.QueueURL, QueueUrl: a.QueueURL,
}) })
if err != nil { return err
return err
}
log.Info("SQS event sent successfully")
return nil
}
func (a *AWSSQS) Read(metadata interface{}) (interface{}, error) {
return nil, nil
}
func (a *AWSSQS) GetAWSMetadata() (*AWSSQSMetadata, error) {
b, err := json.Marshal(a.Spec.ConnectionInfo)
if err != nil {
return nil, err
}
var awsMeta AWSSQSMetadata
err = json.Unmarshal(b, &awsMeta)
if err != nil {
return nil, err
}
return &awsMeta, nil
} }
func (a *AWSSQS) getClient(awsMeta *AWSSQSMetadata) (*sqs.SQS, error) { func (a *AWSSQS) Read(handler func(*bindings.ReadResponse) error) error {
os.Setenv("AWS_ACCESS_KEY_ID", awsMeta.AccessKey)
os.Setenv("AWS_SECRET_ACCESS_KEY", awsMeta.SecretKey)
os.Setenv("AWS_REGION", awsMeta.Region)
s := session.Must(session.NewSession())
c := sqs.New(s)
return c, nil
}
func (a *AWSSQS) ReadAsync(metadata interface{}, callback func([]byte) error) error {
http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true} http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
for { for {
...@@ -140,7 +98,10 @@ func (a *AWSSQS) ReadAsync(metadata interface{}, callback func([]byte) error) er ...@@ -140,7 +98,10 @@ func (a *AWSSQS) ReadAsync(metadata interface{}, callback func([]byte) error) er
if len(result.Messages) > 0 { if len(result.Messages) > 0 {
for _, m := range result.Messages { for _, m := range result.Messages {
body := m.Body body := m.Body
err := callback([]byte(*body)) res := bindings.ReadResponse{
Data: []byte(*body),
}
err := handler(&res)
if err == nil { if err == nil {
msgHandle := m.ReceiptHandle msgHandle := m.ReceiptHandle
...@@ -155,3 +116,29 @@ func (a *AWSSQS) ReadAsync(metadata interface{}, callback func([]byte) error) er ...@@ -155,3 +116,29 @@ func (a *AWSSQS) ReadAsync(metadata interface{}, callback func([]byte) error) er
time.Sleep(time.Millisecond * 50) time.Sleep(time.Millisecond * 50)
} }
} }
func (a *AWSSQS) GetAWSMetadata() (*AWSSQSMetadata, error) {
b, err := json.Marshal(a.Spec.ConnectionInfo)
if err != nil {
return nil, err
}
var awsMeta AWSSQSMetadata
err = json.Unmarshal(b, &awsMeta)
if err != nil {
return nil, err
}
return &awsMeta, nil
}
func (a *AWSSQS) getClient(awsMeta *AWSSQSMetadata) (*sqs.SQS, error) {
os.Setenv("AWS_ACCESS_KEY_ID", awsMeta.AccessKey)
os.Setenv("AWS_SECRET_ACCESS_KEY", awsMeta.SecretKey)
os.Setenv("AWS_REGION", awsMeta.Region)
s := session.Must(session.NewSession())
c := sqs.New(s)
return c, nil
}
package channel
type AppChannel interface {
InvokeMethod(req *InvokeRequest) (*InvokeResponse, error)
}
package grpc
import (
"context"
"fmt"
"time"
"github.com/golang/protobuf/ptypes/any"
"google.golang.org/grpc"
"github.com/actionscore/actions/pkg/channel"
pb "github.com/actionscore/actions/pkg/proto"
)
type GRPCChannel struct {
client *grpc.ClientConn
baseAddress string
}
func CreateLocalChannel(port int, conn *grpc.ClientConn) *GRPCChannel {
return &GRPCChannel{
client: conn,
baseAddress: fmt.Sprintf("127.0.0.1:%v", port),
}
}
func (g *GRPCChannel) InvokeMethod(req *channel.InvokeRequest) (*channel.InvokeResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*1)
defer cancel()
c := pb.NewAppClient(g.client)
msg := pb.AppMethodCallEnvelope{
Data: &any.Any{Value: req.Payload},
Method: req.Method,
}
resp, err := c.OnMethodCall(ctx, &msg)
if err != nil {
return nil, err
}
return &channel.InvokeResponse{
Data: resp.Value,
Metadata: map[string]string{},
}, nil
}
package http
import (
"crypto/tls"
"fmt"
"time"
"github.com/actionscore/actions/pkg/channel"
"github.com/valyala/fasthttp"
)
const (
HTTPVerb = "http.verb"
HTTPStatusCode = "http.status_code"
Get = "GET"
Post = "POST"
Delete = "DELETE"
Put = "PUT"
Options = "OPTIONS"
)
type HTTPChannel struct {
client *fasthttp.Client
baseAddress string
}
func (h *HTTPChannel) InvokeMethod(invokeRequest *channel.InvokeRequest) (*channel.InvokeResponse, error) {
req := fasthttp.AcquireRequest()
uri := fmt.Sprintf("%s/%s", h.baseAddress, invokeRequest.Method)
req.SetRequestURI(uri)
req.SetBody(invokeRequest.Payload)
req.Header.SetContentType("application/json")
method := invokeRequest.Metadata[HTTPVerb]
if method == "" {
method = Post
}
req.Header.SetMethod(method)
resp := fasthttp.AcquireResponse()
err := h.client.Do(req, resp)
if err != nil {
return nil, err
}
body := resp.Body()
arr := make([]byte, len(body))
copy(arr, body)
statusCode := resp.StatusCode()
fasthttp.ReleaseRequest(req)
fasthttp.ReleaseResponse(resp)
return &channel.InvokeResponse{
Metadata: map[string]string{HTTPStatusCode: fmt.Sprintf("%v", statusCode)},
Data: arr,
}, nil
}
func CreateLocalChannel(port int) (channel.AppChannel, error) {
return &HTTPChannel{
client: &fasthttp.Client{MaxConnsPerHost: 1000000, TLSConfig: &tls.Config{InsecureSkipVerify: true}, ReadTimeout: time.Second * 60},
baseAddress: fmt.Sprintf("http://127.0.0.1:%v", port),
}, nil
}
package channel
type InvokeRequest struct {
Method string `json:"method"`
Payload []byte `json:"payload"`
Metadata map[string]string `json:"metadata"`
}
package channel
type InvokeResponse struct {
Data []byte `json:"data"`
Metadata map[string]string `json:"metadata"`
}
...@@ -19,8 +19,10 @@ limitations under the License. ...@@ -19,8 +19,10 @@ limitations under the License.
package versioned package versioned
import ( import (
"fmt"
componentsv1alpha1 "github.com/actionscore/actions/pkg/client/clientset/versioned/typed/components/v1alpha1"
configurationv1alpha1 "github.com/actionscore/actions/pkg/client/clientset/versioned/typed/configuration/v1alpha1" configurationv1alpha1 "github.com/actionscore/actions/pkg/client/clientset/versioned/typed/configuration/v1alpha1"
eventingv1alpha1 "github.com/actionscore/actions/pkg/client/clientset/versioned/typed/eventing/v1alpha1"
discovery "k8s.io/client-go/discovery" discovery "k8s.io/client-go/discovery"
rest "k8s.io/client-go/rest" rest "k8s.io/client-go/rest"
flowcontrol "k8s.io/client-go/util/flowcontrol" flowcontrol "k8s.io/client-go/util/flowcontrol"
...@@ -28,16 +30,21 @@ import ( ...@@ -28,16 +30,21 @@ import (
type Interface interface { type Interface interface {
Discovery() discovery.DiscoveryInterface Discovery() discovery.DiscoveryInterface
ComponentsV1alpha1() componentsv1alpha1.ComponentsV1alpha1Interface
ConfigurationV1alpha1() configurationv1alpha1.ConfigurationV1alpha1Interface ConfigurationV1alpha1() configurationv1alpha1.ConfigurationV1alpha1Interface
EventingV1alpha1() eventingv1alpha1.EventingV1alpha1Interface
} }
// Clientset contains the clients for groups. Each group has exactly one // Clientset contains the clients for groups. Each group has exactly one
// version included in a Clientset. // version included in a Clientset.
type Clientset struct { type Clientset struct {
*discovery.DiscoveryClient *discovery.DiscoveryClient
componentsV1alpha1 *componentsv1alpha1.ComponentsV1alpha1Client
configurationV1alpha1 *configurationv1alpha1.ConfigurationV1alpha1Client configurationV1alpha1 *configurationv1alpha1.ConfigurationV1alpha1Client
eventingV1alpha1 *eventingv1alpha1.EventingV1alpha1Client }
// ComponentsV1alpha1 retrieves the ComponentsV1alpha1Client
func (c *Clientset) ComponentsV1alpha1() componentsv1alpha1.ComponentsV1alpha1Interface {
return c.componentsV1alpha1
} }
// ConfigurationV1alpha1 retrieves the ConfigurationV1alpha1Client // ConfigurationV1alpha1 retrieves the ConfigurationV1alpha1Client
...@@ -45,11 +52,6 @@ func (c *Clientset) ConfigurationV1alpha1() configurationv1alpha1.ConfigurationV ...@@ -45,11 +52,6 @@ func (c *Clientset) ConfigurationV1alpha1() configurationv1alpha1.ConfigurationV
return c.configurationV1alpha1 return c.configurationV1alpha1
} }
// EventingV1alpha1 retrieves the EventingV1alpha1Client
func (c *Clientset) EventingV1alpha1() eventingv1alpha1.EventingV1alpha1Interface {
return c.eventingV1alpha1
}
// Discovery retrieves the DiscoveryClient // Discovery retrieves the DiscoveryClient
func (c *Clientset) Discovery() discovery.DiscoveryInterface { func (c *Clientset) Discovery() discovery.DiscoveryInterface {
if c == nil { if c == nil {
...@@ -59,18 +61,23 @@ func (c *Clientset) Discovery() discovery.DiscoveryInterface { ...@@ -59,18 +61,23 @@ func (c *Clientset) Discovery() discovery.DiscoveryInterface {
} }
// NewForConfig creates a new Clientset for the given config. // NewForConfig creates a new Clientset for the given config.
// If config's RateLimiter is not set and QPS and Burst are acceptable,
// NewForConfig will generate a rate-limiter in configShallowCopy.
func NewForConfig(c *rest.Config) (*Clientset, error) { func NewForConfig(c *rest.Config) (*Clientset, error) {
configShallowCopy := *c configShallowCopy := *c
if configShallowCopy.RateLimiter == nil && configShallowCopy.QPS > 0 { if configShallowCopy.RateLimiter == nil && configShallowCopy.QPS > 0 {
if configShallowCopy.Burst <= 0 {
return nil, fmt.Errorf("Burst is required to be greater than 0 when RateLimiter is not set and QPS is set to greater than 0")
}
configShallowCopy.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(configShallowCopy.QPS, configShallowCopy.Burst) configShallowCopy.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(configShallowCopy.QPS, configShallowCopy.Burst)
} }
var cs Clientset var cs Clientset
var err error var err error
cs.configurationV1alpha1, err = configurationv1alpha1.NewForConfig(&configShallowCopy) cs.componentsV1alpha1, err = componentsv1alpha1.NewForConfig(&configShallowCopy)
if err != nil { if err != nil {
return nil, err return nil, err
} }
cs.eventingV1alpha1, err = eventingv1alpha1.NewForConfig(&configShallowCopy) cs.configurationV1alpha1, err = configurationv1alpha1.NewForConfig(&configShallowCopy)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -86,8 +93,8 @@ func NewForConfig(c *rest.Config) (*Clientset, error) { ...@@ -86,8 +93,8 @@ func NewForConfig(c *rest.Config) (*Clientset, error) {
// panics if there is an error in the config. // panics if there is an error in the config.
func NewForConfigOrDie(c *rest.Config) *Clientset { func NewForConfigOrDie(c *rest.Config) *Clientset {
var cs Clientset var cs Clientset
cs.componentsV1alpha1 = componentsv1alpha1.NewForConfigOrDie(c)
cs.configurationV1alpha1 = configurationv1alpha1.NewForConfigOrDie(c) cs.configurationV1alpha1 = configurationv1alpha1.NewForConfigOrDie(c)
cs.eventingV1alpha1 = eventingv1alpha1.NewForConfigOrDie(c)
cs.DiscoveryClient = discovery.NewDiscoveryClientForConfigOrDie(c) cs.DiscoveryClient = discovery.NewDiscoveryClientForConfigOrDie(c)
return &cs return &cs
...@@ -96,8 +103,8 @@ func NewForConfigOrDie(c *rest.Config) *Clientset { ...@@ -96,8 +103,8 @@ func NewForConfigOrDie(c *rest.Config) *Clientset {
// New creates a new Clientset for the given RESTClient. // New creates a new Clientset for the given RESTClient.
func New(c rest.Interface) *Clientset { func New(c rest.Interface) *Clientset {
var cs Clientset var cs Clientset
cs.componentsV1alpha1 = componentsv1alpha1.New(c)
cs.configurationV1alpha1 = configurationv1alpha1.New(c) cs.configurationV1alpha1 = configurationv1alpha1.New(c)
cs.eventingV1alpha1 = eventingv1alpha1.New(c)
cs.DiscoveryClient = discovery.NewDiscoveryClient(c) cs.DiscoveryClient = discovery.NewDiscoveryClient(c)
return &cs return &cs
......
...@@ -20,10 +20,10 @@ package fake ...@@ -20,10 +20,10 @@ package fake
import ( import (
clientset "github.com/actionscore/actions/pkg/client/clientset/versioned" clientset "github.com/actionscore/actions/pkg/client/clientset/versioned"
componentsv1alpha1 "github.com/actionscore/actions/pkg/client/clientset/versioned/typed/components/v1alpha1"
fakecomponentsv1alpha1 "github.com/actionscore/actions/pkg/client/clientset/versioned/typed/components/v1alpha1/fake"
configurationv1alpha1 "github.com/actionscore/actions/pkg/client/clientset/versioned/typed/configuration/v1alpha1" configurationv1alpha1 "github.com/actionscore/actions/pkg/client/clientset/versioned/typed/configuration/v1alpha1"
fakeconfigurationv1alpha1 "github.com/actionscore/actions/pkg/client/clientset/versioned/typed/configuration/v1alpha1/fake" fakeconfigurationv1alpha1 "github.com/actionscore/actions/pkg/client/clientset/versioned/typed/configuration/v1alpha1/fake"
eventingv1alpha1 "github.com/actionscore/actions/pkg/client/clientset/versioned/typed/eventing/v1alpha1"
fakeeventingv1alpha1 "github.com/actionscore/actions/pkg/client/clientset/versioned/typed/eventing/v1alpha1/fake"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/discovery" "k8s.io/client-go/discovery"
...@@ -78,12 +78,12 @@ func (c *Clientset) Tracker() testing.ObjectTracker { ...@@ -78,12 +78,12 @@ func (c *Clientset) Tracker() testing.ObjectTracker {
var _ clientset.Interface = &Clientset{} var _ clientset.Interface = &Clientset{}
// ComponentsV1alpha1 retrieves the ComponentsV1alpha1Client
func (c *Clientset) ComponentsV1alpha1() componentsv1alpha1.ComponentsV1alpha1Interface {
return &fakecomponentsv1alpha1.FakeComponentsV1alpha1{Fake: &c.Fake}
}
// ConfigurationV1alpha1 retrieves the ConfigurationV1alpha1Client // ConfigurationV1alpha1 retrieves the ConfigurationV1alpha1Client
func (c *Clientset) ConfigurationV1alpha1() configurationv1alpha1.ConfigurationV1alpha1Interface { func (c *Clientset) ConfigurationV1alpha1() configurationv1alpha1.ConfigurationV1alpha1Interface {
return &fakeconfigurationv1alpha1.FakeConfigurationV1alpha1{Fake: &c.Fake} return &fakeconfigurationv1alpha1.FakeConfigurationV1alpha1{Fake: &c.Fake}
} }
// EventingV1alpha1 retrieves the EventingV1alpha1Client
func (c *Clientset) EventingV1alpha1() eventingv1alpha1.EventingV1alpha1Interface {
return &fakeeventingv1alpha1.FakeEventingV1alpha1{Fake: &c.Fake}
}
...@@ -19,8 +19,8 @@ limitations under the License. ...@@ -19,8 +19,8 @@ limitations under the License.
package fake package fake
import ( import (
componentsv1alpha1 "github.com/actionscore/actions/pkg/apis/components/v1alpha1"
configurationv1alpha1 "github.com/actionscore/actions/pkg/apis/configuration/v1alpha1" configurationv1alpha1 "github.com/actionscore/actions/pkg/apis/configuration/v1alpha1"
eventingv1alpha1 "github.com/actionscore/actions/pkg/apis/eventing/v1alpha1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime" runtime "k8s.io/apimachinery/pkg/runtime"
schema "k8s.io/apimachinery/pkg/runtime/schema" schema "k8s.io/apimachinery/pkg/runtime/schema"
...@@ -32,8 +32,8 @@ var scheme = runtime.NewScheme() ...@@ -32,8 +32,8 @@ var scheme = runtime.NewScheme()
var codecs = serializer.NewCodecFactory(scheme) var codecs = serializer.NewCodecFactory(scheme)
var parameterCodec = runtime.NewParameterCodec(scheme) var parameterCodec = runtime.NewParameterCodec(scheme)
var localSchemeBuilder = runtime.SchemeBuilder{ var localSchemeBuilder = runtime.SchemeBuilder{
componentsv1alpha1.AddToScheme,
configurationv1alpha1.AddToScheme, configurationv1alpha1.AddToScheme,
eventingv1alpha1.AddToScheme,
} }
// AddToScheme adds all types of this clientset into the given scheme. This allows composition // AddToScheme adds all types of this clientset into the given scheme. This allows composition
......
...@@ -19,8 +19,8 @@ limitations under the License. ...@@ -19,8 +19,8 @@ limitations under the License.
package scheme package scheme
import ( import (
componentsv1alpha1 "github.com/actionscore/actions/pkg/apis/components/v1alpha1"
configurationv1alpha1 "github.com/actionscore/actions/pkg/apis/configuration/v1alpha1" configurationv1alpha1 "github.com/actionscore/actions/pkg/apis/configuration/v1alpha1"
eventingv1alpha1 "github.com/actionscore/actions/pkg/apis/eventing/v1alpha1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime" runtime "k8s.io/apimachinery/pkg/runtime"
schema "k8s.io/apimachinery/pkg/runtime/schema" schema "k8s.io/apimachinery/pkg/runtime/schema"
...@@ -32,8 +32,8 @@ var Scheme = runtime.NewScheme() ...@@ -32,8 +32,8 @@ var Scheme = runtime.NewScheme()
var Codecs = serializer.NewCodecFactory(Scheme) var Codecs = serializer.NewCodecFactory(Scheme)
var ParameterCodec = runtime.NewParameterCodec(Scheme) var ParameterCodec = runtime.NewParameterCodec(Scheme)
var localSchemeBuilder = runtime.SchemeBuilder{ var localSchemeBuilder = runtime.SchemeBuilder{
componentsv1alpha1.AddToScheme,
configurationv1alpha1.AddToScheme, configurationv1alpha1.AddToScheme,
eventingv1alpha1.AddToScheme,
} }
// AddToScheme adds all types of this clientset into the given scheme. This allows composition // AddToScheme adds all types of this clientset into the given scheme. This allows composition
......
...@@ -21,7 +21,7 @@ package v1alpha1 ...@@ -21,7 +21,7 @@ package v1alpha1
import ( import (
"time" "time"
v1alpha1 "github.com/actionscore/actions/pkg/apis/eventing/v1alpha1" v1alpha1 "github.com/actionscore/actions/pkg/apis/components/v1alpha1"
scheme "github.com/actionscore/actions/pkg/client/clientset/versioned/scheme" scheme "github.com/actionscore/actions/pkg/client/clientset/versioned/scheme"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
types "k8s.io/apimachinery/pkg/types" types "k8s.io/apimachinery/pkg/types"
...@@ -29,45 +29,45 @@ import ( ...@@ -29,45 +29,45 @@ import (
rest "k8s.io/client-go/rest" rest "k8s.io/client-go/rest"
) )
// EventSourcesGetter has a method to return a EventSourceInterface. // ComponentsGetter has a method to return a ComponentInterface.
// A group's client should implement this interface. // A group's client should implement this interface.
type EventSourcesGetter interface { type ComponentsGetter interface {
EventSources(namespace string) EventSourceInterface Components(namespace string) ComponentInterface
} }
// EventSourceInterface has methods to work with EventSource resources. // ComponentInterface has methods to work with Component resources.
type EventSourceInterface interface { type ComponentInterface interface {
Create(*v1alpha1.EventSource) (*v1alpha1.EventSource, error) Create(*v1alpha1.Component) (*v1alpha1.Component, error)
Update(*v1alpha1.EventSource) (*v1alpha1.EventSource, error) Update(*v1alpha1.Component) (*v1alpha1.Component, error)
Delete(name string, options *v1.DeleteOptions) error Delete(name string, options *v1.DeleteOptions) error
DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error
Get(name string, options v1.GetOptions) (*v1alpha1.EventSource, error) Get(name string, options v1.GetOptions) (*v1alpha1.Component, error)
List(opts v1.ListOptions) (*v1alpha1.EventSourceList, error) List(opts v1.ListOptions) (*v1alpha1.ComponentList, error)
Watch(opts v1.ListOptions) (watch.Interface, error) Watch(opts v1.ListOptions) (watch.Interface, error)
Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.EventSource, err error) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.Component, err error)
EventSourceExpansion ComponentExpansion
} }
// eventSources implements EventSourceInterface // components implements ComponentInterface
type eventSources struct { type components struct {
client rest.Interface client rest.Interface
ns string ns string
} }
// newEventSources returns a EventSources // newComponents returns a Components
func newEventSources(c *EventingV1alpha1Client, namespace string) *eventSources { func newComponents(c *ComponentsV1alpha1Client, namespace string) *components {
return &eventSources{ return &components{
client: c.RESTClient(), client: c.RESTClient(),
ns: namespace, ns: namespace,
} }
} }
// Get takes name of the eventSource, and returns the corresponding eventSource object, and an error if there is any. // Get takes name of the component, and returns the corresponding component object, and an error if there is any.
func (c *eventSources) Get(name string, options v1.GetOptions) (result *v1alpha1.EventSource, err error) { func (c *components) Get(name string, options v1.GetOptions) (result *v1alpha1.Component, err error) {
result = &v1alpha1.EventSource{} result = &v1alpha1.Component{}
err = c.client.Get(). err = c.client.Get().
Namespace(c.ns). Namespace(c.ns).
Resource("eventsources"). Resource("components").
Name(name). Name(name).
VersionedParams(&options, scheme.ParameterCodec). VersionedParams(&options, scheme.ParameterCodec).
Do(). Do().
...@@ -75,16 +75,16 @@ func (c *eventSources) Get(name string, options v1.GetOptions) (result *v1alpha1 ...@@ -75,16 +75,16 @@ func (c *eventSources) Get(name string, options v1.GetOptions) (result *v1alpha1
return return
} }
// List takes label and field selectors, and returns the list of EventSources that match those selectors. // List takes label and field selectors, and returns the list of Components that match those selectors.
func (c *eventSources) List(opts v1.ListOptions) (result *v1alpha1.EventSourceList, err error) { func (c *components) List(opts v1.ListOptions) (result *v1alpha1.ComponentList, err error) {
var timeout time.Duration var timeout time.Duration
if opts.TimeoutSeconds != nil { if opts.TimeoutSeconds != nil {
timeout = time.Duration(*opts.TimeoutSeconds) * time.Second timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
} }
result = &v1alpha1.EventSourceList{} result = &v1alpha1.ComponentList{}
err = c.client.Get(). err = c.client.Get().
Namespace(c.ns). Namespace(c.ns).
Resource("eventsources"). Resource("components").
VersionedParams(&opts, scheme.ParameterCodec). VersionedParams(&opts, scheme.ParameterCodec).
Timeout(timeout). Timeout(timeout).
Do(). Do().
...@@ -92,8 +92,8 @@ func (c *eventSources) List(opts v1.ListOptions) (result *v1alpha1.EventSourceLi ...@@ -92,8 +92,8 @@ func (c *eventSources) List(opts v1.ListOptions) (result *v1alpha1.EventSourceLi
return return
} }
// Watch returns a watch.Interface that watches the requested eventSources. // Watch returns a watch.Interface that watches the requested components.
func (c *eventSources) Watch(opts v1.ListOptions) (watch.Interface, error) { func (c *components) Watch(opts v1.ListOptions) (watch.Interface, error) {
var timeout time.Duration var timeout time.Duration
if opts.TimeoutSeconds != nil { if opts.TimeoutSeconds != nil {
timeout = time.Duration(*opts.TimeoutSeconds) * time.Second timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
...@@ -101,42 +101,42 @@ func (c *eventSources) Watch(opts v1.ListOptions) (watch.Interface, error) { ...@@ -101,42 +101,42 @@ func (c *eventSources) Watch(opts v1.ListOptions) (watch.Interface, error) {
opts.Watch = true opts.Watch = true
return c.client.Get(). return c.client.Get().
Namespace(c.ns). Namespace(c.ns).
Resource("eventsources"). Resource("components").
VersionedParams(&opts, scheme.ParameterCodec). VersionedParams(&opts, scheme.ParameterCodec).
Timeout(timeout). Timeout(timeout).
Watch() Watch()
} }
// Create takes the representation of a eventSource and creates it. Returns the server's representation of the eventSource, and an error, if there is any. // Create takes the representation of a component and creates it. Returns the server's representation of the component, and an error, if there is any.
func (c *eventSources) Create(eventSource *v1alpha1.EventSource) (result *v1alpha1.EventSource, err error) { func (c *components) Create(component *v1alpha1.Component) (result *v1alpha1.Component, err error) {
result = &v1alpha1.EventSource{} result = &v1alpha1.Component{}
err = c.client.Post(). err = c.client.Post().
Namespace(c.ns). Namespace(c.ns).
Resource("eventsources"). Resource("components").
Body(eventSource). Body(component).
Do(). Do().
Into(result) Into(result)
return return
} }
// Update takes the representation of a eventSource and updates it. Returns the server's representation of the eventSource, and an error, if there is any. // Update takes the representation of a component and updates it. Returns the server's representation of the component, and an error, if there is any.
func (c *eventSources) Update(eventSource *v1alpha1.EventSource) (result *v1alpha1.EventSource, err error) { func (c *components) Update(component *v1alpha1.Component) (result *v1alpha1.Component, err error) {
result = &v1alpha1.EventSource{} result = &v1alpha1.Component{}
err = c.client.Put(). err = c.client.Put().
Namespace(c.ns). Namespace(c.ns).
Resource("eventsources"). Resource("components").
Name(eventSource.Name). Name(component.Name).
Body(eventSource). Body(component).
Do(). Do().
Into(result) Into(result)
return return
} }
// Delete takes name of the eventSource and deletes it. Returns an error if one occurs. // Delete takes name of the component and deletes it. Returns an error if one occurs.
func (c *eventSources) Delete(name string, options *v1.DeleteOptions) error { func (c *components) Delete(name string, options *v1.DeleteOptions) error {
return c.client.Delete(). return c.client.Delete().
Namespace(c.ns). Namespace(c.ns).
Resource("eventsources"). Resource("components").
Name(name). Name(name).
Body(options). Body(options).
Do(). Do().
...@@ -144,14 +144,14 @@ func (c *eventSources) Delete(name string, options *v1.DeleteOptions) error { ...@@ -144,14 +144,14 @@ func (c *eventSources) Delete(name string, options *v1.DeleteOptions) error {
} }
// DeleteCollection deletes a collection of objects. // DeleteCollection deletes a collection of objects.
func (c *eventSources) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error { func (c *components) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error {
var timeout time.Duration var timeout time.Duration
if listOptions.TimeoutSeconds != nil { if listOptions.TimeoutSeconds != nil {
timeout = time.Duration(*listOptions.TimeoutSeconds) * time.Second timeout = time.Duration(*listOptions.TimeoutSeconds) * time.Second
} }
return c.client.Delete(). return c.client.Delete().
Namespace(c.ns). Namespace(c.ns).
Resource("eventsources"). Resource("components").
VersionedParams(&listOptions, scheme.ParameterCodec). VersionedParams(&listOptions, scheme.ParameterCodec).
Timeout(timeout). Timeout(timeout).
Body(options). Body(options).
...@@ -159,12 +159,12 @@ func (c *eventSources) DeleteCollection(options *v1.DeleteOptions, listOptions v ...@@ -159,12 +159,12 @@ func (c *eventSources) DeleteCollection(options *v1.DeleteOptions, listOptions v
Error() Error()
} }
// Patch applies the patch and returns the patched eventSource. // Patch applies the patch and returns the patched component.
func (c *eventSources) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.EventSource, err error) { func (c *components) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.Component, err error) {
result = &v1alpha1.EventSource{} result = &v1alpha1.Component{}
err = c.client.Patch(pt). err = c.client.Patch(pt).
Namespace(c.ns). Namespace(c.ns).
Resource("eventsources"). Resource("components").
SubResource(subresources...). SubResource(subresources...).
Name(name). Name(name).
Body(data). Body(data).
......
...@@ -19,27 +19,27 @@ limitations under the License. ...@@ -19,27 +19,27 @@ limitations under the License.
package v1alpha1 package v1alpha1
import ( import (
v1alpha1 "github.com/actionscore/actions/pkg/apis/eventing/v1alpha1" v1alpha1 "github.com/actionscore/actions/pkg/apis/components/v1alpha1"
"github.com/actionscore/actions/pkg/client/clientset/versioned/scheme" "github.com/actionscore/actions/pkg/client/clientset/versioned/scheme"
rest "k8s.io/client-go/rest" rest "k8s.io/client-go/rest"
) )
type EventingV1alpha1Interface interface { type ComponentsV1alpha1Interface interface {
RESTClient() rest.Interface RESTClient() rest.Interface
EventSourcesGetter ComponentsGetter
} }
// EventingV1alpha1Client is used to interact with features provided by the eventing.actions.io group. // ComponentsV1alpha1Client is used to interact with features provided by the components.actions.io group.
type EventingV1alpha1Client struct { type ComponentsV1alpha1Client struct {
restClient rest.Interface restClient rest.Interface
} }
func (c *EventingV1alpha1Client) EventSources(namespace string) EventSourceInterface { func (c *ComponentsV1alpha1Client) Components(namespace string) ComponentInterface {
return newEventSources(c, namespace) return newComponents(c, namespace)
} }
// NewForConfig creates a new EventingV1alpha1Client for the given config. // NewForConfig creates a new ComponentsV1alpha1Client for the given config.
func NewForConfig(c *rest.Config) (*EventingV1alpha1Client, error) { func NewForConfig(c *rest.Config) (*ComponentsV1alpha1Client, error) {
config := *c config := *c
if err := setConfigDefaults(&config); err != nil { if err := setConfigDefaults(&config); err != nil {
return nil, err return nil, err
...@@ -48,12 +48,12 @@ func NewForConfig(c *rest.Config) (*EventingV1alpha1Client, error) { ...@@ -48,12 +48,12 @@ func NewForConfig(c *rest.Config) (*EventingV1alpha1Client, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &EventingV1alpha1Client{client}, nil return &ComponentsV1alpha1Client{client}, nil
} }
// NewForConfigOrDie creates a new EventingV1alpha1Client for the given config and // NewForConfigOrDie creates a new ComponentsV1alpha1Client for the given config and
// panics if there is an error in the config. // panics if there is an error in the config.
func NewForConfigOrDie(c *rest.Config) *EventingV1alpha1Client { func NewForConfigOrDie(c *rest.Config) *ComponentsV1alpha1Client {
client, err := NewForConfig(c) client, err := NewForConfig(c)
if err != nil { if err != nil {
panic(err) panic(err)
...@@ -61,9 +61,9 @@ func NewForConfigOrDie(c *rest.Config) *EventingV1alpha1Client { ...@@ -61,9 +61,9 @@ func NewForConfigOrDie(c *rest.Config) *EventingV1alpha1Client {
return client return client
} }
// New creates a new EventingV1alpha1Client for the given RESTClient. // New creates a new ComponentsV1alpha1Client for the given RESTClient.
func New(c rest.Interface) *EventingV1alpha1Client { func New(c rest.Interface) *ComponentsV1alpha1Client {
return &EventingV1alpha1Client{c} return &ComponentsV1alpha1Client{c}
} }
func setConfigDefaults(config *rest.Config) error { func setConfigDefaults(config *rest.Config) error {
...@@ -81,7 +81,7 @@ func setConfigDefaults(config *rest.Config) error { ...@@ -81,7 +81,7 @@ func setConfigDefaults(config *rest.Config) error {
// RESTClient returns a RESTClient that is used to communicate // RESTClient returns a RESTClient that is used to communicate
// with API server by this client implementation. // with API server by this client implementation.
func (c *EventingV1alpha1Client) RESTClient() rest.Interface { func (c *ComponentsV1alpha1Client) RESTClient() rest.Interface {
if c == nil { if c == nil {
return nil return nil
} }
......
...@@ -19,7 +19,7 @@ limitations under the License. ...@@ -19,7 +19,7 @@ limitations under the License.
package fake package fake
import ( import (
v1alpha1 "github.com/actionscore/actions/pkg/apis/eventing/v1alpha1" v1alpha1 "github.com/actionscore/actions/pkg/apis/components/v1alpha1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
labels "k8s.io/apimachinery/pkg/labels" labels "k8s.io/apimachinery/pkg/labels"
schema "k8s.io/apimachinery/pkg/runtime/schema" schema "k8s.io/apimachinery/pkg/runtime/schema"
...@@ -28,31 +28,31 @@ import ( ...@@ -28,31 +28,31 @@ import (
testing "k8s.io/client-go/testing" testing "k8s.io/client-go/testing"
) )
// FakeEventSources implements EventSourceInterface // FakeComponents implements ComponentInterface
type FakeEventSources struct { type FakeComponents struct {
Fake *FakeEventingV1alpha1 Fake *FakeComponentsV1alpha1
ns string ns string
} }
var eventsourcesResource = schema.GroupVersionResource{Group: "eventing.actions.io", Version: "v1alpha1", Resource: "eventsources"} var componentsResource = schema.GroupVersionResource{Group: "components.actions.io", Version: "v1alpha1", Resource: "components"}
var eventsourcesKind = schema.GroupVersionKind{Group: "eventing.actions.io", Version: "v1alpha1", Kind: "EventSource"} var componentsKind = schema.GroupVersionKind{Group: "components.actions.io", Version: "v1alpha1", Kind: "Component"}
// Get takes name of the eventSource, and returns the corresponding eventSource object, and an error if there is any. // Get takes name of the component, and returns the corresponding component object, and an error if there is any.
func (c *FakeEventSources) Get(name string, options v1.GetOptions) (result *v1alpha1.EventSource, err error) { func (c *FakeComponents) Get(name string, options v1.GetOptions) (result *v1alpha1.Component, err error) {
obj, err := c.Fake. obj, err := c.Fake.
Invokes(testing.NewGetAction(eventsourcesResource, c.ns, name), &v1alpha1.EventSource{}) Invokes(testing.NewGetAction(componentsResource, c.ns, name), &v1alpha1.Component{})
if obj == nil { if obj == nil {
return nil, err return nil, err
} }
return obj.(*v1alpha1.EventSource), err return obj.(*v1alpha1.Component), err
} }
// List takes label and field selectors, and returns the list of EventSources that match those selectors. // List takes label and field selectors, and returns the list of Components that match those selectors.
func (c *FakeEventSources) List(opts v1.ListOptions) (result *v1alpha1.EventSourceList, err error) { func (c *FakeComponents) List(opts v1.ListOptions) (result *v1alpha1.ComponentList, err error) {
obj, err := c.Fake. obj, err := c.Fake.
Invokes(testing.NewListAction(eventsourcesResource, eventsourcesKind, c.ns, opts), &v1alpha1.EventSourceList{}) Invokes(testing.NewListAction(componentsResource, componentsKind, c.ns, opts), &v1alpha1.ComponentList{})
if obj == nil { if obj == nil {
return nil, err return nil, err
...@@ -62,8 +62,8 @@ func (c *FakeEventSources) List(opts v1.ListOptions) (result *v1alpha1.EventSour ...@@ -62,8 +62,8 @@ func (c *FakeEventSources) List(opts v1.ListOptions) (result *v1alpha1.EventSour
if label == nil { if label == nil {
label = labels.Everything() label = labels.Everything()
} }
list := &v1alpha1.EventSourceList{ListMeta: obj.(*v1alpha1.EventSourceList).ListMeta} list := &v1alpha1.ComponentList{ListMeta: obj.(*v1alpha1.ComponentList).ListMeta}
for _, item := range obj.(*v1alpha1.EventSourceList).Items { for _, item := range obj.(*v1alpha1.ComponentList).Items {
if label.Matches(labels.Set(item.Labels)) { if label.Matches(labels.Set(item.Labels)) {
list.Items = append(list.Items, item) list.Items = append(list.Items, item)
} }
...@@ -71,58 +71,58 @@ func (c *FakeEventSources) List(opts v1.ListOptions) (result *v1alpha1.EventSour ...@@ -71,58 +71,58 @@ func (c *FakeEventSources) List(opts v1.ListOptions) (result *v1alpha1.EventSour
return list, err return list, err
} }
// Watch returns a watch.Interface that watches the requested eventSources. // Watch returns a watch.Interface that watches the requested components.
func (c *FakeEventSources) Watch(opts v1.ListOptions) (watch.Interface, error) { func (c *FakeComponents) Watch(opts v1.ListOptions) (watch.Interface, error) {
return c.Fake. return c.Fake.
InvokesWatch(testing.NewWatchAction(eventsourcesResource, c.ns, opts)) InvokesWatch(testing.NewWatchAction(componentsResource, c.ns, opts))
} }
// Create takes the representation of a eventSource and creates it. Returns the server's representation of the eventSource, and an error, if there is any. // Create takes the representation of a component and creates it. Returns the server's representation of the component, and an error, if there is any.
func (c *FakeEventSources) Create(eventSource *v1alpha1.EventSource) (result *v1alpha1.EventSource, err error) { func (c *FakeComponents) Create(component *v1alpha1.Component) (result *v1alpha1.Component, err error) {
obj, err := c.Fake. obj, err := c.Fake.
Invokes(testing.NewCreateAction(eventsourcesResource, c.ns, eventSource), &v1alpha1.EventSource{}) Invokes(testing.NewCreateAction(componentsResource, c.ns, component), &v1alpha1.Component{})
if obj == nil { if obj == nil {
return nil, err return nil, err
} }
return obj.(*v1alpha1.EventSource), err return obj.(*v1alpha1.Component), err
} }
// Update takes the representation of a eventSource and updates it. Returns the server's representation of the eventSource, and an error, if there is any. // Update takes the representation of a component and updates it. Returns the server's representation of the component, and an error, if there is any.
func (c *FakeEventSources) Update(eventSource *v1alpha1.EventSource) (result *v1alpha1.EventSource, err error) { func (c *FakeComponents) Update(component *v1alpha1.Component) (result *v1alpha1.Component, err error) {
obj, err := c.Fake. obj, err := c.Fake.
Invokes(testing.NewUpdateAction(eventsourcesResource, c.ns, eventSource), &v1alpha1.EventSource{}) Invokes(testing.NewUpdateAction(componentsResource, c.ns, component), &v1alpha1.Component{})
if obj == nil { if obj == nil {
return nil, err return nil, err
} }
return obj.(*v1alpha1.EventSource), err return obj.(*v1alpha1.Component), err
} }
// Delete takes name of the eventSource and deletes it. Returns an error if one occurs. // Delete takes name of the component and deletes it. Returns an error if one occurs.
func (c *FakeEventSources) Delete(name string, options *v1.DeleteOptions) error { func (c *FakeComponents) Delete(name string, options *v1.DeleteOptions) error {
_, err := c.Fake. _, err := c.Fake.
Invokes(testing.NewDeleteAction(eventsourcesResource, c.ns, name), &v1alpha1.EventSource{}) Invokes(testing.NewDeleteAction(componentsResource, c.ns, name), &v1alpha1.Component{})
return err return err
} }
// DeleteCollection deletes a collection of objects. // DeleteCollection deletes a collection of objects.
func (c *FakeEventSources) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error { func (c *FakeComponents) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error {
action := testing.NewDeleteCollectionAction(eventsourcesResource, c.ns, listOptions) action := testing.NewDeleteCollectionAction(componentsResource, c.ns, listOptions)
_, err := c.Fake.Invokes(action, &v1alpha1.EventSourceList{}) _, err := c.Fake.Invokes(action, &v1alpha1.ComponentList{})
return err return err
} }
// Patch applies the patch and returns the patched eventSource. // Patch applies the patch and returns the patched component.
func (c *FakeEventSources) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.EventSource, err error) { func (c *FakeComponents) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.Component, err error) {
obj, err := c.Fake. obj, err := c.Fake.
Invokes(testing.NewPatchSubresourceAction(eventsourcesResource, c.ns, name, pt, data, subresources...), &v1alpha1.EventSource{}) Invokes(testing.NewPatchSubresourceAction(componentsResource, c.ns, name, pt, data, subresources...), &v1alpha1.Component{})
if obj == nil { if obj == nil {
return nil, err return nil, err
} }
return obj.(*v1alpha1.EventSource), err return obj.(*v1alpha1.Component), err
} }
...@@ -19,22 +19,22 @@ limitations under the License. ...@@ -19,22 +19,22 @@ limitations under the License.
package fake package fake
import ( import (
v1alpha1 "github.com/actionscore/actions/pkg/client/clientset/versioned/typed/eventing/v1alpha1" v1alpha1 "github.com/actionscore/actions/pkg/client/clientset/versioned/typed/components/v1alpha1"
rest "k8s.io/client-go/rest" rest "k8s.io/client-go/rest"
testing "k8s.io/client-go/testing" testing "k8s.io/client-go/testing"
) )
type FakeEventingV1alpha1 struct { type FakeComponentsV1alpha1 struct {
*testing.Fake *testing.Fake
} }
func (c *FakeEventingV1alpha1) EventSources(namespace string) v1alpha1.EventSourceInterface { func (c *FakeComponentsV1alpha1) Components(namespace string) v1alpha1.ComponentInterface {
return &FakeEventSources{c, namespace} return &FakeComponents{c, namespace}
} }
// RESTClient returns a RESTClient that is used to communicate // RESTClient returns a RESTClient that is used to communicate
// with API server by this client implementation. // with API server by this client implementation.
func (c *FakeEventingV1alpha1) RESTClient() rest.Interface { func (c *FakeComponentsV1alpha1) RESTClient() rest.Interface {
var ret *rest.RESTClient var ret *rest.RESTClient
return ret return ret
} }
...@@ -18,4 +18,4 @@ limitations under the License. ...@@ -18,4 +18,4 @@ limitations under the License.
package v1alpha1 package v1alpha1
type EventSourceExpansion interface{} type ComponentExpansion interface{}
...@@ -16,10 +16,10 @@ limitations under the License. ...@@ -16,10 +16,10 @@ limitations under the License.
// Code generated by informer-gen. DO NOT EDIT. // Code generated by informer-gen. DO NOT EDIT.
package eventing package components
import ( import (
v1alpha1 "github.com/actionscore/actions/pkg/client/informers/externalversions/eventing/v1alpha1" v1alpha1 "github.com/actionscore/actions/pkg/client/informers/externalversions/components/v1alpha1"
internalinterfaces "github.com/actionscore/actions/pkg/client/informers/externalversions/internalinterfaces" internalinterfaces "github.com/actionscore/actions/pkg/client/informers/externalversions/internalinterfaces"
) )
......
...@@ -18,10 +18,10 @@ limitations under the License. ...@@ -18,10 +18,10 @@ limitations under the License.
package v1alpha1 package v1alpha1
// EventSourceListerExpansion allows custom methods to be added to // ComponentListerExpansion allows custom methods to be added to
// EventSourceLister. // ComponentLister.
type EventSourceListerExpansion interface{} type ComponentListerExpansion interface{}
// EventSourceNamespaceListerExpansion allows custom methods to be added to // ComponentNamespaceListerExpansion allows custom methods to be added to
// EventSourceNamespaceLister. // ComponentNamespaceLister.
type EventSourceNamespaceListerExpansion interface{} type ComponentNamespaceListerExpansion interface{}
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册