From 68311b20a784a1ec27977edcbf551a82e6c3b151 Mon Sep 17 00:00:00 2001 From: JJW <382689420@qq.com> Date: Mon, 1 Mar 2021 23:46:25 +0800 Subject: [PATCH] doc(use-case): add training model examples * doc(trainingmodel): add readme_cn * modify flow and add README.md Co-authored-by: jjwygjj Co-authored-by: C.C --- example/trainingmodel/README.md | 131 +++++++++++++++++++++++++++++ example/trainingmodel/README_CN.md | 128 ++++++++++++++++++++++++++++ example/trainingmodel/flow/app.go | 31 ++----- 3 files changed, 268 insertions(+), 22 deletions(-) create mode 100644 example/trainingmodel/README.md create mode 100644 example/trainingmodel/README_CN.md diff --git a/example/trainingmodel/README.md b/example/trainingmodel/README.md new file mode 100644 index 0000000..06eb196 --- /dev/null +++ b/example/trainingmodel/README.md @@ -0,0 +1,131 @@ +

+ +

+ +# Use Case:Combined calculation of multiple data sources + +## Our customer's asked: + +Our client needs to complete a calculation when there are multiple data sources generating data at high frequencies: a calculation task is performed only when all the data from all the data sources have arrived, then send computed result to the next processing session, otherwise, keeps waiting data. + +Usually, our business logic code intrudes on the collection of heterogeneous data from multiple sources, multi-threading, concurrency and computation caching, which prevents us from concentrating on abstracting and describing the abstraction: + +```go +var convert = func(v []byte) (interface{}, error) { + return y3.ToFloat32(v) +} + +var zipper = func(_ context.Context, ia interface{}, ib interface{}) (interface{}, error) { + result := ia.(float32) + ib.(float32) + return fmt.Sprintf("⚡️ Sum(%s: %f, %s: %f) => Result: %f", "data A", ia.(float32), "data B", ib.(float32), result), nil +} + +// Handler handle two event streams and calculate sum when data arrived +func Handler(rxstream rx.RxStream) rx.RxStream { + streamA := rxstream.Subscribe(0x11).OnObserve(convert) + streamB := rxstream.Subscribe(0x12).OnObserve(convert) + + // Rx Zip operator: http://reactivex.io/documentation/operators/zip.html + stream := streamA.ZipFromIterable(streamB, zipper).StdOut().Encode(0x13) + return stream +} + +``` + +## Code structure + ++ `source-data-a`: Analog data source A, sending random Float32 numbers. [yomo.run/source](https://yomo.run/source) ++ `source-data-b`: Analog data source B, sending random Float32 numbers. [yomo.run/source](https://yomo.run/source) ++ `flow`: Combine simulated data source A and simulated data source B for calculation[yomo.run/flow](https://yomo.run/flow) ++ `zipper`: Setup a workflow that receives multiple sources and completes the merge calculation [yomo.run/zipper](https://yomo.run/zipper) + +## Implementation + +### 1. Install CLI + +> **Note:** YoMo requires Go 1.15 and above, run `go version` to get the version of Go in your environment, please follow [this link](https://golang.org/doc/install) to install or upgrade if it doesn't fit the requirement. + +```bash +# Ensure use $GOPATH, golang requires main and plugin highly coupled +○ echo $GOPATH + +``` + +if `$GOPATH` is not set, check [Set $GOPATH and $GOBIN](#optional-set-gopath-and-gobin) first. + +```bash +$ GO111MODULE=off go get github.com/yomorun/yomo + +$ cd $GOPATH/src/github.com/yomorun/yomo + +$ make install +``` + +![YoMo Tutorial 1](https://yomo.run/tutorial-1.png) + +### 2. Start `flow` for streaming calculating + +```bash +$ cd $GOPATH/src/github.com/yomorun/yomo/example/trainingmodel/flow + +$ yomo run + +2021/03/01 19:01:48 Building the Serverless Function File... +2021/03/01 19:01:49 ✅ Listening on 0.0.0.0:4242 + +``` + +### 3. Start `zipper` to orgnize stream processing workflow + +```bash +$ cd $GOPATH/src/github.com/yomorun/yomo/example/trainingmodel/zipper + +$ yomo wf run + +2021/03/01 19:05:55 Found 1 flows in zipper config +2021/03/01 19:05:55 Flow 1: training on localhost:4242 +2021/03/01 19:05:55 Found 0 sinks in zipper config +2021/03/01 19:05:55 Running YoMo workflow... +2021/03/01 19:05:55 ✅ Listening on 0.0.0.0:9999 + +``` + +### 3. Run `source-data-a` + +```bash +$ cd $GOPATH/src/github.com/yomorun/yomo/example/trainingmodel/source-data-a + +$ go run main.go + +2021/03/01 17:35:04 ✅ Connected to yomo-zipper localhost:9999 +2021/03/01 17:35:05 ✅ Emit 123.41881 to yomo-zipper + +``` + +### 4. Run `source-data-b` + +```bash +$ cd $GOPATH/src/github.com/yomorun/yomo/example/trainingmodel/source-data-b + +$ go run main.go + +2021/03/01 17:35:04 ✅ Connected to yomo-zipper localhost:9999 +2021/03/01 17:35:05 ✅ Emit 123.41881 to yomo-zipper + +``` + +### 5. `flow` will have a constant flow of output + +```bash +[StdOut]: ⚡️ Sum(data A: 89.820206, data B: 1651.740967) => Result: 1741.561157 +[StdOut]: ⚡️ Sum(data A: 17.577374, data B: 619.293457) => Result: 636.870850 +[StdOut]: ⚡️ Sum(data A: 114.736366, data B: 964.614075) => Result: 1079.350464 +``` + +At this point, try to keep `Ctrl-C` dropping `source-data-a`, start it again after a while and see what happens to the `flow` output + +### 6. Congratulations! + +The problem has been solved in a simpler way than ever before! + +Find [More YoMo Use Cases](https://github.com/yomorun/yomo) diff --git a/example/trainingmodel/README_CN.md b/example/trainingmodel/README_CN.md new file mode 100644 index 0000000..a2a6ca0 --- /dev/null +++ b/example/trainingmodel/README_CN.md @@ -0,0 +1,128 @@ +

+ +

+ +# YoMo应用案例:多数据源的合并计算 + +## 目标 + +当有多个高频产生数据的数据源时,我们的客户需要完成一种计算:当所有数据源的数据都到齐后,才进行一次计算任务,并将计算结果传递给下一个处理环节,否则,就一直等待。通常,我们的业务逻辑代码会侵入对多源异构数据的采集、多线程、并发和计算缓存等问题,致使我们不能专心在对业务逻辑的抽象和描述上,而借助YoMo,一切都变得简单起来,您所需要实现的,只有如下几行代码: + +```go +var convert = func(v []byte) (interface{}, error) { + return y3.ToFloat32(v) +} + +var zipper = func(_ context.Context, ia interface{}, ib interface{}) (interface{}, error) { + result := ia.(float32) + ib.(float32) + return fmt.Sprintf("⚡️ Sum(%s: %f, %s: %f) => Result: %f", "data A", ia.(float32), "data B", ib.(float32), result), nil +} + +// Handler handle two event streams and calculate sum when data arrived +func Handler(rxstream rx.RxStream) rx.RxStream { + streamA := rxstream.Subscribe(0x11).OnObserve(convert) + streamB := rxstream.Subscribe(0x12).OnObserve(convert) + + // Rx Zip operator: http://reactivex.io/documentation/operators/zip.html + stream := streamA.ZipFromIterable(streamB, zipper).StdOut().Encode(0x13) + return stream +} + +``` + +## 代码结构 + ++ `source-data-a`: 模拟数据源A,发送随机 Float32 数字. [yomo.run/source](https://yomo.run/source) ++ `source-data-b`: 模拟数据源B,发送随机 Float32 数字. [yomo.run/source](https://yomo.run/source) ++ `flow`: 将模拟数据源A和模拟数据源B进行合并计算[yomo.run/flow](https://yomo.run/flow) ++ `zipper`: 设计一个workflow,接收多个source,并完成合并计算 [yomo.run/zipper](https://yomo.run/zipper) + +## 实现过程 + +### 1. 安装CLI + +> **注意:** YoMo 的运行环境要求 Go 版本为 1.15 或以上,运行 `go version` 获取当前环境的版本,如果未安装 Go 或者不符合 Go 版本要求时,请安装或者升级 Go 版本。 +安装 Go 环境之后,国内用户可参考 设置 `GOPROXY`,以便下载 YoMo 项目依赖。 + +```bash +# 确保设置了$GOPATH, Golang的设计里main和plugin是高度耦合的 +$ echo $GOPATH + +``` + +如果没有设置`$GOPATH`,参考这里:[如何设置$GOPATH和$GOBIN](#optional-set-gopath-and-gobin)。 + +```bash +$ GO111MODULE=off go get github.com/yomorun/yomo + +$ cd $GOPATH/src/github.com/yomorun/yomo + +$ make install +``` + +![YoMo Tutorial 1](https://yomo.run/tutorial-1.png) + +### 2. 运行 `flow` + +```bash +$ cd $GOPATH/src/github.com/yomorun/yomo/example/trainingmodel/flow + +$ yomo run + +2021/03/01 19:01:48 Building the Serverless Function File... +2021/03/01 19:01:49 ✅ Listening on 0.0.0.0:4242 + +``` + +### 3. 运行 `zipper` + +```bash +$ cd $GOPATH/src/github.com/yomorun/yomo/example/trainingmodel/zipper + +$ yomo wf run + +2021/03/01 19:05:55 Found 1 flows in zipper config +2021/03/01 19:05:55 Flow 1: training on localhost:4242 +2021/03/01 19:05:55 Found 0 sinks in zipper config +2021/03/01 19:05:55 Running YoMo workflow... +2021/03/01 19:05:55 ✅ Listening on 0.0.0.0:9999 + +``` + +### 3. 运行 `source-data-a` + +```bash +$ cd $GOPATH/src/github.com/yomorun/yomo/example/trainingmodel/source-data-a + +$ go run main.go + +2021/03/01 17:35:04 ✅ Connected to yomo-zipper localhost:9999 +2021/03/01 17:35:05 ✅ Emit 123.41881 to yomo-zipper + +``` + +### 4. 运行 `source-data-b` + +```bash +$ cd $GOPATH/src/github.com/yomorun/yomo/example/trainingmodel/source-data-b + +$ go run main.go + +2021/03/01 17:35:04 ✅ Connected to yomo-zipper localhost:9999 +2021/03/01 17:35:05 ✅ Emit 123.41881 to yomo-zipper + +``` + +### 5. 观察 `flow` 窗口会有持续不断的数据 + +```bash +[StdOut]: ⚡️ Sum(data A: 89.820206, data B: 1651.740967) => Result: 1741.561157 +[StdOut]: ⚡️ Sum(data A: 17.577374, data B: 619.293457) => Result: 636.870850 +[StdOut]: ⚡️ Sum(data A: 114.736366, data B: 964.614075) => Result: 1079.350464 +``` + +这时候,尝试不断的`Ctrl-C`掉`source-data-a`,过一会再启动它,看看`flow`的窗口会有什么变化 + +### 6. 恭喜您!问题以前所未有的简单的方式解决啦!🚀 + +更多[使用案例](https://github.com/yomorun/yomo) diff --git a/example/trainingmodel/flow/app.go b/example/trainingmodel/flow/app.go index 332e977..46a459c 100644 --- a/example/trainingmodel/flow/app.go +++ b/example/trainingmodel/flow/app.go @@ -4,38 +4,25 @@ import ( "context" "fmt" - y3 "github.com/yomorun/y3-codec-golang" + "github.com/yomorun/y3-codec-golang" "github.com/yomorun/yomo/pkg/rx" ) -const DataAKey = 0x11 -const DataBKey = 0x12 - -var callback = func(v []byte) (interface{}, error) { +var convert = func(v []byte) (interface{}, error) { return y3.ToFloat32(v) } -var printera = func(_ context.Context, i interface{}) (interface{}, error) { - value := i.(float32) - fmt.Println(fmt.Sprintf("[%s]> value: %f", "data-a", value)) - return i, nil -} - -var printerb = func(_ context.Context, i interface{}) (interface{}, error) { - value := i.(float32) - fmt.Println(fmt.Sprintf("[%s]> value: %f", "data-b", value)) - return i, nil -} - var zipper = func(_ context.Context, ia interface{}, ib interface{}) (interface{}, error) { - - return fmt.Sprintf("⚡️ Zip [%s],[%s] -> Value: %f, %f", "dataA", "dataB", ia.(float32), ib.(float32)), nil + result := ia.(float32) + ib.(float32) + return fmt.Sprintf("⚡️ Sum(%s: %f, %s: %f) => Result: %f", "data A", ia.(float32), "data B", ib.(float32), result), nil } +// Handler handle two event streams and calculate sum when data arrived func Handler(rxstream rx.RxStream) rx.RxStream { - streamA := rxstream.Subscribe(DataAKey).OnObserve(callback).Map(printera) - streamB := rxstream.Subscribe(DataBKey).OnObserve(callback).Map(printerb) + streamA := rxstream.Subscribe(0x11).OnObserve(convert) + streamB := rxstream.Subscribe(0x12).OnObserve(convert) - stream := streamA.ZipFromIterable(streamB, zipper).StdOut().Encode(0x10) + // Rx Zip operator: http://reactivex.io/documentation/operators/zip.html + stream := streamA.ZipFromIterable(streamB, zipper).StdOut().Encode(0x13) return stream } -- GitLab