diff --git a/internal/cmd/init.go b/internal/cmd/init.go index b41b91e053ae5b552e02349192ad0fd57e3e8f1c..d5efdc9bf67c9d82227cddfe764baa83849cff6d 100644 --- a/internal/cmd/init.go +++ b/internal/cmd/init.go @@ -109,9 +109,9 @@ import ( ) type NoiseData struct { - Noise float32 ` + "`yomo:\"0x11\"`" +` - Time int64 ` + "`yomo:\"0x12\"`" +` - From string ` + "`yomo:\"0x13\"`" +` + Noise float32 ` + "`yomo:\"0x11\"`" + ` + Time int64 ` + "`yomo:\"0x12\"`" + ` + From string ` + "`yomo:\"0x13\"`" + ` } var printer = func(_ context.Context, i interface{}) (interface{}, error) { diff --git a/internal/cmd/wf/dev.go b/internal/cmd/wf/dev.go index 03f03b73e548e06a7a83f0d4473c23457629e85e..0abf2f98aea9d2b5e51c7a5fc12b0f4cfc20d26f 100644 --- a/internal/cmd/wf/dev.go +++ b/internal/cmd/wf/dev.go @@ -32,6 +32,7 @@ func NewCmdDev() *cobra.Command { log.Print("❌ ", err) return } + printZipperConf(conf) log.Print("Running YoMo workflow...") endpoint := fmt.Sprintf("0.0.0.0:%d", conf.Port) diff --git a/internal/cmd/wf/run.go b/internal/cmd/wf/run.go index 9ed90dd4306d8ed8dee68cd5796c8ffa4d5a51ab..12252b9a144a040b693b52cf1b0926fa72efe0c6 100644 --- a/internal/cmd/wf/run.go +++ b/internal/cmd/wf/run.go @@ -31,6 +31,7 @@ func NewCmdRun() *cobra.Command { log.Print("❌ ", err) return } + printZipperConf(conf) quicHandler := &quicHandler{ serverlessConfig: conf, diff --git a/internal/cmd/wf/utils.go b/internal/cmd/wf/utils.go index 5df351bcfcb8ed4a397c5394e457452c91c5b38f..af06694db3d41e98cdb01c0139d09ecc9a2052eb 100644 --- a/internal/cmd/wf/utils.go +++ b/internal/cmd/wf/utils.go @@ -2,6 +2,7 @@ package wf import ( "errors" + "log" "strings" "github.com/yomorun/yomo/internal/conf" @@ -33,6 +34,7 @@ func parseConfig(opts *baseOptions, args []string) (*conf.WorkflowConfig, error) return nil, errors.New("Parse the workflow config failure with the error: " + err.Error()) } + // validate err = validateConfig(wfConf) if err != nil { return nil, err @@ -46,14 +48,48 @@ func validateConfig(wfConf *conf.WorkflowConfig) error { return errors.New("conf is nil") } + if len(wfConf.Flows) == 0 && len(wfConf.Sinks) == 0 { + return errors.New("At least one flow or sink is required") + } + + m := map[string][]conf.App{ + "Flows": wfConf.Flows, + "Sinks": wfConf.Sinks, + } + + missingParams := []string{} + for k, apps := range m { + for _, app := range apps { + if app.Name == "" || app.Host == "" || app.Port <= 0 { + missingParams = append(missingParams, k) + } + } + } + errMsg := "" if wfConf.Name == "" || wfConf.Host == "" || wfConf.Port <= 0 { errMsg = "Missing name, host or port in workflow config. " } + if len(missingParams) > 0 { + errMsg += "Missing name, host or port in " + strings.Join(missingParams, ", "+". ") + } + if errMsg != "" { return errors.New(errMsg) } return nil } + +func printZipperConf(wfConf *conf.WorkflowConfig) { + log.Printf("Found %d flows in zipper config", len(wfConf.Flows)) + for i, flow := range wfConf.Flows { + log.Printf("Flow %d: %s on %s:%d", i+1, flow.Name, flow.Host, flow.Port) + } + + log.Printf("Found %d sinks in zipper config", len(wfConf.Sinks)) + for i, sink := range wfConf.Sinks { + log.Printf("Sink %d: %s on %s:%d", i+1, sink.Name, sink.Host, sink.Port) + } +}