diff --git a/example/flow/app.go b/example/flow/app.go index c038379dc38a564be58b5ae3daae47381dd7b845..16f591ba4df18d24e32e24c798aa1811c9472fc7 100644 --- a/example/flow/app.go +++ b/example/flow/app.go @@ -10,9 +10,18 @@ import ( "github.com/yomorun/yomo/pkg/rx" ) -// NoiseDataKey represents the Tag of a Y3 encoded data packet +// NoiseDataKey represents the Tag of a Y3 encoded data packet. const NoiseDataKey = 0x10 +// ThresholdSingleValue is the threshold of a single value. +const ThresholdSingleValue = 60 + +// ThresholdAverageValue is the threshold of the average value after a sliding window. +const ThresholdAverageValue = 55 + +// SlidingWindowSeconds is the time in seconds of the sliding window. +const SlidingWindowSeconds = 30 + // NoiseData represents the structure of data type NoiseData struct { Noise float32 `y3:"0x11"` @@ -24,6 +33,11 @@ var printer = func(_ context.Context, i interface{}) (interface{}, error) { value := i.(NoiseData) rightNow := time.Now().UnixNano() / int64(time.Millisecond) fmt.Println(fmt.Sprintf("[%s] %d > value: %f ⚡️=%dms", value.From, value.Time, value.Noise, rightNow-value.Time)) + + if value.Noise >= ThresholdSingleValue { + fmt.Println(fmt.Sprintf("❗ value: %f reaches the threshold %d!", value.Noise, ThresholdSingleValue)) + } + return value.Noise, nil } @@ -37,13 +51,29 @@ var callback = func(v []byte) (interface{}, error) { return mold, nil } +var slidingWindowHandler = func(i interface{}) error { + values, ok := i.([]interface{}) + if ok { + var total float32 = 0 + for _, value := range values { + total += value.(float32) + } + avg := total / float32(len(values)) + if avg >= ThresholdAverageValue { + fmt.Println(fmt.Sprintf("❗ average value in last %d seconds: %f reaches the threshold %d!", SlidingWindowSeconds, avg, ThresholdAverageValue)) + } + } + return nil +} + // Handler will handle data in Rx way func Handler(rxstream rx.RxStream) rx.RxStream { stream := rxstream. Subscribe(NoiseDataKey). OnObserve(callback). - Debounce(rxgo.WithDuration(50 * time.Millisecond)). + Debounce(rxgo.WithDuration(50*time.Millisecond)). Map(printer). + SlidingWindowWithTime(SlidingWindowSeconds*time.Second, 1*time.Second, slidingWindowHandler). StdOut(). Encode(0x11) diff --git a/pkg/rx/rxstream.go b/pkg/rx/rxstream.go index d672010c5ef96abb6460109eb5f35c78e0aec861..9d9b37c4a3e5fdf891b61111e591f648d986dec0 100644 --- a/pkg/rx/rxstream.go +++ b/pkg/rx/rxstream.go @@ -92,4 +92,12 @@ type RxStream interface { WindowWithTime(timespan rxgo.Duration, opts ...rxgo.Option) RxStream WindowWithTimeOrCount(timespan rxgo.Duration, count int, opts ...rxgo.Option) RxStream ZipFromIterable(iterable rxgo.Iterable, zipper rxgo.Func2, opts ...rxgo.Option) RxStream + + // SlidingWindowWithCount buffers the data in the specified sliding window size, the buffered data can be processed in the handler func. + // It returns the orginal data to RxStream, not the buffered slice. + SlidingWindowWithCount(windowSize int, slideSize int, handler Handler, opts ...rxgo.Option) RxStream + + // SlidingWindowWithTime buffers the data in the specified sliding window time, the buffered data can be processed in the handler func. + // It returns the orginal data to RxStream, not the buffered slice. + SlidingWindowWithTime(windowTimespan time.Duration, slideTimespan time.Duration, handler Handler, opts ...rxgo.Option) RxStream } diff --git a/pkg/rx/rxstream_operator.go b/pkg/rx/rxstream_operator.go index 3b59c71e713adc70cb9f665d43b98d08ba7d4353..4892fa11cc83e05a42fd8b22f54b3b3d229f8118 100644 --- a/pkg/rx/rxstream_operator.go +++ b/pkg/rx/rxstream_operator.go @@ -3,8 +3,10 @@ package rx import ( "bytes" "context" + "errors" "fmt" "io" + "sync" "time" "github.com/cenkalti/backoff/v4" @@ -691,6 +693,173 @@ func (s *RxStreamImpl) Encode(key byte, opts ...rxgo.Option) RxStream { return CreateObservable(f, opts...) } +// SlidingWindowWithCount buffers the data in the specified sliding window size, the buffered data can be processed in the handler func. +// It returns the orginal data to RxStream, not the buffered slice. +func (s *RxStreamImpl) SlidingWindowWithCount(windowSize int, slideSize int, handler Handler, opts ...rxgo.Option) RxStream { + if windowSize <= 0 { + return s.thrown(errors.New("windowSize must be positive")) + } + if slideSize <= 0 { + return s.thrown(errors.New("slideSize must be positive")) + } + + f := func(ctx context.Context, next chan rxgo.Item) { + defer close(next) + observe := s.Observe() + + windowCount := 0 + currentSlideCount := 0 + buf := make([]interface{}, windowSize) + firstTimeSend := true + mutex := sync.Mutex{} + + for { + select { + case <-ctx.Done(): + return + case item, ok := <-observe: + if !ok { + return + } + if item.Error() { + return + } + + mutex.Lock() + if windowCount < windowSize { + buf[windowCount] = item.V + windowCount++ + } + + if windowCount == windowSize { + // start sliding + currentSlideCount++ + + // append slide item to buffer + if !firstTimeSend { + buf = append(buf[1:windowSize], item.V) + } + + // reach slide size + if currentSlideCount%slideSize == 0 { + err := handler(buf) + firstTimeSend = false + if err != nil { + rxgo.Error(err).SendContext(ctx, next) + return + } + } + } + mutex.Unlock() + // immediately send the original item to downstream + Of(item.V).SendContext(ctx, next) + } + } + } + return CreateObservable(f, opts...) +} + +// SlidingWindowWithTime buffers the data in the specified sliding window time, the buffered data can be processed in the handler func. +// It returns the orginal data to RxStream, not the buffered slice. +func (s *RxStreamImpl) SlidingWindowWithTime(windowTimespan time.Duration, slideTimespan time.Duration, handler Handler, opts ...rxgo.Option) RxStream { + f := func(ctx context.Context, next chan rxgo.Item) { + observe := s.Observe() + buf := make([]slidingWithTimeItem, 0) + stop := make(chan struct{}) + firstTimeSend := true + mutex := sync.Mutex{} + + checkBuffer := func() { + mutex.Lock() + // filter items by item + updatedBuf := make([]slidingWithTimeItem, 0) + availableItems := make([]interface{}, 0) + t := time.Now().Add(-windowTimespan) + for _, item := range buf { + if item.timestamp.After(t) || item.timestamp.Equal(t) { + updatedBuf = append(updatedBuf, item) + availableItems = append(availableItems, item.data) + } + } + buf = updatedBuf + + // apply and send items + if len(availableItems) != 0 { + err := handler(availableItems) + if err != nil { + rxgo.Error(err).SendContext(ctx, next) + return + } + } + firstTimeSend = false + mutex.Unlock() + } + + go func() { + defer close(next) + for { + select { + case <-stop: + checkBuffer() + return + case <-ctx.Done(): + return + case <-time.After(windowTimespan): + if firstTimeSend { + checkBuffer() + } + case <-time.After(slideTimespan): + checkBuffer() + } + } + }() + + for { + select { + case <-ctx.Done(): + close(stop) + return + case item, ok := <-observe: + if !ok { + close(stop) + return + } + if item.Error() { + item.SendContext(ctx, next) + close(stop) + return + } else { + mutex.Lock() + // buffer data + buf = append(buf, slidingWithTimeItem{ + timestamp: time.Now(), + data: item.V, + }) + mutex.Unlock() + } + // immediately send the original item to downstream + Of(item.V).SendContext(ctx, next) + } + } + } + return CreateObservable(f, opts...) +} + +type slidingWithTimeItem struct { + timestamp time.Time + data interface{} +} + +// Handler defines a function that handle the input value. +type Handler func(interface{}) error + +func (s *RxStreamImpl) thrown(err error) RxStream { + next := make(chan rxgo.Item, 1) + next <- rxgo.Error(err) + defer close(next) + return &RxStreamImpl{observable: rxgo.FromChannel(next)} +} + func CreateObservable(f func(ctx context.Context, next chan rxgo.Item), opts ...rxgo.Option) RxStream { next := make(chan rxgo.Item) ctx := context.Background()