提交 42dc9a28 编写于 作者: J jjwygjj

fix cmd opts

上级 144d637a
......@@ -19,7 +19,7 @@ func main() {
Long: "Wf is the commands for YoMo workflow.",
}
wfCmd.AddCommand(
// wf.NewCmdRun(),
wf.NewCmdRun(),
wf.NewCmdDev(),
)
......
......@@ -24,7 +24,7 @@ var printer = func(_ context.Context, i interface{}) (interface{}, error) {
value := i.(NoiseData)
rightNow := time.Now().UnixNano() / int64(time.Millisecond)
fmt.Println(fmt.Sprintf("[%s] %d > value: %f ⚡️=%dms", value.From, value.Time, value.Noise, rightNow-value.Time))
return i, nil
return value.Noise, nil
}
var callback = func(v []byte) (interface{}, error) {
......@@ -45,7 +45,7 @@ func Handler(rxstream rx.RxStream) rx.RxStream {
Debounce(rxgo.WithDuration(50 * time.Millisecond)).
Map(printer).
StdOut().
Encode(0x10)
Encode(0x11)
return stream
}
......@@ -26,6 +26,6 @@ func Handler(rxstream rx.RxStream) rx.RxStream {
OnObserve(callback).
AuditTime(100 * time.Millisecond).
Map(store).
Encode(0x10)
Encode(0x12)
return stream
}
......@@ -8,13 +8,17 @@ import (
"github.com/yomorun/yomo/pkg/rx"
)
const DataAKey = 0x11
const DataBKey = 0x12
const DataAKey = 0x3a
const DataBKey = 0x3b
var callback = func(v []byte) (interface{}, error) {
var callbacka = func(v []byte) (interface{}, error) {
return y3.ToFloat32(v)
}
var callbackb = func(v []byte) (interface{}, error) {
return y3.ToUTF8String(v)
}
var printera = func(_ context.Context, i interface{}) (interface{}, error) {
value := i.(float32)
fmt.Println(fmt.Sprintf("[%s]> value: %f", "data-a", value))
......@@ -22,19 +26,19 @@ var printera = func(_ context.Context, i interface{}) (interface{}, error) {
}
var printerb = func(_ context.Context, i interface{}) (interface{}, error) {
value := i.(float32)
fmt.Println(fmt.Sprintf("[%s]> value: %f", "data-b", value))
value := i.(string)
fmt.Println(fmt.Sprintf("[%s]> value: %s", "data-b", value))
return i, nil
}
var zipper = func(_ context.Context, ia interface{}, ib interface{}) (interface{}, error) {
return fmt.Sprintf("⚡️ Zip [%s],[%s] -> Value: %f, %f", "dataA", "dataB", ia.(float32), ib.(float32)), nil
return fmt.Sprintf("⚡️ Zip [%s],[%s] -> Value: %f, %s", "dataA", "dataB", ia.(float32), ib.(string)), nil
}
func Handler(rxstream rx.RxStream) rx.RxStream {
streamA := rxstream.Subscribe(DataAKey).OnObserve(callback).Map(printera)
streamB := rxstream.Subscribe(DataBKey).OnObserve(callback).Map(printerb)
streamA := rxstream.Subscribe(DataAKey).OnObserve(callbacka).Map(printera)
streamB := rxstream.Subscribe(DataBKey).OnObserve(callbackb).Map(printerb)
stream := streamA.ZipFromIterable(streamB, zipper).StdOut().Encode(0x10)
return stream
......
package main
import (
"context"
"io"
"log"
"math/rand"
"os"
"strconv"
"strings"
"time"
y3 "github.com/yomorun/y3-codec-golang"
"github.com/yomorun/yomo/pkg/quic"
"github.com/yomorun/yomo/pkg/client"
)
var zipperAddr = os.Getenv("YOMO_ZIPPER_ENDPOINT")
func main() {
if zipperAddr == "" {
zipperAddr = "localhost:9999"
zipperAddr = "localhost:9000"
}
err := emit(zipperAddr)
if err != nil {
......@@ -24,36 +26,32 @@ func main() {
}
func emit(addr string) error {
client, err := quic.NewClient(addr)
if err != nil {
return err
}
log.Printf("✅ Connected to yomo-zipper %s", addr)
host := strings.Split(addr, ":")[0]
port, err := strconv.Atoi(strings.Split(addr, ":")[1])
stream, err := client.CreateStream(context.Background())
cli, err := client.Connect(host, port).Name("source-a").Stream()
if err != nil {
return err
panic(err)
}
generateAndSendData(stream)
generateAndSendData(cli)
return nil
}
var codec = y3.NewCodec(0x11)
var codec = y3.NewCodec(0x3a)
func generateAndSendData(stream quic.Stream) {
func generateAndSendData(writer io.Writer) {
for {
time.Sleep(100 * time.Millisecond)
time.Sleep(200 * time.Millisecond)
num := rand.New(rand.NewSource(time.Now().UnixNano())).Float32() * 200
sendingBuf, _ := codec.Marshal(num)
_, err := stream.Write(sendingBuf)
_, err := writer.Write(sendingBuf)
if err != nil {
log.Printf("❌ Emit %v to yomo-zipper failure with err: %v", num, err)
log.Printf("❌ Emit %v to yomo-zipper failure with err: %f", num, err)
} else {
log.Printf("✅ Emit %v to yomo-zipper", num)
log.Printf("✅ Emit %f to yomo-zipper", num)
}
}
}
package main
import (
"context"
"io"
"log"
"math/rand"
"os"
"strconv"
"strings"
"time"
y3 "github.com/yomorun/y3-codec-golang"
"github.com/yomorun/yomo/pkg/quic"
"github.com/yomorun/yomo/pkg/client"
)
var zipperAddr = os.Getenv("YOMO_ZIPPER_ENDPOINT")
const charset = "abcdefghijklmnopqrstuvwxyz"
var seed *rand.Rand = rand.New(rand.NewSource(time.Now().UnixNano()))
func main() {
if zipperAddr == "" {
zipperAddr = "localhost:9999"
zipperAddr = "localhost:9000"
}
err := emit(zipperAddr)
if err != nil {
......@@ -25,38 +31,43 @@ func main() {
func emit(addr string) error {
client, err := quic.NewClient(addr)
if err != nil {
return err
}
log.Printf("✅ Connected to yomo-zipper %s", addr)
host := strings.Split(addr, ":")[0]
port, err := strconv.Atoi(strings.Split(addr, ":")[1])
stream, err := client.CreateStream(context.Background())
cli, err := client.Connect(host, port).Name("source-b").Stream()
if err != nil {
return err
panic(err)
}
generateAndSendData(stream)
generateAndSendData(cli)
return nil
}
var codec = y3.NewCodec(0x12)
var codec = y3.NewCodec(0x3b)
func generateAndSendData(stream quic.Stream) {
func generateAndSendData(writer io.Writer) {
for {
time.Sleep(100 * time.Millisecond)
time.Sleep(200 * time.Millisecond)
num := rand.New(rand.NewSource(time.Now().UnixNano())).Float32() * 2000
str := generateString()
sendingBuf, _ := codec.Marshal(num)
sendingBuf, _ := codec.Marshal(str)
_, err := stream.Write(sendingBuf)
_, err := writer.Write(sendingBuf)
if err != nil {
log.Printf("❌ Emit %v to yomo-zipper failure with err: %v", num, err)
log.Printf("❌ Emit %v to yomo-zipper failure with err: %s", str, err)
} else {
log.Printf("✅ Emit %v to yomo-zipper", num)
log.Printf("✅ Emit %s to yomo-zipper", str)
}
}
}
func generateString() string {
b := make([]byte, 10)
for i := range b {
b[i] = charset[seed.Intn(len(charset))]
}
return string(b)
}
name: Service
host: localhost
port: 9999
port: 9000
flows:
- name: training
host: localhost
port: 4242
\ No newline at end of file
- name: training
\ No newline at end of file
name: Service
host: localhost
port: 9999
port: 9000
flows:
- name: Noise
host: localhost
port: 4242
# sinks:
# - name: Mock DB
# host: localhost
# port: 4141
\ No newline at end of file
sinks:
- name: MockDB
\ No newline at end of file
......@@ -15,7 +15,8 @@ import (
type RunOptions struct {
baseOptions
// Port is the port number of UDP host for Serverless function (default is 4242).
Endpoint string
Url string
Name string
}
// NewCmdRun creates a new command run.
......@@ -36,19 +37,24 @@ func NewCmdRun() *cobra.Command {
if env != "" {
log.Printf("Get YOMO_ENV: %s", env)
}
if opts.Url == "" {
opts.Url = "localhost:9000"
}
host := strings.Split(opts.Endpoint, ":")[0]
port, _ := strconv.Atoi(strings.Split(opts.Endpoint, ":")[1])
cli, err := client.Connect(host, port).Name("Noise").Stream()
host := strings.Split(opts.Url, ":")[0]
port, _ := strconv.Atoi(strings.Split(opts.Url, ":")[1])
cli, err := client.Connect(host, port).Name(opts.Name).Stream()
hanlder := slHandler.(func(rxStream rx.RxStream) rx.RxStream)
log.Print("Running the Serverless Function.")
cli.Pipe(hanlder)
},
}
cmd.Flags().StringVarP(&opts.Filename, "file-name", "f", "app.go", "Serverless function file (default is app.go)")
cmd.Flags().StringVarP(&opts.Endpoint, "endpoint", "e", "localhost:9999", "xxx")
cmd.Flags().StringVarP(&opts.Url, "url", "u", "localhost:9000", "zipper server endpoint addr (default is localhost:9000)")
cmd.Flags().StringVarP(&opts.Name, "name", "n", "yomo-app", "yomo serverless app name(default is yomo-app)")
return cmd
}
......@@ -88,11 +88,9 @@ func (s *quicDevHandler) Listen() error {
}
value := customer.V.([]byte)
for _, sink := range sinks {
go func(_sink func() (io.Writer, func()), buf []byte) {
writer, cancel := _sink()
if writer != nil {
_, err := writer.Write(buf)
if err != nil {
......
package wf
// import (
// "fmt"
// "io"
// "log"
// "time"
// "github.com/spf13/cobra"
// "github.com/yomorun/yomo/internal/conf"
// "github.com/yomorun/yomo/internal/dispatcher"
// "github.com/yomorun/yomo/internal/workflow"
// "github.com/yomorun/yomo/pkg/quic"
// )
// // RunOptions are the options for run command.
// type RunOptions struct {
// baseOptions
// }
// // NewCmdRun creates a new command run.
// func NewCmdRun() *cobra.Command {
// var opts = &RunOptions{}
// var cmd = &cobra.Command{
// Use: "run",
// Short: "Run a YoMo Serverless Function",
// Long: "Run a YoMo Serverless Function",
// Run: func(cmd *cobra.Command, args []string) {
// conf, err := parseConfig(&opts.baseOptions, args)
// if err != nil {
// log.Print("❌ ", err)
// return
// }
// printZipperConf(conf)
// quicHandler := &quicHandler{
// serverlessConfig: conf,
// mergeChan: make(chan []byte, 20),
// }
// endpoint := fmt.Sprintf("0.0.0.0:%d", conf.Port)
// log.Print("Running YoMo workflow...")
// err = workflow.Run(endpoint, quicHandler)
// if err != nil {
// log.Print("❌ ", err)
// return
// }
// },
// }
// cmd.Flags().StringVarP(&opts.Config, "config", "c", "workflow.yaml", "Workflow config file (default is workflow.yaml)")
// return cmd
// }
// type quicHandler struct {
// serverlessConfig *conf.WorkflowConfig
// mergeChan chan []byte
// }
// func (s *quicHandler) Listen() error {
// return nil
// }
// func (s *quicHandler) Read(st quic.Stream) error {
// id := time.Now().UnixNano()
// flows, sinks := workflow.Build(s.serverlessConfig, id)
// stream := dispatcher.DispatcherWithFunc(flows, st)
// go func() {
// for customer := range stream.Observe() {
// if customer.Error() {
// fmt.Println(customer.E.Error())
// continue
// }
// value := customer.V.([]byte)
// if len(value) == 1 && value[0] == byte(0) {
// continue
// }
// for _, sink := range sinks {
// go func(_sink func() (io.Writer, func()), buf []byte) {
// writer, cancel := _sink()
// if writer != nil {
// _, err := writer.Write(buf)
// if err != nil {
// cancel()
// }
// }
// }(sink, value)
// }
// }
// }()
// return nil
// }
import (
"fmt"
"io"
"log"
"sync"
"github.com/spf13/cobra"
"github.com/yomorun/yomo/internal/conf"
"github.com/yomorun/yomo/internal/dispatcher"
"github.com/yomorun/yomo/internal/workflow"
"github.com/yomorun/yomo/pkg/quic"
)
// RunOptions are the options for run command.
type RunOptions struct {
baseOptions
}
// NewCmdRun creates a new command run.
func NewCmdRun() *cobra.Command {
var opts = &RunOptions{}
var cmd = &cobra.Command{
Use: "run",
Short: "Run a YoMo Serverless Function",
Long: "Run a YoMo Serverless Function",
Run: func(cmd *cobra.Command, args []string) {
conf, err := parseConfig(&opts.baseOptions, args)
if err != nil {
log.Print("❌ ", err)
return
}
printZipperConf(conf)
quicHandler := &quicHandler{
serverlessConfig: conf,
connMap: map[int64]*workflow.QuicConn{},
build: make(chan quic.Stream),
index: 0,
}
endpoint := fmt.Sprintf("0.0.0.0:%d", conf.Port)
log.Print("Running YoMo workflow...")
err = workflow.Run(endpoint, quicHandler)
if err != nil {
log.Print("❌ ", err)
return
}
},
}
cmd.Flags().StringVarP(&opts.Config, "config", "c", "workflow.yaml", "Workflow config file (default is workflow.yaml)")
return cmd
}
type quicHandler struct {
serverlessConfig *conf.WorkflowConfig
connMap map[int64]*workflow.QuicConn
build chan quic.Stream
index int
mutex sync.RWMutex
}
func (s *quicHandler) Listen() error {
go func() {
for {
select {
case item, ok := <-s.build:
if !ok {
return
}
flows, sinks := workflow.Build(s.serverlessConfig, &s.connMap, s.index)
stream := dispatcher.DispatcherWithFunc(flows, item)
go func() {
for customer := range stream.Observe() {
if customer.Error() {
fmt.Println(customer.E.Error())
continue
}
value := customer.V.([]byte)
for _, sink := range sinks {
go func(_sink func() (io.Writer, func()), buf []byte) {
writer, cancel := _sink()
if writer != nil {
_, err := writer.Write(buf)
if err != nil {
cancel()
}
}
}(sink, value)
}
}
}()
s.index++
}
}
}()
return nil
}
func (s *quicHandler) Read(id int64, sess quic.Session, st quic.Stream) error {
s.mutex.Lock()
if conn, ok := s.connMap[id]; ok {
if conn.StreamType == "source" {
conn.Stream = append(conn.Stream, st)
s.build <- st
} else {
conn.Stream = append(conn.Stream, st)
}
} else {
conn := &workflow.QuicConn{
Session: sess,
Signal: st,
Stream: make([]io.ReadWriter, 0),
StreamType: "",
Name: "",
Heartbeat: make(chan byte),
IsClose: false,
Ready: true,
}
conn.Init(s.serverlessConfig)
s.connMap[id] = conn
}
s.mutex.Unlock()
return nil
}
......@@ -60,7 +60,7 @@ func validateConfig(wfConf *conf.WorkflowConfig) error {
missingParams := []string{}
for k, apps := range m {
for _, app := range apps {
if app.Name == "" || app.Host == "" || app.Port <= 0 {
if app.Name == "" {
missingParams = append(missingParams, k)
}
}
......@@ -85,11 +85,11 @@ func validateConfig(wfConf *conf.WorkflowConfig) error {
func printZipperConf(wfConf *conf.WorkflowConfig) {
log.Printf("Found %d flows in zipper config", len(wfConf.Flows))
for i, flow := range wfConf.Flows {
log.Printf("Flow %d: %s on %s:%d", i+1, flow.Name, flow.Host, flow.Port)
log.Printf("Flow %d: %s", i+1, flow.Name)
}
log.Printf("Found %d sinks in zipper config", len(wfConf.Sinks))
for i, sink := range wfConf.Sinks {
log.Printf("Sink %d: %s on %s:%d", i+1, sink.Name, sink.Host, sink.Port)
log.Printf("Sink %d: %s", i+1, sink.Name)
}
}
......@@ -8,8 +8,6 @@ import (
type App struct {
Name string `yaml:"name"`
Host string `yaml:"host"`
Port int `yaml:"port"`
}
type Workflow struct {
......@@ -18,7 +16,9 @@ type Workflow struct {
}
type WorkflowConfig struct {
App `yaml:",inline"`
Name string `yaml:"name"`
Host string `yaml:"host"`
Port int `yaml:"port"`
Workflow `yaml:",inline"`
}
......
......@@ -53,7 +53,7 @@ func (c *QuicConn) Init(conf *conf.WorkflowConfig) {
}
}
index++
fmt.Println("Receive App:", c.Name, c.StreamType, index)
fmt.Println("Receive App:", c.Name, c.StreamType)
if c.StreamType == "source" {
c.Signal.Write([]byte{1})
} else {
......@@ -142,6 +142,7 @@ func Build(wfConf *conf.WorkflowConfig, connMap *map[int64]*QuicConn, index int)
}
func createReadWriter(app conf.App, connMap *map[int64]*QuicConn, index int) func() (io.ReadWriter, func()) {
fmt.Println("flow s.index.:", index)
f := func() (io.ReadWriter, func()) {
if app.Name != GlobalApp {
index = 0
......@@ -159,6 +160,7 @@ func createReadWriter(app conf.App, connMap *map[int64]*QuicConn, index int) fun
if conn == nil {
return nil, func() {}
} else if len(conn.Stream) > index && conn.Stream[index] != nil {
conn.Ready = true
return conn.Stream[index], cancelStream(app, conn, connMap, id)
} else {
if conn.Ready {
......@@ -174,11 +176,11 @@ func createReadWriter(app conf.App, connMap *map[int64]*QuicConn, index int) fun
}
func createWriter(app conf.App, connMap *map[int64]*QuicConn, index int) func() (io.Writer, func()) {
fmt.Println("sink s.index.:", index)
f := func() (io.Writer, func()) {
if app.Name != GlobalApp {
index = 0
}
// if app.Name != GlobalApp {
// index = 0
// }
var conn *QuicConn = nil
var id int64 = 0
......@@ -193,6 +195,7 @@ func createWriter(app conf.App, connMap *map[int64]*QuicConn, index int) func()
if conn == nil {
return nil, func() {}
} else if len(conn.Stream) > index && conn.Stream[index] != nil {
conn.Ready = true
return conn.Stream[index], cancelStream(app, conn, connMap, id)
} else {
if conn.Ready {
......
......@@ -162,7 +162,11 @@ func (c *client) reTry() {
// source
func (c *client) Write(b []byte) (int, error) {
return c.stream.Write(b)
if c.stream != nil {
return c.stream.Write(b)
} else {
return 0, errors.New("not found stream")
}
}
// flow || sink
......@@ -177,7 +181,6 @@ func (c *client) Pipe(f func(rxstream rx.RxStream) rx.RxStream) {
panic(customer.E)
} else if customer.V != nil {
index := rand.Intn(len(c.writers))
loop:
for i, w := range c.writers {
if index == i {
......@@ -186,6 +189,8 @@ func (c *client) Pipe(f func(rxstream rx.RxStream) rx.RxStream) {
index = rand.Intn(len(c.writers))
break loop
}
} else {
w.Write([]byte{0})
}
}
......@@ -195,8 +200,6 @@ func (c *client) Pipe(f func(rxstream rx.RxStream) rx.RxStream) {
func (c *client) Close() {
c.session.Close()
close(c.accepted)
close(c.heartbeat)
c.writers = make([]io.Writer, 0)
c.accepted = make(chan int)
c.heartbeat = make(chan byte)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册