diff --git a/cmd/yomo/main.go b/cmd/yomo/main.go index aa77ce8a495c3b1179ee54a3423f2c29060f3eb3..fe25a85fd8af269e4aeaf013a1e30b45c024eaee 100644 --- a/cmd/yomo/main.go +++ b/cmd/yomo/main.go @@ -19,7 +19,7 @@ func main() { Long: "Wf is the commands for YoMo workflow.", } wfCmd.AddCommand( - // wf.NewCmdRun(), + wf.NewCmdRun(), wf.NewCmdDev(), ) diff --git a/example/flow/app.go b/example/flow/app.go index 1c9922ad81476899f2c99a89ee729e3803e6f3e2..c038379dc38a564be58b5ae3daae47381dd7b845 100644 --- a/example/flow/app.go +++ b/example/flow/app.go @@ -24,7 +24,7 @@ var printer = func(_ context.Context, i interface{}) (interface{}, error) { value := i.(NoiseData) rightNow := time.Now().UnixNano() / int64(time.Millisecond) fmt.Println(fmt.Sprintf("[%s] %d > value: %f ⚡️=%dms", value.From, value.Time, value.Noise, rightNow-value.Time)) - return i, nil + return value.Noise, nil } var callback = func(v []byte) (interface{}, error) { @@ -45,7 +45,7 @@ func Handler(rxstream rx.RxStream) rx.RxStream { Debounce(rxgo.WithDuration(50 * time.Millisecond)). Map(printer). StdOut(). - Encode(0x10) + Encode(0x11) return stream } diff --git a/example/sink/app.go b/example/sink/app.go index c07edfa13a90c8e69e23831b292f5ea83a751aca..7d14a0cc6a4d101922407f88af3687845f1ac7fe 100644 --- a/example/sink/app.go +++ b/example/sink/app.go @@ -26,6 +26,6 @@ func Handler(rxstream rx.RxStream) rx.RxStream { OnObserve(callback). AuditTime(100 * time.Millisecond). Map(store). - Encode(0x10) + Encode(0x12) return stream } diff --git a/example/trainingmodel/flow/app.go b/example/trainingmodel/flow/app.go index 332e9772546dd8ea23ba9a75e7c5eb708fb68a84..ec3042b490454663fd67e746bd702f1cba5627a9 100644 --- a/example/trainingmodel/flow/app.go +++ b/example/trainingmodel/flow/app.go @@ -8,13 +8,17 @@ import ( "github.com/yomorun/yomo/pkg/rx" ) -const DataAKey = 0x11 -const DataBKey = 0x12 +const DataAKey = 0x3a +const DataBKey = 0x3b -var callback = func(v []byte) (interface{}, error) { +var callbacka = func(v []byte) (interface{}, error) { return y3.ToFloat32(v) } +var callbackb = func(v []byte) (interface{}, error) { + return y3.ToUTF8String(v) +} + var printera = func(_ context.Context, i interface{}) (interface{}, error) { value := i.(float32) fmt.Println(fmt.Sprintf("[%s]> value: %f", "data-a", value)) @@ -22,19 +26,19 @@ var printera = func(_ context.Context, i interface{}) (interface{}, error) { } var printerb = func(_ context.Context, i interface{}) (interface{}, error) { - value := i.(float32) - fmt.Println(fmt.Sprintf("[%s]> value: %f", "data-b", value)) + value := i.(string) + fmt.Println(fmt.Sprintf("[%s]> value: %s", "data-b", value)) return i, nil } var zipper = func(_ context.Context, ia interface{}, ib interface{}) (interface{}, error) { - return fmt.Sprintf("⚡️ Zip [%s],[%s] -> Value: %f, %f", "dataA", "dataB", ia.(float32), ib.(float32)), nil + return fmt.Sprintf("⚡️ Zip [%s],[%s] -> Value: %f, %s", "dataA", "dataB", ia.(float32), ib.(string)), nil } func Handler(rxstream rx.RxStream) rx.RxStream { - streamA := rxstream.Subscribe(DataAKey).OnObserve(callback).Map(printera) - streamB := rxstream.Subscribe(DataBKey).OnObserve(callback).Map(printerb) + streamA := rxstream.Subscribe(DataAKey).OnObserve(callbacka).Map(printera) + streamB := rxstream.Subscribe(DataBKey).OnObserve(callbackb).Map(printerb) stream := streamA.ZipFromIterable(streamB, zipper).StdOut().Encode(0x10) return stream diff --git a/example/trainingmodel/source-data-a/main.go b/example/trainingmodel/source-data-a/main.go index 10e32d0a0386719d52cecedf6ee6ef00b2ef99dd..11f1eef724099c3fbf8ecc72b52d2f9e97ad2b65 100644 --- a/example/trainingmodel/source-data-a/main.go +++ b/example/trainingmodel/source-data-a/main.go @@ -1,21 +1,23 @@ package main import ( - "context" + "io" "log" "math/rand" "os" + "strconv" + "strings" "time" y3 "github.com/yomorun/y3-codec-golang" - "github.com/yomorun/yomo/pkg/quic" + "github.com/yomorun/yomo/pkg/client" ) var zipperAddr = os.Getenv("YOMO_ZIPPER_ENDPOINT") func main() { if zipperAddr == "" { - zipperAddr = "localhost:9999" + zipperAddr = "localhost:9000" } err := emit(zipperAddr) if err != nil { @@ -24,36 +26,32 @@ func main() { } func emit(addr string) error { - client, err := quic.NewClient(addr) - if err != nil { - return err - } - log.Printf("✅ Connected to yomo-zipper %s", addr) + host := strings.Split(addr, ":")[0] + port, err := strconv.Atoi(strings.Split(addr, ":")[1]) - stream, err := client.CreateStream(context.Background()) + cli, err := client.Connect(host, port).Name("source-a").Stream() if err != nil { - return err + panic(err) } - - generateAndSendData(stream) + generateAndSendData(cli) return nil } -var codec = y3.NewCodec(0x11) +var codec = y3.NewCodec(0x3a) -func generateAndSendData(stream quic.Stream) { +func generateAndSendData(writer io.Writer) { for { - time.Sleep(100 * time.Millisecond) + time.Sleep(200 * time.Millisecond) num := rand.New(rand.NewSource(time.Now().UnixNano())).Float32() * 200 sendingBuf, _ := codec.Marshal(num) - _, err := stream.Write(sendingBuf) + _, err := writer.Write(sendingBuf) if err != nil { - log.Printf("❌ Emit %v to yomo-zipper failure with err: %v", num, err) + log.Printf("❌ Emit %v to yomo-zipper failure with err: %f", num, err) } else { - log.Printf("✅ Emit %v to yomo-zipper", num) + log.Printf("✅ Emit %f to yomo-zipper", num) } } } diff --git a/example/trainingmodel/source-data-b/main.go b/example/trainingmodel/source-data-b/main.go index c8dc40dfb9bff0e8b2c72e79766bf121e58a9a87..397f5bdb35207b9d2addb64c7e3e8046de01044a 100644 --- a/example/trainingmodel/source-data-b/main.go +++ b/example/trainingmodel/source-data-b/main.go @@ -1,21 +1,27 @@ package main import ( - "context" + "io" "log" "math/rand" "os" + "strconv" + "strings" "time" y3 "github.com/yomorun/y3-codec-golang" - "github.com/yomorun/yomo/pkg/quic" + "github.com/yomorun/yomo/pkg/client" ) var zipperAddr = os.Getenv("YOMO_ZIPPER_ENDPOINT") +const charset = "abcdefghijklmnopqrstuvwxyz" + +var seed *rand.Rand = rand.New(rand.NewSource(time.Now().UnixNano())) + func main() { if zipperAddr == "" { - zipperAddr = "localhost:9999" + zipperAddr = "localhost:9000" } err := emit(zipperAddr) if err != nil { @@ -25,38 +31,43 @@ func main() { func emit(addr string) error { - client, err := quic.NewClient(addr) - if err != nil { - return err - } - log.Printf("✅ Connected to yomo-zipper %s", addr) + host := strings.Split(addr, ":")[0] + port, err := strconv.Atoi(strings.Split(addr, ":")[1]) - stream, err := client.CreateStream(context.Background()) + cli, err := client.Connect(host, port).Name("source-b").Stream() if err != nil { - return err + panic(err) } - generateAndSendData(stream) + generateAndSendData(cli) return nil } -var codec = y3.NewCodec(0x12) +var codec = y3.NewCodec(0x3b) -func generateAndSendData(stream quic.Stream) { +func generateAndSendData(writer io.Writer) { for { - time.Sleep(100 * time.Millisecond) + time.Sleep(200 * time.Millisecond) - num := rand.New(rand.NewSource(time.Now().UnixNano())).Float32() * 2000 + str := generateString() - sendingBuf, _ := codec.Marshal(num) + sendingBuf, _ := codec.Marshal(str) - _, err := stream.Write(sendingBuf) + _, err := writer.Write(sendingBuf) if err != nil { - log.Printf("❌ Emit %v to yomo-zipper failure with err: %v", num, err) + log.Printf("❌ Emit %v to yomo-zipper failure with err: %s", str, err) } else { - log.Printf("✅ Emit %v to yomo-zipper", num) + log.Printf("✅ Emit %s to yomo-zipper", str) } } } + +func generateString() string { + b := make([]byte, 10) + for i := range b { + b[i] = charset[seed.Intn(len(charset))] + } + return string(b) +} diff --git a/example/trainingmodel/zipper/workflow.yaml b/example/trainingmodel/zipper/workflow.yaml index 9407277ce01fbf781c71f75f7c85fa97b16ace3d..f9cea708de1f54590606b4f6400ac88a5b4fd7e8 100644 --- a/example/trainingmodel/zipper/workflow.yaml +++ b/example/trainingmodel/zipper/workflow.yaml @@ -1,7 +1,5 @@ name: Service host: localhost -port: 9999 +port: 9000 flows: - - name: training - host: localhost - port: 4242 \ No newline at end of file + - name: training \ No newline at end of file diff --git a/example/workflow.yaml b/example/workflow.yaml index f35543ba7a6e8062dcd5a4f09bea606ad46487dc..942ec68ef2108026da04f75bf7589503d4824d1b 100644 --- a/example/workflow.yaml +++ b/example/workflow.yaml @@ -1,11 +1,7 @@ name: Service host: localhost -port: 9999 +port: 9000 flows: - name: Noise - host: localhost - port: 4242 -# sinks: -# - name: Mock DB -# host: localhost -# port: 4141 \ No newline at end of file +sinks: + - name: MockDB \ No newline at end of file diff --git a/internal/cmd/run.go b/internal/cmd/run.go index 091f9d04b0128dcc20d87ed7f1a61c40a3cb38a8..2cdb8069a67ac6079868442355abb7b30e3851a4 100644 --- a/internal/cmd/run.go +++ b/internal/cmd/run.go @@ -15,7 +15,8 @@ import ( type RunOptions struct { baseOptions // Port is the port number of UDP host for Serverless function (default is 4242). - Endpoint string + Url string + Name string } // NewCmdRun creates a new command run. @@ -36,19 +37,24 @@ func NewCmdRun() *cobra.Command { if env != "" { log.Printf("Get YOMO_ENV: %s", env) } + if opts.Url == "" { + opts.Url = "localhost:9000" + } - host := strings.Split(opts.Endpoint, ":")[0] - port, _ := strconv.Atoi(strings.Split(opts.Endpoint, ":")[1]) - cli, err := client.Connect(host, port).Name("Noise").Stream() + host := strings.Split(opts.Url, ":")[0] + port, _ := strconv.Atoi(strings.Split(opts.Url, ":")[1]) + cli, err := client.Connect(host, port).Name(opts.Name).Stream() hanlder := slHandler.(func(rxStream rx.RxStream) rx.RxStream) + log.Print("Running the Serverless Function.") cli.Pipe(hanlder) }, } cmd.Flags().StringVarP(&opts.Filename, "file-name", "f", "app.go", "Serverless function file (default is app.go)") - cmd.Flags().StringVarP(&opts.Endpoint, "endpoint", "e", "localhost:9999", "xxx") + cmd.Flags().StringVarP(&opts.Url, "url", "u", "localhost:9000", "zipper server endpoint addr (default is localhost:9000)") + cmd.Flags().StringVarP(&opts.Name, "name", "n", "yomo-app", "yomo serverless app name(default is yomo-app)") return cmd } diff --git a/internal/cmd/wf/dev.go b/internal/cmd/wf/dev.go index f6b833827e033b637ec31d043e820326a5e9594c..332e3357f3c0bd64d3a63a6d8afcf4cf25ff006b 100644 --- a/internal/cmd/wf/dev.go +++ b/internal/cmd/wf/dev.go @@ -88,11 +88,9 @@ func (s *quicDevHandler) Listen() error { } value := customer.V.([]byte) - for _, sink := range sinks { go func(_sink func() (io.Writer, func()), buf []byte) { writer, cancel := _sink() - if writer != nil { _, err := writer.Write(buf) if err != nil { diff --git a/internal/cmd/wf/run.go b/internal/cmd/wf/run.go index 17c2a522b58858c92daf5aacf2db9495a7739d19..521b0fc8aae08fa08f2f6348c7451cf91e67b723 100644 --- a/internal/cmd/wf/run.go +++ b/internal/cmd/wf/run.go @@ -1,101 +1,137 @@ package wf -// import ( -// "fmt" -// "io" -// "log" -// "time" - -// "github.com/spf13/cobra" -// "github.com/yomorun/yomo/internal/conf" -// "github.com/yomorun/yomo/internal/dispatcher" -// "github.com/yomorun/yomo/internal/workflow" -// "github.com/yomorun/yomo/pkg/quic" -// ) - -// // RunOptions are the options for run command. -// type RunOptions struct { -// baseOptions -// } - -// // NewCmdRun creates a new command run. -// func NewCmdRun() *cobra.Command { -// var opts = &RunOptions{} - -// var cmd = &cobra.Command{ -// Use: "run", -// Short: "Run a YoMo Serverless Function", -// Long: "Run a YoMo Serverless Function", -// Run: func(cmd *cobra.Command, args []string) { -// conf, err := parseConfig(&opts.baseOptions, args) -// if err != nil { -// log.Print("❌ ", err) -// return -// } -// printZipperConf(conf) - -// quicHandler := &quicHandler{ -// serverlessConfig: conf, -// mergeChan: make(chan []byte, 20), -// } - -// endpoint := fmt.Sprintf("0.0.0.0:%d", conf.Port) - -// log.Print("Running YoMo workflow...") -// err = workflow.Run(endpoint, quicHandler) -// if err != nil { -// log.Print("❌ ", err) -// return -// } -// }, -// } - -// cmd.Flags().StringVarP(&opts.Config, "config", "c", "workflow.yaml", "Workflow config file (default is workflow.yaml)") - -// return cmd -// } - -// type quicHandler struct { -// serverlessConfig *conf.WorkflowConfig -// mergeChan chan []byte -// } - -// func (s *quicHandler) Listen() error { - -// return nil -// } - -// func (s *quicHandler) Read(st quic.Stream) error { -// id := time.Now().UnixNano() -// flows, sinks := workflow.Build(s.serverlessConfig, id) - -// stream := dispatcher.DispatcherWithFunc(flows, st) - -// go func() { -// for customer := range stream.Observe() { -// if customer.Error() { -// fmt.Println(customer.E.Error()) -// continue -// } -// value := customer.V.([]byte) -// if len(value) == 1 && value[0] == byte(0) { -// continue -// } - -// for _, sink := range sinks { -// go func(_sink func() (io.Writer, func()), buf []byte) { -// writer, cancel := _sink() -// if writer != nil { -// _, err := writer.Write(buf) -// if err != nil { -// cancel() -// } - -// } -// }(sink, value) -// } -// } -// }() - -// return nil -// } +import ( + "fmt" + "io" + "log" + "sync" + + "github.com/spf13/cobra" + "github.com/yomorun/yomo/internal/conf" + "github.com/yomorun/yomo/internal/dispatcher" + "github.com/yomorun/yomo/internal/workflow" + "github.com/yomorun/yomo/pkg/quic" +) + +// RunOptions are the options for run command. +type RunOptions struct { + baseOptions +} + +// NewCmdRun creates a new command run. +func NewCmdRun() *cobra.Command { + var opts = &RunOptions{} + + var cmd = &cobra.Command{ + Use: "run", + Short: "Run a YoMo Serverless Function", + Long: "Run a YoMo Serverless Function", + Run: func(cmd *cobra.Command, args []string) { + conf, err := parseConfig(&opts.baseOptions, args) + if err != nil { + log.Print("❌ ", err) + return + } + printZipperConf(conf) + + quicHandler := &quicHandler{ + serverlessConfig: conf, + connMap: map[int64]*workflow.QuicConn{}, + build: make(chan quic.Stream), + index: 0, + } + + endpoint := fmt.Sprintf("0.0.0.0:%d", conf.Port) + + log.Print("Running YoMo workflow...") + err = workflow.Run(endpoint, quicHandler) + if err != nil { + log.Print("❌ ", err) + return + } + }, + } + + cmd.Flags().StringVarP(&opts.Config, "config", "c", "workflow.yaml", "Workflow config file (default is workflow.yaml)") + + return cmd +} + +type quicHandler struct { + serverlessConfig *conf.WorkflowConfig + connMap map[int64]*workflow.QuicConn + build chan quic.Stream + index int + mutex sync.RWMutex +} + +func (s *quicHandler) Listen() error { + go func() { + for { + select { + case item, ok := <-s.build: + if !ok { + return + } + + flows, sinks := workflow.Build(s.serverlessConfig, &s.connMap, s.index) + stream := dispatcher.DispatcherWithFunc(flows, item) + + go func() { + for customer := range stream.Observe() { + if customer.Error() { + fmt.Println(customer.E.Error()) + continue + } + + value := customer.V.([]byte) + + for _, sink := range sinks { + go func(_sink func() (io.Writer, func()), buf []byte) { + writer, cancel := _sink() + + if writer != nil { + _, err := writer.Write(buf) + if err != nil { + cancel() + } + } + }(sink, value) + } + } + }() + s.index++ + + } + } + }() + return nil +} + +func (s *quicHandler) Read(id int64, sess quic.Session, st quic.Stream) error { + s.mutex.Lock() + + if conn, ok := s.connMap[id]; ok { + if conn.StreamType == "source" { + conn.Stream = append(conn.Stream, st) + s.build <- st + } else { + conn.Stream = append(conn.Stream, st) + } + } else { + conn := &workflow.QuicConn{ + Session: sess, + Signal: st, + Stream: make([]io.ReadWriter, 0), + StreamType: "", + Name: "", + Heartbeat: make(chan byte), + IsClose: false, + Ready: true, + } + conn.Init(s.serverlessConfig) + s.connMap[id] = conn + } + s.mutex.Unlock() + return nil +} diff --git a/internal/cmd/wf/utils.go b/internal/cmd/wf/utils.go index af06694db3d41e98cdb01c0139d09ecc9a2052eb..e5ad09c91db48de5050caae98e54b3e46e9bdb52 100644 --- a/internal/cmd/wf/utils.go +++ b/internal/cmd/wf/utils.go @@ -60,7 +60,7 @@ func validateConfig(wfConf *conf.WorkflowConfig) error { missingParams := []string{} for k, apps := range m { for _, app := range apps { - if app.Name == "" || app.Host == "" || app.Port <= 0 { + if app.Name == "" { missingParams = append(missingParams, k) } } @@ -85,11 +85,11 @@ func validateConfig(wfConf *conf.WorkflowConfig) error { func printZipperConf(wfConf *conf.WorkflowConfig) { log.Printf("Found %d flows in zipper config", len(wfConf.Flows)) for i, flow := range wfConf.Flows { - log.Printf("Flow %d: %s on %s:%d", i+1, flow.Name, flow.Host, flow.Port) + log.Printf("Flow %d: %s", i+1, flow.Name) } log.Printf("Found %d sinks in zipper config", len(wfConf.Sinks)) for i, sink := range wfConf.Sinks { - log.Printf("Sink %d: %s on %s:%d", i+1, sink.Name, sink.Host, sink.Port) + log.Printf("Sink %d: %s", i+1, sink.Name) } } diff --git a/internal/conf/workflow.go b/internal/conf/workflow.go index 3718b4628ed2718113a8892a5854dc85c6caef90..be0f9737becace49ab1aa8f1756287f064c518db 100644 --- a/internal/conf/workflow.go +++ b/internal/conf/workflow.go @@ -8,8 +8,6 @@ import ( type App struct { Name string `yaml:"name"` - Host string `yaml:"host"` - Port int `yaml:"port"` } type Workflow struct { @@ -18,7 +16,9 @@ type Workflow struct { } type WorkflowConfig struct { - App `yaml:",inline"` + Name string `yaml:"name"` + Host string `yaml:"host"` + Port int `yaml:"port"` Workflow `yaml:",inline"` } diff --git a/internal/workflow/runtime.go b/internal/workflow/runtime.go index 3770fa44ed97a8a5f8f1649740efe82e627ba8f5..67c58da76dc3425721db41d6105e8420eb328d9c 100644 --- a/internal/workflow/runtime.go +++ b/internal/workflow/runtime.go @@ -53,7 +53,7 @@ func (c *QuicConn) Init(conf *conf.WorkflowConfig) { } } index++ - fmt.Println("Receive App:", c.Name, c.StreamType, index) + fmt.Println("Receive App:", c.Name, c.StreamType) if c.StreamType == "source" { c.Signal.Write([]byte{1}) } else { @@ -142,6 +142,7 @@ func Build(wfConf *conf.WorkflowConfig, connMap *map[int64]*QuicConn, index int) } func createReadWriter(app conf.App, connMap *map[int64]*QuicConn, index int) func() (io.ReadWriter, func()) { + fmt.Println("flow s.index.:", index) f := func() (io.ReadWriter, func()) { if app.Name != GlobalApp { index = 0 @@ -159,6 +160,7 @@ func createReadWriter(app conf.App, connMap *map[int64]*QuicConn, index int) fun if conn == nil { return nil, func() {} } else if len(conn.Stream) > index && conn.Stream[index] != nil { + conn.Ready = true return conn.Stream[index], cancelStream(app, conn, connMap, id) } else { if conn.Ready { @@ -174,11 +176,11 @@ func createReadWriter(app conf.App, connMap *map[int64]*QuicConn, index int) fun } func createWriter(app conf.App, connMap *map[int64]*QuicConn, index int) func() (io.Writer, func()) { - + fmt.Println("sink s.index.:", index) f := func() (io.Writer, func()) { - if app.Name != GlobalApp { - index = 0 - } + // if app.Name != GlobalApp { + // index = 0 + // } var conn *QuicConn = nil var id int64 = 0 @@ -193,6 +195,7 @@ func createWriter(app conf.App, connMap *map[int64]*QuicConn, index int) func() if conn == nil { return nil, func() {} } else if len(conn.Stream) > index && conn.Stream[index] != nil { + conn.Ready = true return conn.Stream[index], cancelStream(app, conn, connMap, id) } else { if conn.Ready { diff --git a/pkg/client/client.go b/pkg/client/client.go index 34c6ba31d1fe8eb6e65ae5aa6a0eecd9ea3ee6f5..5da8484f2b8405593212a724f862e43e14e841d6 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -162,7 +162,11 @@ func (c *client) reTry() { // source func (c *client) Write(b []byte) (int, error) { - return c.stream.Write(b) + if c.stream != nil { + return c.stream.Write(b) + } else { + return 0, errors.New("not found stream") + } } // flow || sink @@ -177,7 +181,6 @@ func (c *client) Pipe(f func(rxstream rx.RxStream) rx.RxStream) { panic(customer.E) } else if customer.V != nil { index := rand.Intn(len(c.writers)) - loop: for i, w := range c.writers { if index == i { @@ -186,6 +189,8 @@ func (c *client) Pipe(f func(rxstream rx.RxStream) rx.RxStream) { index = rand.Intn(len(c.writers)) break loop } + } else { + w.Write([]byte{0}) } } @@ -195,8 +200,6 @@ func (c *client) Pipe(f func(rxstream rx.RxStream) rx.RxStream) { func (c *client) Close() { c.session.Close() - close(c.accepted) - close(c.heartbeat) c.writers = make([]io.Writer, 0) c.accepted = make(chan int) c.heartbeat = make(chan byte)