未验证 提交 68311b20 编写于 作者: J JJW 提交者: GitHub

doc(use-case): add training model examples

* doc(trainingmodel): add readme_cn

* modify flow and add README.md
Co-authored-by: Njjwygjj <wangyaoguang@cel.la>
Co-authored-by: NC.C <fanweixiao@gmail.com>
上级 7ae59c2f
<p align="center">
<img width="200px" height="200px" src="https://yomo.run/yomo-logo.png" />
</p>
# Use Case:Combined calculation of multiple data sources
## Our customer's asked:
Our client needs to complete a calculation when there are multiple data sources generating data at high frequencies: a calculation task is performed only when all the data from all the data sources have arrived, then send computed result to the next processing session, otherwise, keeps waiting data.
Usually, our business logic code intrudes on the collection of heterogeneous data from multiple sources, multi-threading, concurrency and computation caching, which prevents us from concentrating on abstracting and describing the abstraction:
```go
var convert = func(v []byte) (interface{}, error) {
return y3.ToFloat32(v)
}
var zipper = func(_ context.Context, ia interface{}, ib interface{}) (interface{}, error) {
result := ia.(float32) + ib.(float32)
return fmt.Sprintf("⚡️ Sum(%s: %f, %s: %f) => Result: %f", "data A", ia.(float32), "data B", ib.(float32), result), nil
}
// Handler handle two event streams and calculate sum when data arrived
func Handler(rxstream rx.RxStream) rx.RxStream {
streamA := rxstream.Subscribe(0x11).OnObserve(convert)
streamB := rxstream.Subscribe(0x12).OnObserve(convert)
// Rx Zip operator: http://reactivex.io/documentation/operators/zip.html
stream := streamA.ZipFromIterable(streamB, zipper).StdOut().Encode(0x13)
return stream
}
```
## Code structure
+ `source-data-a`: Analog data source A, sending random Float32 numbers. [yomo.run/source](https://yomo.run/source)
+ `source-data-b`: Analog data source B, sending random Float32 numbers. [yomo.run/source](https://yomo.run/source)
+ `flow`: Combine simulated data source A and simulated data source B for calculation[yomo.run/flow](https://yomo.run/flow)
+ `zipper`: Setup a workflow that receives multiple sources and completes the merge calculation [yomo.run/zipper](https://yomo.run/zipper)
## Implementation
### 1. Install CLI
> **Note:** YoMo requires Go 1.15 and above, run `go version` to get the version of Go in your environment, please follow [this link](https://golang.org/doc/install) to install or upgrade if it doesn't fit the requirement.
```bash
# Ensure use $GOPATH, golang requires main and plugin highly coupled
echo $GOPATH
```
if `$GOPATH` is not set, check [Set $GOPATH and $GOBIN](#optional-set-gopath-and-gobin) first.
```bash
$ GO111MODULE=off go get github.com/yomorun/yomo
$ cd $GOPATH/src/github.com/yomorun/yomo
$ make install
```
![YoMo Tutorial 1](https://yomo.run/tutorial-1.png)
### 2. Start `flow` for streaming calculating
```bash
$ cd $GOPATH/src/github.com/yomorun/yomo/example/trainingmodel/flow
$ yomo run
2021/03/01 19:01:48 Building the Serverless Function File...
2021/03/01 19:01:49 ✅ Listening on 0.0.0.0:4242
```
### 3. Start `zipper` to orgnize stream processing workflow
```bash
$ cd $GOPATH/src/github.com/yomorun/yomo/example/trainingmodel/zipper
$ yomo wf run
2021/03/01 19:05:55 Found 1 flows in zipper config
2021/03/01 19:05:55 Flow 1: training on localhost:4242
2021/03/01 19:05:55 Found 0 sinks in zipper config
2021/03/01 19:05:55 Running YoMo workflow...
2021/03/01 19:05:55 ✅ Listening on 0.0.0.0:9999
```
### 3. Run `source-data-a`
```bash
$ cd $GOPATH/src/github.com/yomorun/yomo/example/trainingmodel/source-data-a
$ go run main.go
2021/03/01 17:35:04 ✅ Connected to yomo-zipper localhost:9999
2021/03/01 17:35:05 ✅ Emit 123.41881 to yomo-zipper
```
### 4. Run `source-data-b`
```bash
$ cd $GOPATH/src/github.com/yomorun/yomo/example/trainingmodel/source-data-b
$ go run main.go
2021/03/01 17:35:04 ✅ Connected to yomo-zipper localhost:9999
2021/03/01 17:35:05 ✅ Emit 123.41881 to yomo-zipper
```
### 5. `flow` will have a constant flow of output
```bash
[StdOut]: ⚡️ Sum(data A: 89.820206, data B: 1651.740967) => Result: 1741.561157
[StdOut]: ⚡️ Sum(data A: 17.577374, data B: 619.293457) => Result: 636.870850
[StdOut]: ⚡️ Sum(data A: 114.736366, data B: 964.614075) => Result: 1079.350464
```
At this point, try to keep `Ctrl-C` dropping `source-data-a`, start it again after a while and see what happens to the `flow` output
### 6. Congratulations!
The problem has been solved in a simpler way than ever before!
Find [More YoMo Use Cases](https://github.com/yomorun/yomo)
<p align="center">
<img width="200px" height="200px" src="https://yomo.run/yomo-logo.png" />
</p>
# YoMo应用案例:多数据源的合并计算
## 目标
当有多个高频产生数据的数据源时,我们的客户需要完成一种计算:当所有数据源的数据都到齐后,才进行一次计算任务,并将计算结果传递给下一个处理环节,否则,就一直等待。通常,我们的业务逻辑代码会侵入对多源异构数据的采集、多线程、并发和计算缓存等问题,致使我们不能专心在对业务逻辑的抽象和描述上,而借助YoMo,一切都变得简单起来,您所需要实现的,只有如下几行代码:
```go
var convert = func(v []byte) (interface{}, error) {
return y3.ToFloat32(v)
}
var zipper = func(_ context.Context, ia interface{}, ib interface{}) (interface{}, error) {
result := ia.(float32) + ib.(float32)
return fmt.Sprintf("⚡️ Sum(%s: %f, %s: %f) => Result: %f", "data A", ia.(float32), "data B", ib.(float32), result), nil
}
// Handler handle two event streams and calculate sum when data arrived
func Handler(rxstream rx.RxStream) rx.RxStream {
streamA := rxstream.Subscribe(0x11).OnObserve(convert)
streamB := rxstream.Subscribe(0x12).OnObserve(convert)
// Rx Zip operator: http://reactivex.io/documentation/operators/zip.html
stream := streamA.ZipFromIterable(streamB, zipper).StdOut().Encode(0x13)
return stream
}
```
## 代码结构
+ `source-data-a`: 模拟数据源A,发送随机 Float32 数字. [yomo.run/source](https://yomo.run/source)
+ `source-data-b`: 模拟数据源B,发送随机 Float32 数字. [yomo.run/source](https://yomo.run/source)
+ `flow`: 将模拟数据源A和模拟数据源B进行合并计算[yomo.run/flow](https://yomo.run/flow)
+ `zipper`: 设计一个workflow,接收多个source,并完成合并计算 [yomo.run/zipper](https://yomo.run/zipper)
## 实现过程
### 1. 安装CLI
> **注意:** YoMo 的运行环境要求 Go 版本为 1.15 或以上,运行 `go version` 获取当前环境的版本,如果未安装 Go 或者不符合 Go 版本要求时,请安装或者升级 Go 版本。
安装 Go 环境之后,国内用户可参考 <https://goproxy.cn/> 设置 `GOPROXY`,以便下载 YoMo 项目依赖。
```bash
# 确保设置了$GOPATH, Golang的设计里main和plugin是高度耦合的
$ echo $GOPATH
```
如果没有设置`$GOPATH`,参考这里:[如何设置$GOPATH和$GOBIN](#optional-set-gopath-and-gobin)
```bash
$ GO111MODULE=off go get github.com/yomorun/yomo
$ cd $GOPATH/src/github.com/yomorun/yomo
$ make install
```
![YoMo Tutorial 1](https://yomo.run/tutorial-1.png)
### 2. 运行 `flow`
```bash
$ cd $GOPATH/src/github.com/yomorun/yomo/example/trainingmodel/flow
$ yomo run
2021/03/01 19:01:48 Building the Serverless Function File...
2021/03/01 19:01:49 ✅ Listening on 0.0.0.0:4242
```
### 3. 运行 `zipper`
```bash
$ cd $GOPATH/src/github.com/yomorun/yomo/example/trainingmodel/zipper
$ yomo wf run
2021/03/01 19:05:55 Found 1 flows in zipper config
2021/03/01 19:05:55 Flow 1: training on localhost:4242
2021/03/01 19:05:55 Found 0 sinks in zipper config
2021/03/01 19:05:55 Running YoMo workflow...
2021/03/01 19:05:55 ✅ Listening on 0.0.0.0:9999
```
### 3. 运行 `source-data-a`
```bash
$ cd $GOPATH/src/github.com/yomorun/yomo/example/trainingmodel/source-data-a
$ go run main.go
2021/03/01 17:35:04 ✅ Connected to yomo-zipper localhost:9999
2021/03/01 17:35:05 ✅ Emit 123.41881 to yomo-zipper
```
### 4. 运行 `source-data-b`
```bash
$ cd $GOPATH/src/github.com/yomorun/yomo/example/trainingmodel/source-data-b
$ go run main.go
2021/03/01 17:35:04 ✅ Connected to yomo-zipper localhost:9999
2021/03/01 17:35:05 ✅ Emit 123.41881 to yomo-zipper
```
### 5. 观察 `flow` 窗口会有持续不断的数据
```bash
[StdOut]: ⚡️ Sum(data A: 89.820206, data B: 1651.740967) => Result: 1741.561157
[StdOut]: ⚡️ Sum(data A: 17.577374, data B: 619.293457) => Result: 636.870850
[StdOut]: ⚡️ Sum(data A: 114.736366, data B: 964.614075) => Result: 1079.350464
```
这时候,尝试不断的`Ctrl-C``source-data-a`,过一会再启动它,看看`flow`的窗口会有什么变化
### 6. 恭喜您!问题以前所未有的简单的方式解决啦!🚀
更多[使用案例](https://github.com/yomorun/yomo)
......@@ -4,38 +4,25 @@ import (
"context"
"fmt"
y3 "github.com/yomorun/y3-codec-golang"
"github.com/yomorun/y3-codec-golang"
"github.com/yomorun/yomo/pkg/rx"
)
const DataAKey = 0x11
const DataBKey = 0x12
var callback = func(v []byte) (interface{}, error) {
var convert = func(v []byte) (interface{}, error) {
return y3.ToFloat32(v)
}
var printera = func(_ context.Context, i interface{}) (interface{}, error) {
value := i.(float32)
fmt.Println(fmt.Sprintf("[%s]> value: %f", "data-a", value))
return i, nil
}
var printerb = func(_ context.Context, i interface{}) (interface{}, error) {
value := i.(float32)
fmt.Println(fmt.Sprintf("[%s]> value: %f", "data-b", value))
return i, nil
}
var zipper = func(_ context.Context, ia interface{}, ib interface{}) (interface{}, error) {
return fmt.Sprintf("⚡️ Zip [%s],[%s] -> Value: %f, %f", "dataA", "dataB", ia.(float32), ib.(float32)), nil
result := ia.(float32) + ib.(float32)
return fmt.Sprintf("⚡️ Sum(%s: %f, %s: %f) => Result: %f", "data A", ia.(float32), "data B", ib.(float32), result), nil
}
// Handler handle two event streams and calculate sum when data arrived
func Handler(rxstream rx.RxStream) rx.RxStream {
streamA := rxstream.Subscribe(DataAKey).OnObserve(callback).Map(printera)
streamB := rxstream.Subscribe(DataBKey).OnObserve(callback).Map(printerb)
streamA := rxstream.Subscribe(0x11).OnObserve(convert)
streamB := rxstream.Subscribe(0x12).OnObserve(convert)
stream := streamA.ZipFromIterable(streamB, zipper).StdOut().Encode(0x10)
// Rx Zip operator: http://reactivex.io/documentation/operators/zip.html
stream := streamA.ZipFromIterable(streamB, zipper).StdOut().Encode(0x13)
return stream
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册