// Package eventhubs provides an Azure Event Hubs binding that can send
// events to a hub and receive events from its partitions.
package eventhubs

import (
	"bytes"
	"context"
	"crypto/tls"
	"encoding/gob"
	"encoding/json"
	"net/http"
	"os"
	"os/signal"
	"syscall"

	eventhub "github.com/Azure/azure-event-hubs-go"
	log "github.com/Sirupsen/logrus"
	"github.com/actionscore/actions/pkg/components/bindings"
)

type AzureEventHubs struct {
Y
Yaron Schneider 已提交
19
	Spec bindings.Metadata
Y
yaron2 已提交
20 21 22 23 24 25 26 27 28 29
}

// AzureEventHubsMetadata holds the connection settings that Write and Read
// decode from the binding's ConnectionInfo.
type AzureEventHubsMetadata struct {
	// ConnectionString is the value passed to
	// eventhub.NewHubFromConnectionString; it is read from the
	// "connectionString" JSON key.
	ConnectionString string `json:"connectionString"`
}

// NewAzureEventHubs returns a pointer to a zero-valued AzureEventHubs.
// The returned binding has no metadata until Init is called on it.
func NewAzureEventHubs() *AzureEventHubs {
	return new(AzureEventHubs)
}

Y
Yaron Schneider 已提交
30 31
func (a *AzureEventHubs) Init(metadata bindings.Metadata) error {
	a.Spec = metadata
Y
yaron2 已提交
32 33 34 35 36 37 38 39 40 41 42 43 44
	return nil
}

// GetBytes gob-encodes key and returns the encoded bytes. If the encoder
// reports an error, that error is returned together with a nil slice.
func GetBytes(key interface{}) ([]byte, error) {
	encoded := new(bytes.Buffer)
	if encodeErr := gob.NewEncoder(encoded).Encode(key); encodeErr != nil {
		return nil, encodeErr
	}
	return encoded.Bytes(), nil
}

Y
Yaron Schneider 已提交
45
func (a *AzureEventHubs) Write(req *bindings.WriteRequest) error {
Y
yaron2 已提交
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
	http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}

	b, err := json.Marshal(a.Spec.ConnectionInfo)
	if err != nil {
		return err
	}

	var connInfo AzureEventHubsMetadata
	err = json.Unmarshal(b, &connInfo)
	if err != nil {
		return err
	}

	connStr := connInfo.ConnectionString

	hub, err := eventhub.NewHubFromConnectionString(connStr)
	if err != nil {
		return err
	}

	err = hub.Send(context.Background(), &eventhub.Event{
Y
Yaron Schneider 已提交
67
		Data: req.Data,
Y
yaron2 已提交
68 69 70 71 72 73 74 75 76
	})
	if err != nil {
		return err
	}

	log.Info("EventHubs event sent successfully")
	return nil
}

Y
Yaron Schneider 已提交
77
func (a *AzureEventHubs) Read(handler func(*bindings.ReadResponse) error) error {
Y
yaron2 已提交
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
	http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}

	b, err := json.Marshal(a.Spec.ConnectionInfo)
	if err != nil {
		return err
	}

	var connInfo AzureEventHubsMetadata
	err = json.Unmarshal(b, &connInfo)
	if err != nil {
		return err
	}

	connStr := connInfo.ConnectionString

	hub, err := eventhub.NewHubFromConnectionString(connStr)
	if err != nil {
		return err
	}

Y
Yaron Schneider 已提交
98 99 100 101 102 103 104 105
	callback := func(c context.Context, event *eventhub.Event) error {
		if event != nil {
			handler(&bindings.ReadResponse{
				Data: event.Data,
			})
		}

		return nil
Y
yaron2 已提交
106 107 108 109 110 111 112 113 114
	}

	ctx := context.Background()
	runtimeInfo, err := hub.GetRuntimeInformation(ctx)
	if err != nil {
		return err
	}

	for _, partitionID := range runtimeInfo.PartitionIDs {
Y
Yaron Schneider 已提交
115
		_, err := hub.Receive(ctx, partitionID, callback, eventhub.ReceiveWithLatestOffset())
Y
yaron2 已提交
116 117 118 119 120 121 122 123 124 125 126 127 128
		if err != nil {
			return err
		}
	}

	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, os.Interrupt, os.Kill)
	<-signalChan

	hub.Close(context.Background())

	return nil
}