未验证 提交 5918c791 编写于 作者: J JJW 提交者: GitHub

feat(core): merge multiple streams into one stream (#134)

* feat(core): Combine multiple streams into one stream
Co-authored-by: Njjwygjj <wangyaoguang@cel.la>
上级 5907059d
......@@ -23,7 +23,7 @@ type NoiseData struct {
var printer = func(_ context.Context, i interface{}) (interface{}, error) {
value := i.(NoiseData)
rightNow := time.Now().UnixNano() / int64(time.Millisecond)
fmt.Printf("[%s] %d > value: %f ⚡️=%dms\n", value.From, value.Time, value.Noise, rightNow-value.Time)
fmt.Println(fmt.Sprintf("[%s] %d > value: %f ⚡️=%dms", value.From, value.Time, value.Noise, rightNow-value.Time))
return i, nil
}
......
package main
import (
"context"
"fmt"
y3 "github.com/yomorun/y3-codec-golang"
"github.com/yomorun/yomo/pkg/rx"
)
const DataAKey = 0x11
const DataBKey = 0x12
var callback = func(v []byte) (interface{}, error) {
return y3.ToFloat32(v)
}
var printera = func(_ context.Context, i interface{}) (interface{}, error) {
value := i.(float32)
fmt.Println(fmt.Sprintf("[%s]> value: %f", "data-a", value))
return i, nil
}
var printerb = func(_ context.Context, i interface{}) (interface{}, error) {
value := i.(float32)
fmt.Println(fmt.Sprintf("[%s]> value: %f", "data-b", value))
return i, nil
}
var zipper = func(_ context.Context, ia interface{}, ib interface{}) (interface{}, error) {
return fmt.Sprintf("⚡️ Zip [%s],[%s] -> Value: %f, %f", "dataA", "dataB", ia.(float32), ib.(float32)), nil
}
func Handler(rxstream rx.RxStream) rx.RxStream {
streamA := rxstream.Subscribe(DataAKey).OnObserve(callback).Map(printera)
streamB := rxstream.Subscribe(DataBKey).OnObserve(callback).Map(printerb)
stream := streamA.ZipFromIterable(streamB, zipper).StdOut().Encode(0x10)
return stream
}
package main
import (
"context"
"log"
"math/rand"
"os"
"time"
y3 "github.com/yomorun/y3-codec-golang"
"github.com/yomorun/yomo/pkg/quic"
)
var zipperAddr = os.Getenv("YOMO_ZIPPER_ENDPOINT")
func main() {
if zipperAddr == "" {
zipperAddr = "localhost:9999"
}
err := emit(zipperAddr)
if err != nil {
log.Printf("❌ Emit the data to yomo-zipper %s failure with err: %v", zipperAddr, err)
}
}
func emit(addr string) error {
client, err := quic.NewClient(addr)
if err != nil {
return err
}
log.Printf("✅ Connected to yomo-zipper %s", addr)
stream, err := client.CreateStream(context.Background())
if err != nil {
return err
}
generateAndSendData(stream)
return nil
}
var codec = y3.NewCodec(0x11)
func generateAndSendData(stream quic.Stream) {
for {
time.Sleep(100 * time.Millisecond)
num := rand.New(rand.NewSource(time.Now().UnixNano())).Float32() * 200
sendingBuf, _ := codec.Marshal(num)
_, err := stream.Write(sendingBuf)
if err != nil {
log.Printf("❌ Emit %v to yomo-zipper failure with err: %v", num, err)
} else {
log.Printf("✅ Emit %v to yomo-zipper", num)
}
}
}
package main
import (
"context"
"log"
"math/rand"
"os"
"time"
y3 "github.com/yomorun/y3-codec-golang"
"github.com/yomorun/yomo/pkg/quic"
)
var zipperAddr = os.Getenv("YOMO_ZIPPER_ENDPOINT")
func main() {
if zipperAddr == "" {
zipperAddr = "localhost:9999"
}
err := emit(zipperAddr)
if err != nil {
log.Printf("❌ Emit the data to yomo-zipper %s failure with err: %v", zipperAddr, err)
}
}
func emit(addr string) error {
client, err := quic.NewClient(addr)
if err != nil {
return err
}
log.Printf("✅ Connected to yomo-zipper %s", addr)
stream, err := client.CreateStream(context.Background())
if err != nil {
return err
}
generateAndSendData(stream)
return nil
}
var codec = y3.NewCodec(0x12)
func generateAndSendData(stream quic.Stream) {
for {
time.Sleep(100 * time.Millisecond)
num := rand.New(rand.NewSource(time.Now().UnixNano())).Float32() * 2000
sendingBuf, _ := codec.Marshal(num)
_, err := stream.Write(sendingBuf)
if err != nil {
log.Printf("❌ Emit %v to yomo-zipper failure with err: %v", num, err)
} else {
log.Printf("✅ Emit %v to yomo-zipper", num)
}
}
}
name: Service
host: localhost
port: 9999
flows:
- name: training
host: localhost
port: 4242
\ No newline at end of file
......@@ -257,6 +257,8 @@ github.com/sourcegraph/syntaxhighlight v0.0.0-20170531221838-bd320f5d308e/go.mod
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cobra v1.1.1 h1:KfztREH0tPxJJ+geloSLaAkaPkr4ki2Er5quFV1TDo4=
github.com/spf13/cobra v1.1.1/go.mod h1:WnodtKOvamDL/PwE2M4iKs8aMDBZ5Q5klgD3qfVJQMI=
github.com/spf13/cobra v1.1.3 h1:xghbfqPkxzxP3C/f3n5DdpAbdKLj4ZE4BWQI362l53M=
github.com/spf13/cobra v1.1.3/go.mod h1:pGADOWyqRD/YMrPZigI/zbliZ2wVD/23d+is3pSWzOo=
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
......@@ -280,10 +282,8 @@ github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1
github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU=
github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/yomorun/y3-codec-golang v1.6.3 h1:7JTqWbdeKp9ZeIlIoZTFaMC8/PwQ2HKFVOCbxAEY2B0=
github.com/yomorun/y3-codec-golang v1.6.3/go.mod h1:R+y8hQ/AHZ1tDzWtmspVeX7omqVWFJ42gdlXIOp35rA=
github.com/yomorun/y3-codec-golang v1.6.4 h1:2XZBafqNXvHp8PoVrEE3r4WW/nOGJZg/d2Hh0A0WBuk=
github.com/yomorun/y3-codec-golang v1.6.4/go.mod h1:R+y8hQ/AHZ1tDzWtmspVeX7omqVWFJ42gdlXIOp35rA=
github.com/yomorun/y3-codec-golang v1.6.6 h1:Cmqag2WYTyt3GcuuDJRVayFTniWGHE+7r68G/LNOMqk=
github.com/yomorun/y3-codec-golang v1.6.6/go.mod h1:R+y8hQ/AHZ1tDzWtmspVeX7omqVWFJ42gdlXIOp35rA=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
......@@ -473,6 +473,7 @@ gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
......
package cmd
import (
"context"
"fmt"
"io"
"log"
"plugin"
......@@ -39,6 +41,7 @@ func NewCmdDev() *cobra.Command {
quicHandler := &quicDevHandler{
serverlessHandle: slHandler,
serverAddr: fmt.Sprintf("localhost:%d", opts.Port),
readers: make(chan io.Reader),
}
err = serverless.Run(endpoint, quicHandler)
......@@ -57,16 +60,16 @@ func NewCmdDev() *cobra.Command {
type quicDevHandler struct {
serverlessHandle plugin.Symbol
serverAddr string
readers chan io.Reader
}
func (s quicDevHandler) Listen() error {
func (s *quicDevHandler) Listen() error {
err := mocker.EmitMockDataFromCloud(s.serverAddr)
return err
}
func (s quicDevHandler) Read(st quic.Stream) error {
stream := dispatcher.Dispatcher(s.serverlessHandle, rx.FromReaderWithY3(st))
if err != nil {
return err
}
rxstream := rx.FromReaderWithY3(s.readers)
stream := dispatcher.Dispatcher(s.serverlessHandle, rxstream)
go func() {
for customer := range stream.Observe() {
if customer.Error() {
......@@ -74,5 +77,13 @@ func (s quicDevHandler) Read(st quic.Stream) error {
}
}
}()
rxstream.Connect(context.Background())
return nil
}
func (s *quicDevHandler) Read(st quic.Stream) error {
s.readers <- st
return nil
}
package cmd
import (
"context"
"fmt"
"io"
"log"
"math/rand"
"os"
"plugin"
......@@ -44,6 +47,8 @@ func NewCmdRun() *cobra.Command {
endpoint := fmt.Sprintf("0.0.0.0:%d", opts.Port)
quicHandler := &quicServerHandler{
serverlessHandle: slHandler,
readers: make(chan io.Reader),
writers: make([]io.Writer, 0),
}
err = serverless.Run(endpoint, quicHandler)
......@@ -61,23 +66,45 @@ func NewCmdRun() *cobra.Command {
type quicServerHandler struct {
serverlessHandle plugin.Symbol
readers chan io.Reader
writers []io.Writer
}
func (s quicServerHandler) Listen() error {
return nil
}
func (s *quicServerHandler) Listen() error {
rxstream := rx.FromReaderWithY3(s.readers)
func (s quicServerHandler) Read(st quic.Stream) error {
stream := dispatcher.Dispatcher(s.serverlessHandle, rx.FromReaderWithY3(st))
stream := dispatcher.Dispatcher(s.serverlessHandle, rxstream)
rxstream.Connect(context.Background())
go func() {
for customer := range stream.Observe() {
if customer.Error() {
fmt.Println(customer.E.Error())
} else if customer.V != nil {
st.Write((customer.V).([]byte))
index := rand.Intn(len(s.writers))
loop:
for i, w := range s.writers {
if index == i {
_, err := w.Write((customer.V).([]byte))
if err != nil {
index = rand.Intn(len(s.writers))
break loop
}
} else {
w.Write([]byte{0})
}
}
}
}
}()
return nil
}
func (s *quicServerHandler) Read(st quic.Stream) error {
s.readers <- st
s.writers = append(s.writers, st)
return nil
}
......@@ -4,6 +4,7 @@ import (
"fmt"
"io"
"log"
"time"
"github.com/spf13/cobra"
"github.com/yomorun/yomo/internal/conf"
......@@ -68,8 +69,9 @@ func (s *quicDevHandler) Listen() error {
}
func (s *quicDevHandler) Read(st quic.Stream) error {
id := time.Now().UnixNano()
flows, sinks := workflow.Build(s.serverlessConfig)
flows, sinks := workflow.Build(s.serverlessConfig, id)
stream := dispatcher.DispatcherWithFunc(flows, st)
......@@ -82,6 +84,9 @@ func (s *quicDevHandler) Read(st quic.Stream) error {
value := customer.V.([]byte)
if len(value) == 1 && value[0] == byte(0) {
continue
}
for _, sink := range sinks {
go func(_sink func() (io.Writer, func()), buf []byte) {
writer, cancel := _sink()
......
......@@ -4,6 +4,7 @@ import (
"fmt"
"io"
"log"
"time"
"github.com/spf13/cobra"
"github.com/yomorun/yomo/internal/conf"
......@@ -65,7 +66,8 @@ func (s *quicHandler) Listen() error {
}
func (s *quicHandler) Read(st quic.Stream) error {
flows, sinks := workflow.Build(s.serverlessConfig)
id := time.Now().UnixNano()
flows, sinks := workflow.Build(s.serverlessConfig, id)
stream := dispatcher.DispatcherWithFunc(flows, st)
......@@ -76,16 +78,19 @@ func (s *quicHandler) Read(st quic.Stream) error {
continue
}
value := customer.V.([]byte)
if len(value) == 1 && value[0] == byte(0) {
continue
}
for _, sink := range sinks {
go func(_sink func() (io.Writer, func()), buf []byte) {
writer, cancel := _sink()
if writer != nil {
_, err := writer.Write(buf)
if err != nil {
cancel()
}
}
}(sink, value)
}
......
......@@ -5,21 +5,31 @@ import (
"fmt"
"io"
"log"
"sync"
"github.com/yomorun/yomo/internal/conf"
"github.com/yomorun/yomo/pkg/quic"
)
var Clients map[string]Client
var FlowClients map[string]Client
var SinkClients map[string]Client
var flowmutex sync.RWMutex
var sinkmutex sync.RWMutex
type Client struct {
App conf.App
Stream io.ReadWriter
StreamMap map[int64]Stream
QuicClient quic.Client
}
type Stream struct {
St io.ReadWriter
CancelFunc context.CancelFunc
}
func init() {
Clients = make(map[string]Client)
FlowClients = make(map[string]Client)
SinkClients = make(map[string]Client)
}
// Run runs quic service
......@@ -30,29 +40,33 @@ func Run(endpoint string, handle quic.ServerHandler) error {
}
// Build build the workflow by config (.yaml).
func Build(wfConf *conf.WorkflowConfig) ([]func() (io.ReadWriter, func()), []func() (io.Writer, func())) {
func Build(wfConf *conf.WorkflowConfig, id int64) ([]func() (io.ReadWriter, func()), []func() (io.Writer, func())) {
//init workflow
flows := make([]func() (io.ReadWriter, func()), 0)
sinks := make([]func() (io.Writer, func()), 0)
for _, app := range wfConf.Flows {
flows = append(flows, createReadWriter(app))
flows = append(flows, createReadWriter(app, id))
}
for _, app := range wfConf.Sinks {
sinks = append(sinks, createWriter(app))
sinks = append(sinks, createWriter(app, 0))
}
return flows, sinks
}
func connectToApp(ctx context.Context, app conf.App) (quic.Stream, error) {
func connectToApp(app conf.App) (quic.Client, error) {
client, err := quic.NewClient(fmt.Sprintf("%s:%d", app.Host, app.Port))
if err != nil {
log.Print(getConnectFailedMsg(app), err)
return nil, err
}
log.Printf("✅ Connect to %s successfully.", getAppInfo(app))
return client, err
}
func createStream(ctx context.Context, client quic.Client) (quic.Stream, error) {
return client.CreateStream(ctx)
}
......@@ -68,71 +82,125 @@ func getAppInfo(app conf.App) string {
app.Port)
}
func createReadWriter(app conf.App) func() (io.ReadWriter, func()) {
func createReadWriter(app conf.App, id int64) func() (io.ReadWriter, func()) {
f := func() (io.ReadWriter, func()) {
if Clients[app.Name].Stream != nil {
return Clients[app.Name].Stream, Clients[app.Name].CancelFunc
flowmutex.Lock()
if len(FlowClients[app.Name].StreamMap) > 0 && FlowClients[app.Name].StreamMap[id].St != nil {
flowmutex.Unlock()
return FlowClients[app.Name].StreamMap[id].St, FlowClients[app.Name].StreamMap[id].CancelFunc
}
ctx, cancel := context.WithCancel(context.Background())
stream, err := connectToApp(ctx, app)
if err != nil {
Clients[app.Name] = Client{
if FlowClients[app.Name].StreamMap == nil || (FlowClients[app.Name].StreamMap != nil && FlowClients[app.Name].QuicClient == nil) {
client, err := connectToApp(app)
if err != nil {
flowmutex.Unlock()
return nil, nil
}
streammap := make(map[int64]Stream)
FlowClients[app.Name] = Client{
App: app,
Stream: nil,
CancelFunc: cancelStream(cancel, app),
StreamMap: streammap,
QuicClient: client,
}
return nil, cancelStream(cancel, app)
}
Clients[app.Name] = Client{
App: app,
Stream: stream,
CancelFunc: cancelStream(cancel, app),
ctx, cancel := context.WithCancel(context.Background())
stream, err := createStream(ctx, FlowClients[app.Name].QuicClient)
if err != nil {
if err.Error() == "NO_ERROR: No recent network activity" {
FlowClients[app.Name] = Client{
App: app,
StreamMap: nil,
QuicClient: nil,
}
}
flowmutex.Unlock()
return nil, cancelFlowStream(cancel, app, id)
}
return stream, cancelStream(cancel, app)
FlowClients[app.Name].StreamMap[id] = Stream{
St: stream,
CancelFunc: cancelFlowStream(cancel, app, id),
}
flowmutex.Unlock()
return stream, cancelFlowStream(cancel, app, id)
}
return f
}
func createWriter(app conf.App) func() (io.Writer, func()) {
func createWriter(app conf.App, id int64) func() (io.Writer, func()) {
f := func() (io.Writer, func()) {
if Clients[app.Name].Stream != nil {
return Clients[app.Name].Stream, Clients[app.Name].CancelFunc
sinkmutex.Lock()
if len(SinkClients[app.Name].StreamMap) > 0 && SinkClients[app.Name].StreamMap[id].St != nil {
sinkmutex.Unlock()
return SinkClients[app.Name].StreamMap[id].St, SinkClients[app.Name].StreamMap[id].CancelFunc
}
ctx, cancel := context.WithCancel(context.Background())
stream, err := connectToApp(ctx, app)
if err != nil {
Clients[app.Name] = Client{
if SinkClients[app.Name].StreamMap == nil || (SinkClients[app.Name].StreamMap != nil && SinkClients[app.Name].QuicClient == nil) {
client, err := connectToApp(app)
if err != nil {
sinkmutex.Unlock()
return nil, nil
}
streammap := make(map[int64]Stream)
SinkClients[app.Name] = Client{
App: app,
Stream: nil,
CancelFunc: cancelStream(cancel, app),
StreamMap: streammap,
QuicClient: client,
}
return nil, cancelStream(cancel, app)
}
Clients[app.Name] = Client{
App: app,
Stream: stream,
CancelFunc: cancelStream(cancel, app),
}
return stream, cancelStream(cancel, app)
ctx, cancel := context.WithCancel(context.Background())
stream, err := createStream(ctx, SinkClients[app.Name].QuicClient)
if err != nil {
if err.Error() == "NO_ERROR: No recent network activity" {
SinkClients[app.Name] = Client{
App: app,
StreamMap: nil,
QuicClient: nil,
}
}
sinkmutex.Unlock()
return nil, cancelSinkStream(cancel, app, id)
}
SinkClients[app.Name].StreamMap[id] = Stream{
St: stream,
CancelFunc: cancelSinkStream(cancel, app, id),
}
sinkmutex.Unlock()
return stream, cancelSinkStream(cancel, app, id)
}
return f
}
func cancelStream(cancel context.CancelFunc, app conf.App) func() {
func cancelFlowStream(cancel context.CancelFunc, app conf.App, id int64) func() {
f := func() {
flowmutex.Lock()
if FlowClients[app.Name].StreamMap != nil {
stream := FlowClients[app.Name].StreamMap[id]
stream.St = nil
FlowClients[app.Name].StreamMap[id] = stream
}
flowmutex.Unlock()
}
return f
}
func cancelSinkStream(cancel context.CancelFunc, app conf.App, id int64) func() {
f := func() {
cancel()
Clients[app.Name] = Client{
App: app,
Stream: nil,
sinkmutex.Lock()
if SinkClients[app.Name].StreamMap != nil {
stream := SinkClients[app.Name].StreamMap[id]
stream.St = nil
SinkClients[app.Name].StreamMap[id] = stream
}
sinkmutex.Unlock()
}
return f
}
......@@ -51,20 +51,29 @@ func (s *quicGoServer) ListenAndServe(ctx context.Context, addr string) error {
}
for {
session, err := listener.Accept(context.Background())
ctx, cancel := context.WithCancel(context.Background())
session, err := listener.Accept(ctx)
if err != nil {
cancel()
return err
}
stream, err := session.AcceptStream(context.Background())
if err != nil {
return err
}
if s.handler != nil {
s.handler.Read(stream)
} else {
log.Print("handler isn't set in QUIC server")
}
go func(session quicGo.Session, cancel context.CancelFunc) {
defer cancel()
for {
stream, err := session.AcceptStream(context.Background())
if err != nil {
break
}
if s.handler != nil {
s.handler.Read(stream)
} else {
log.Print("handler isn't set in QUIC server")
break
}
}
}(session, cancel)
}
}
......@@ -79,7 +88,7 @@ func (c *quicGoClient) Connect(addr string) error {
}
session, err := quicGo.DialAddr(addr, tlsConf, &quicGo.Config{
MaxIdleTimeout: time.Second * 5,
MaxIdleTimeout: 500 * time.Millisecond,
KeepAlive: true,
MaxIncomingStreams: 1000000,
MaxIncomingUniStreams: 1000000,
......
package quic
import "io"
import "github.com/lucas-clemente/quic-go"
// Stream is the QUIC stream
type Stream interface {
ReceiveStream
SendStream
}
// ReceiveStream is an unidirectional Receive Stream.
type ReceiveStream interface {
io.Reader
}
// A SendStream is an unidirectional Send Stream.
type SendStream interface {
io.Writer
io.Closer
quic.Stream
}
package rx
import (
"bytes"
"context"
"fmt"
"io"
......@@ -9,8 +10,21 @@ import (
"github.com/cenkalti/backoff/v4"
"github.com/reactivex/rxgo/v2"
y3 "github.com/yomorun/y3-codec-golang"
"github.com/yomorun/yomo/pkg/yy3"
)
type echo struct {
buf *bytes.Buffer
}
func (e *echo) Write(p []byte) (n int, err error) {
return e.buf.Write(p)
}
func (e *echo) Read(p []byte) (n int, err error) {
return e.buf.Read(p)
}
func FromChannel(channel chan []byte) RxStream {
f := func(ctx context.Context, next chan rxgo.Item) {
defer close(next)
......@@ -55,9 +69,42 @@ func FromReader(reader io.Reader) RxStream {
return ConvertObservable(rxgo.FromChannel(next))
}
func FromReaderWithY3(reader io.Reader) RxStream {
source := y3.FromStream(reader)
return ConvertObservableWithY3(source)
func FromReaderWithY3(readers chan io.Reader) RxStream {
f := func(ctx context.Context, next chan rxgo.Item) {
defer close(next)
for {
select {
case <-ctx.Done():
return
case item, ok := <-readers:
if !ok {
return
}
r, w := io.Pipe()
if !Of(yy3.FromStream(r)).SendContext(ctx, next) {
return
}
go func() {
time.Sleep(time.Millisecond)
defer w.Close()
for {
buf := make([]byte, 3*1024)
n, err := item.Read(buf)
if err != nil {
break
} else {
value := buf[:n]
w.Write(value)
}
}
}()
}
}
}
return CreateObservable(f, rxgo.WithPublishStrategy())
}
func FromReaderWithFunc(f func() io.Reader) RxStream {
......@@ -92,7 +139,6 @@ func Of(i interface{}) rxgo.Item {
type RxStreamImpl struct {
observable rxgo.Observable
y3 y3.Observable
}
func (s *RxStreamImpl) All(predicate rxgo.Predicate, opts ...rxgo.Option) RxStream {
......@@ -544,15 +590,37 @@ func (s *RxStreamImpl) MergeReadWriterWithFunc(rwf func() (io.ReadWriter, func()
}
func (s *RxStreamImpl) Subscribe(key byte) RxStream {
return ConvertObservableWithY3(s.y3.Subscribe(key))
f := func(ctx context.Context, next chan rxgo.Item) {
defer close(next)
observe := s.Observe()
for {
select {
case <-ctx.Done():
return
case item, ok := <-observe:
if !ok {
return
}
if item.Error() {
return
}
y3stream := (item.V).(yy3.Observable)
if !Of(y3stream.Subscribe(key)).SendContext(ctx, next) {
return
}
}
}
}
return CreateObservable(f)
}
func (s *RxStreamImpl) OnObserve(function func(v []byte) (interface{}, error)) RxStream {
f := func(ctx context.Context, next chan rxgo.Item) {
defer close(next)
observe := s.y3.OnObserve(function)
observe := s.Observe()
for {
select {
case <-ctx.Done():
......@@ -561,10 +629,26 @@ func (s *RxStreamImpl) OnObserve(function func(v []byte) (interface{}, error)) R
if !ok {
return
}
if !Of(item).SendContext(ctx, next) {
if item.Error() {
return
}
go func() {
onObserve := (item.V).(yy3.Observable).OnObserve(function)
for {
select {
case <-ctx.Done():
return
case item, ok := <-onObserve:
if !ok {
return
}
if !Of(item).SendContext(ctx, next) {
return
}
}
}
}()
}
}
}
......@@ -618,10 +702,6 @@ func ConvertObservable(observable rxgo.Observable) RxStream {
return &RxStreamImpl{observable: observable}
}
func ConvertObservableWithY3(observable y3.Observable) RxStream {
return &RxStreamImpl{y3: observable}
}
type infiniteWriter struct {
io.Writer
}
......
package yy3
import (
"io"
"sync"
"github.com/yomorun/y3-codec-golang/pkg/common"
)
// Iterable iterate through and get the data of observe
type Iterable interface {
Observe() <-chan interface{}
}
// Observable provide subscription and notification processing
type Observable interface {
Iterable
Subscribe(key byte) Observable
OnObserve(function func(v []byte) (interface{}, error)) chan interface{}
}
type observableImpl struct {
iterable Iterable
}
type iterableImpl struct {
next chan interface{}
subscribers []chan interface{}
mutex sync.RWMutex
producerAlreadyCreated bool
}
func (i *iterableImpl) Observe() <-chan interface{} {
ch := make(chan interface{})
i.mutex.Lock()
i.subscribers = append(i.subscribers, ch)
i.mutex.Unlock()
i.connect()
return ch
}
func (i *iterableImpl) connect() {
i.mutex.Lock()
if !i.producerAlreadyCreated {
go i.produce()
i.producerAlreadyCreated = true
}
i.mutex.Unlock()
}
func (i *iterableImpl) produce() {
defer func() {
i.mutex.RLock()
for _, subscriber := range i.subscribers {
close(subscriber)
}
i.mutex.RUnlock()
}()
for {
select {
case item, ok := <-i.next:
if !ok {
return
}
i.mutex.RLock()
for _, subscriber := range i.subscribers {
subscriber <- item
}
i.mutex.RUnlock()
}
}
}
func (o *observableImpl) Observe() <-chan interface{} {
return o.iterable.Observe()
}
//FromStream reads data from reader
func FromStream(reader io.Reader) Observable {
f := func(next chan interface{}) {
defer close(next)
for {
buf := make([]byte, 3*1024)
n, err := reader.Read(buf)
if err != nil {
break
} else {
value := buf[:n]
next <- value
}
}
}
return createObservable(f)
}
//Processing callback function when there is data
func (o *observableImpl) OnObserve(function func(v []byte) (interface{}, error)) chan interface{} {
_next := make(chan interface{})
f := func(next chan interface{}) {
defer close(next)
observe := o.Observe()
for {
select {
case item, ok := <-observe:
if !ok {
return
}
buf := item.([]byte)
value, err := function(buf)
if err != nil {
return
}
next <- value
}
}
}
go f(_next)
return _next
}
//Get the value of the subscribe key from the stream
func (o *observableImpl) Subscribe(key byte) Observable {
f := func(next chan interface{}) {
defer close(next)
buffer := make([]byte, 0)
var (
index int32 = 0 //vernier
state string = "RS" //RS,RLS,TS,LS,VS,REJECT
length int32 = 0
value int32 = 0
limit int32 = 0
)
observe := o.Observe()
for {
select {
case item, ok := <-observe:
if !ok {
return
}
buf := item.([]byte)
for i := 0; i < len(buf); i++ {
b := buf[i]
switch state {
case "RS":
if common.IsRootTag(b) {
index++
state = "RLS"
} else {
buffer = make([]byte, 0)
length = 0
value = 0
index = 0
limit = 0
}
continue
case "RLS":
index++
buffer = append(buffer, b)
l, err := common.DecodeLength(buffer)
if err != nil {
continue
}
limit = index + l
state = "TS"
buffer = make([]byte, 0)
continue
case "TS":
index++
buffer = make([]byte, 0)
buffer = append(buffer, b)
state = "LS"
continue
case "LS":
index++
buffer = append(buffer, b)
l, err := common.DecodeLength(buffer[1:])
if err != nil {
continue
}
length = int32(len(buffer[1:]))
value = l
state = "VS"
continue
case "VS":
tail := int32(len(buf[i:]))
buflength := int32(len(buffer))
if tail >= ((1 + length + value) - buflength) {
start := i
end := int32(i) + (1 + length + value) - buflength
buffer = append(buffer, buf[start:end]...)
index += ((1 + length + value) - buflength)
i += (int((1+length+value)-buflength) - 1)
//check key
k := (buffer[0] << 2) >> 2
if k == key {
next <- buffer
if limit == index {
state = "RS"
length = 0
value = 0
index = 0
limit = 0
buffer = make([]byte, 0)
} else {
state = "REJECT"
}
} else {
if limit == index {
state = "RS"
length = 0
value = 0
index = 0
limit = 0
buffer = make([]byte, 0)
} else {
state = "TS"
length = 0
value = 0
}
}
continue
} else {
buffer = append(buffer, buf[i:]...)
index += tail
break
}
case "REJECT":
tail := int32(len(buf[i:]))
if limit == index {
state = "RS"
length = 0
value = 0
index = 0
limit = 0
buffer = make([]byte, 0)
continue
} else if tail >= (limit - index) {
i += (int(limit-index) - 1)
state = "RS"
length = 0
value = 0
index = 0
limit = 0
buffer = make([]byte, 0)
continue
} else {
index += tail
break
}
}
}
}
}
}
return createObservable(f)
}
func createObservable(f func(next chan interface{})) Observable {
next := make(chan interface{})
subscribers := make([]chan interface{}, 0)
go f(next)
return &observableImpl{iterable: &iterableImpl{next: next, subscribers: subscribers}}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册