未验证 提交 2e358eeb 编写于 作者: weixin_53053927's avatar weixin_53053927 提交者: GitHub

feat(rx): Add SlidingWindow (#149)

上级 46d38d40
......@@ -10,9 +10,18 @@ import (
"github.com/yomorun/yomo/pkg/rx"
)
// NoiseDataKey represents the Tag of a Y3 encoded data packet
// NoiseDataKey represents the Tag of a Y3 encoded data packet.
const NoiseDataKey = 0x10
// ThresholdSingleValue is the threshold of a single value.
const ThresholdSingleValue = 60
// ThresholdAverageValue is the threshold of the average value after a sliding window.
const ThresholdAverageValue = 55
// SlidingWindowSeconds is the time in seconds of the sliding window.
const SlidingWindowSeconds = 30
// NoiseData represents the structure of data
type NoiseData struct {
Noise float32 `y3:"0x11"`
......@@ -24,6 +33,11 @@ var printer = func(_ context.Context, i interface{}) (interface{}, error) {
value := i.(NoiseData)
rightNow := time.Now().UnixNano() / int64(time.Millisecond)
fmt.Println(fmt.Sprintf("[%s] %d > value: %f ⚡️=%dms", value.From, value.Time, value.Noise, rightNow-value.Time))
if value.Noise >= ThresholdSingleValue {
fmt.Println(fmt.Sprintf("❗ value: %f reaches the threshold %d!", value.Noise, ThresholdSingleValue))
}
return value.Noise, nil
}
......@@ -37,13 +51,29 @@ var callback = func(v []byte) (interface{}, error) {
return mold, nil
}
var slidingWindowHandler = func(i interface{}) error {
values, ok := i.([]interface{})
if ok {
var total float32 = 0
for _, value := range values {
total += value.(float32)
}
avg := total / float32(len(values))
if avg >= ThresholdAverageValue {
fmt.Println(fmt.Sprintf("❗ average value in last %d seconds: %f reaches the threshold %d!", SlidingWindowSeconds, avg, ThresholdAverageValue))
}
}
return nil
}
// Handler will handle data in Rx way
func Handler(rxstream rx.RxStream) rx.RxStream {
stream := rxstream.
Subscribe(NoiseDataKey).
OnObserve(callback).
Debounce(rxgo.WithDuration(50 * time.Millisecond)).
Debounce(rxgo.WithDuration(50*time.Millisecond)).
Map(printer).
SlidingWindowWithTime(SlidingWindowSeconds*time.Second, 1*time.Second, slidingWindowHandler).
StdOut().
Encode(0x11)
......
......@@ -92,4 +92,12 @@ type RxStream interface {
WindowWithTime(timespan rxgo.Duration, opts ...rxgo.Option) RxStream
WindowWithTimeOrCount(timespan rxgo.Duration, count int, opts ...rxgo.Option) RxStream
ZipFromIterable(iterable rxgo.Iterable, zipper rxgo.Func2, opts ...rxgo.Option) RxStream
// SlidingWindowWithCount buffers the data in the specified sliding window size, the buffered data can be processed in the handler func.
// It returns the orginal data to RxStream, not the buffered slice.
SlidingWindowWithCount(windowSize int, slideSize int, handler Handler, opts ...rxgo.Option) RxStream
// SlidingWindowWithTime buffers the data in the specified sliding window time, the buffered data can be processed in the handler func.
// It returns the orginal data to RxStream, not the buffered slice.
SlidingWindowWithTime(windowTimespan time.Duration, slideTimespan time.Duration, handler Handler, opts ...rxgo.Option) RxStream
}
......@@ -3,8 +3,10 @@ package rx
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"sync"
"time"
"github.com/cenkalti/backoff/v4"
......@@ -691,6 +693,173 @@ func (s *RxStreamImpl) Encode(key byte, opts ...rxgo.Option) RxStream {
return CreateObservable(f, opts...)
}
// SlidingWindowWithCount buffers the data in the specified sliding window size, the buffered data can be processed in the handler func.
// It returns the orginal data to RxStream, not the buffered slice.
func (s *RxStreamImpl) SlidingWindowWithCount(windowSize int, slideSize int, handler Handler, opts ...rxgo.Option) RxStream {
if windowSize <= 0 {
return s.thrown(errors.New("windowSize must be positive"))
}
if slideSize <= 0 {
return s.thrown(errors.New("slideSize must be positive"))
}
f := func(ctx context.Context, next chan rxgo.Item) {
defer close(next)
observe := s.Observe()
windowCount := 0
currentSlideCount := 0
buf := make([]interface{}, windowSize)
firstTimeSend := true
mutex := sync.Mutex{}
for {
select {
case <-ctx.Done():
return
case item, ok := <-observe:
if !ok {
return
}
if item.Error() {
return
}
mutex.Lock()
if windowCount < windowSize {
buf[windowCount] = item.V
windowCount++
}
if windowCount == windowSize {
// start sliding
currentSlideCount++
// append slide item to buffer
if !firstTimeSend {
buf = append(buf[1:windowSize], item.V)
}
// reach slide size
if currentSlideCount%slideSize == 0 {
err := handler(buf)
firstTimeSend = false
if err != nil {
rxgo.Error(err).SendContext(ctx, next)
return
}
}
}
mutex.Unlock()
// immediately send the original item to downstream
Of(item.V).SendContext(ctx, next)
}
}
}
return CreateObservable(f, opts...)
}
// SlidingWindowWithTime buffers the data in the specified sliding window time, the buffered data can be processed in the handler func.
// It returns the orginal data to RxStream, not the buffered slice.
func (s *RxStreamImpl) SlidingWindowWithTime(windowTimespan time.Duration, slideTimespan time.Duration, handler Handler, opts ...rxgo.Option) RxStream {
f := func(ctx context.Context, next chan rxgo.Item) {
observe := s.Observe()
buf := make([]slidingWithTimeItem, 0)
stop := make(chan struct{})
firstTimeSend := true
mutex := sync.Mutex{}
checkBuffer := func() {
mutex.Lock()
// filter items by item
updatedBuf := make([]slidingWithTimeItem, 0)
availableItems := make([]interface{}, 0)
t := time.Now().Add(-windowTimespan)
for _, item := range buf {
if item.timestamp.After(t) || item.timestamp.Equal(t) {
updatedBuf = append(updatedBuf, item)
availableItems = append(availableItems, item.data)
}
}
buf = updatedBuf
// apply and send items
if len(availableItems) != 0 {
err := handler(availableItems)
if err != nil {
rxgo.Error(err).SendContext(ctx, next)
return
}
}
firstTimeSend = false
mutex.Unlock()
}
go func() {
defer close(next)
for {
select {
case <-stop:
checkBuffer()
return
case <-ctx.Done():
return
case <-time.After(windowTimespan):
if firstTimeSend {
checkBuffer()
}
case <-time.After(slideTimespan):
checkBuffer()
}
}
}()
for {
select {
case <-ctx.Done():
close(stop)
return
case item, ok := <-observe:
if !ok {
close(stop)
return
}
if item.Error() {
item.SendContext(ctx, next)
close(stop)
return
} else {
mutex.Lock()
// buffer data
buf = append(buf, slidingWithTimeItem{
timestamp: time.Now(),
data: item.V,
})
mutex.Unlock()
}
// immediately send the original item to downstream
Of(item.V).SendContext(ctx, next)
}
}
}
return CreateObservable(f, opts...)
}
type slidingWithTimeItem struct {
timestamp time.Time
data interface{}
}
// Handler defines a function that handle the input value.
type Handler func(interface{}) error
func (s *RxStreamImpl) thrown(err error) RxStream {
next := make(chan rxgo.Item, 1)
next <- rxgo.Error(err)
defer close(next)
return &RxStreamImpl{observable: rxgo.FromChannel(next)}
}
func CreateObservable(f func(ctx context.Context, next chan rxgo.Item), opts ...rxgo.Option) RxStream {
next := make(chan rxgo.Item)
ctx := context.Background()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册