未验证 提交 4ac4b684 编写于 作者: A Alessandro (Ale) Segala 提交者: GitHub

Actor reminders improvements (#6039)

Fixes race conditions for the most part (fixes #6018)

Includes improvements to perf:

- Update actor reminders data structures
- Switch data field to be json.RawMessage
- Reduce code duplication & tech debt
Signed-off-by: NItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
Co-authored-by: NArtur Souza <asouza.pro@gmail.com>
上级 735f4300
......@@ -9,7 +9,7 @@ require (
github.com/PuerkitoBio/purell v1.2.0
github.com/cenkalti/backoff/v4 v4.2.0
github.com/dapr/components-contrib v1.10.3
github.com/dapr/kit v0.0.4
github.com/dapr/kit v0.0.5-0.20230307192505-b5bafe889a81
github.com/fasthttp/router v1.4.15
github.com/ghodss/yaml v1.0.0
github.com/go-logr/logr v1.2.3
......@@ -37,7 +37,7 @@ require (
github.com/prometheus/client_model v0.3.0
github.com/prometheus/common v0.39.0
github.com/sony/gobreaker v0.5.0
github.com/stretchr/testify v1.8.1
github.com/stretchr/testify v1.8.2
github.com/tidwall/transform v0.0.0-20201103190739-32f242e2dbde
github.com/valyala/fasthttp v1.44.0
go.opencensus.io v0.24.0
......@@ -52,8 +52,8 @@ require (
go.temporal.io/sdk v1.20.0
go.uber.org/automaxprocs v1.5.1
go.uber.org/ratelimit v0.2.0
golang.org/x/exp v0.0.0-20230125214544-b3c2aaf6208d
golang.org/x/net v0.5.0
golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2
golang.org/x/net v0.6.0
golang.org/x/sync v0.1.0
google.golang.org/genproto v0.0.0-20230124163310-31e0e69b6fc2
google.golang.org/grpc v1.52.3
......@@ -377,12 +377,12 @@ require (
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/crypto v0.5.0 // indirect
golang.org/x/crypto v0.6.0 // indirect
golang.org/x/mod v0.7.0 // indirect
golang.org/x/oauth2 v0.4.0 // indirect
golang.org/x/sys v0.4.1-0.20230105183443-b8be2fde2a9e // indirect
golang.org/x/term v0.4.0 // indirect
golang.org/x/text v0.6.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/term v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.2.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
......
......@@ -718,8 +718,8 @@ github.com/danieljoos/wincred v1.1.2 h1:QLdCxFs1/Yl4zduvBdcHB8goaYk9RARS2SgLLRuA
github.com/danieljoos/wincred v1.1.2/go.mod h1:GijpziifJoIBfYh+S7BbkdUTU4LfM+QnGqR5Vl2tAx0=
github.com/dapr/components-contrib v1.10.3 h1:9kNx++EmcOh+LeFqGD5VURBgNn2EFQX4sAsrPGKuywc=
github.com/dapr/components-contrib v1.10.3/go.mod h1:21BqAjEn5hOjg2Y8/mu3nf/hD+9TSpQ45HdjesN24mQ=
github.com/dapr/kit v0.0.4 h1:i+7TIN4crC1Mo0JFyWpIkwAE8orlliA0O6/ibvs2AaE=
github.com/dapr/kit v0.0.4/go.mod h1:RFN6r5pZzhrelB0SUr8Dha44ckRBl7t+B01X5aw8WeE=
github.com/dapr/kit v0.0.5-0.20230307192505-b5bafe889a81 h1:8vCcvFXpCH4xvbG4JuG0g9bFk0T3cgY0infitTxG7oA=
github.com/dapr/kit v0.0.5-0.20230307192505-b5bafe889a81/go.mod h1:JXPc/7O0s0ieBe+GpOUuYiyxRcgip1MQwSwCmQPYSVE=
github.com/dave/jennifer v1.4.0/go.mod h1:fIb+770HOpJ2fmN9EPPKOqm1vMGhB+TwXKMZhrIygKg=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
......@@ -1770,8 +1770,9 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stvp/go-udp-testing v0.0.0-20201019212854-469649b16807/go.mod h1:7jxmlfBCDBXRzr0eAQJ48XC1hBu1np4CS5+cHEYfwpc=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
......@@ -1990,8 +1991,9 @@ golang.org/x/crypto v0.0.0-20220427172511-eb4f295cb31f/go.mod h1:IxCIyHEi3zRg3s0
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.5.0 h1:U/0M97KRkSFvyD/3FSmdP5W5swImpNgle/EHFhOsQPE=
golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU=
golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc=
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
......@@ -2006,8 +2008,8 @@ golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20200331195152-e8c3332aa8e5/go.mod h1:4M0jN8W1tt0AVLNr8HDosyJCDCDuyL9N9+3m7wDWgKw=
golang.org/x/exp v0.0.0-20230125214544-b3c2aaf6208d h1:9Bio0JlZpJ1P4NXsK5i8Rf2MclrRzMGzJWOIkhZ5Um8=
golang.org/x/exp v0.0.0-20230125214544-b3c2aaf6208d/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2 h1:Jvc7gsqn21cJHCmAWx0LiimpP18LZmUxkT5Mp7EZ1mI=
golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
......@@ -2114,8 +2116,9 @@ golang.org/x/net v0.0.0-20221012135044-0b7e1fb9d458/go.mod h1:YDH+HFinaLZZlnHAfS
golang.org/x/net v0.0.0-20221014081412-f15817d10f9b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE=
golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw=
golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws=
golang.org/x/net v0.6.0 h1:L4ZwwTvKW9gr0ZMS1yrHD9GZhIuVjOBBnaKH+SPQK0Q=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
......@@ -2282,14 +2285,15 @@ golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.4.1-0.20230105183443-b8be2fde2a9e h1:Lw2b7QX5zDuEsD5ZkJNRUGEGkLuho3UAKsO25Ucv140=
golang.org/x/sys v0.4.1-0.20230105183443-b8be2fde2a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.3.0/go.mod h1:q750SLmJuPmVoN1blW3UFBPREJfb1KmY3vwxfr+nFDA=
golang.org/x/term v0.4.0 h1:O7UWfv5+A2qiuulQk30kVinPoMtoIPeVaKLEgLpVkvg=
golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ=
golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
......@@ -2303,8 +2307,9 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.6.0 h1:3XmdazWV+ubf7QgHSTWeykHOci5oeekaGJBLkrkaw4k=
golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
......
......@@ -35,9 +35,9 @@ type actor struct {
actorID string
// actorLock is the lock to maintain actor's turn-based concurrency with allowance for reentrancy if configured.
actorLock ActorLock
actorLock *ActorLock
// pendingActorCalls is the number of the current pending actor calls by turn-based concurrency.
pendingActorCalls *atomic.Int32
pendingActorCalls atomic.Int32
// When consistent hashing tables are updated, actor runtime drains actor to rebalance actors
// across actor hosts after drainOngoingCallTimeout or until all pending actor calls are completed.
......@@ -46,14 +46,13 @@ type actor struct {
lastUsedTime time.Time
// disposeLock guards disposed and disposeCh.
disposeLock *sync.RWMutex
disposeLock sync.RWMutex
// disposed is true when actor is already disposed.
disposed bool
// disposeCh is the channel to signal when all pending actor calls are completed. This channel
// is used when runtime drains actor.
disposeCh chan struct{}
once sync.Once
clock clock.Clock
}
......@@ -62,15 +61,11 @@ func newActor(actorType, actorID string, maxReentrancyDepth *int, cl clock.Clock
cl = &clock.RealClock{}
}
return &actor{
actorType: actorType,
actorID: actorID,
actorLock: NewActorLock(int32(*maxReentrancyDepth)),
pendingActorCalls: &atomic.Int32{},
disposeLock: &sync.RWMutex{},
disposeCh: nil,
disposed: false,
clock: cl,
lastUsedTime: cl.Now().UTC(),
actorType: actorType,
actorID: actorID,
actorLock: NewActorLock(int32(*maxReentrancyDepth)),
clock: cl,
lastUsedTime: cl.Now().UTC(),
}
}
......@@ -84,15 +79,23 @@ func (a *actor) isBusy() bool {
// channel creates or get new dispose channel. This channel is used for draining the actor.
func (a *actor) channel() chan struct{} {
a.once.Do(func() {
a.disposeLock.RLock()
disposeCh := a.disposeCh
a.disposeLock.RUnlock()
if disposeCh == nil {
// If disposeCh is nil, acquire a write lock and retry
// We need to retry after acquiring a write lock because another goroutine could race us
a.disposeLock.Lock()
a.disposeCh = make(chan struct{})
disposeCh = a.disposeCh
if disposeCh == nil {
disposeCh = make(chan struct{})
a.disposeCh = disposeCh
}
a.disposeLock.Unlock()
})
}
a.disposeLock.RLock()
defer a.disposeLock.RUnlock()
return a.disposeCh
return disposeCh
}
// lock holds the lock for turn-based concurrency.
......@@ -121,14 +124,12 @@ func (a *actor) lock(reentrancyID *string) error {
func (a *actor) unlock() {
pending := a.pendingActorCalls.Add(-1)
if pending == 0 {
func() {
a.disposeLock.Lock()
defer a.disposeLock.Unlock()
if !a.disposed && a.disposeCh != nil {
a.disposed = true
close(a.disposeCh)
}
}()
a.disposeLock.Lock()
if !a.disposed && a.disposeCh != nil {
a.disposed = true
close(a.disposeCh)
}
a.disposeLock.Unlock()
} else if pending < 0 {
log.Error("BUGBUG: tried to unlock actor before locking actor.")
return
......
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package actors
// ActorHostedRequest is the request object for checking if an actor is hosted on this instance.
type ActorHostedRequest struct {
ActorID string `json:"actorId"`
ActorType string `json:"actorType"`
}
......@@ -22,19 +22,15 @@ import (
var ErrMaxStackDepthExceeded = errors.New("maximum stack depth exceeded")
type ActorLock struct {
methodLock *sync.Mutex
requestLock *sync.Mutex
methodLock sync.Mutex
requestLock sync.Mutex
activeRequest *string
stackDepth *atomic.Int32
stackDepth atomic.Int32
maxStackDepth int32
}
func NewActorLock(maxStackDepth int32) ActorLock {
return ActorLock{
methodLock: &sync.Mutex{},
requestLock: &sync.Mutex{},
activeRequest: nil,
stackDepth: &atomic.Int32{},
func NewActorLock(maxStackDepth int32) *ActorLock {
return &ActorLock{
maxStackDepth: maxStackDepth,
}
}
......
......@@ -30,7 +30,7 @@ func TestLockBaseCase(t *testing.T) {
err := lock.Lock(requestID)
assert.Nil(t, err)
assert.NoError(t, err)
if requestID == nil {
assert.Nil(t, lock.activeRequest)
} else {
......@@ -52,7 +52,7 @@ func TestLockBypassWithMatchingID(t *testing.T) {
for i := 1; i < 5; i++ {
err := lock.Lock(requestID)
assert.Nil(t, err)
assert.NoError(t, err)
assert.Equal(t, *requestID, *lock.activeRequest)
assert.Equal(t, int32(i), lock.stackDepth.Load())
}
......@@ -85,13 +85,13 @@ func TestStackDepthLimit(t *testing.T) {
err := lock.Lock(requestID)
assert.Nil(t, err)
assert.NoError(t, err)
assert.Equal(t, *requestID, *lock.activeRequest)
assert.Equal(t, int32(1), lock.stackDepth.Load())
err = lock.Lock(requestID)
assert.NotNil(t, err)
assert.Error(t, err)
assert.Equal(t, "maximum stack depth exceeded", err.Error())
}
......
此差异已折叠。
......@@ -24,6 +24,7 @@ import (
mock "github.com/stretchr/testify/mock"
"github.com/dapr/dapr/pkg/actors/reminders"
invokev1 "github.com/dapr/dapr/pkg/messaging/v1"
daprt "github.com/dapr/dapr/pkg/testing"
)
......@@ -201,15 +202,15 @@ func (_m *MockActors) TransactionalStateOperation(ctx context.Context, req *Tran
}
// GetReminder provides a mock function with given fields: req
func (_m *MockActors) GetReminder(ctx context.Context, req *GetReminderRequest) (*Reminder, error) {
func (_m *MockActors) GetReminder(ctx context.Context, req *GetReminderRequest) (*reminders.Reminder, error) {
ret := _m.Called(req)
var r0 *Reminder
if rf, ok := ret.Get(0).(func(*GetReminderRequest) *Reminder); ok {
var r0 *reminders.Reminder
if rf, ok := ret.Get(0).(func(*GetReminderRequest) *reminders.Reminder); ok {
r0 = rf(req)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*Reminder)
r0 = ret.Get(0).(*reminders.Reminder)
}
}
......@@ -281,7 +282,7 @@ func (f *FailingActors) TransactionalStateOperation(ctx context.Context, req *Tr
return nil
}
func (f *FailingActors) GetReminder(ctx context.Context, req *GetReminderRequest) (*Reminder, error) {
func (f *FailingActors) GetReminder(ctx context.Context, req *GetReminderRequest) (*reminders.Reminder, error) {
return nil, nil
}
......
此差异已折叠。
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package actors
// CreateReminderRequest is the request object to create a new reminder.
type CreateReminderRequest struct {
Name string
ActorType string
ActorID string
Data interface{} `json:"data"`
DueTime string `json:"dueTime"`
Period string `json:"period"`
TTL string `json:"ttl"`
}
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package actors
// CreateTimerRequest is the request object to create a new timer.
type CreateTimerRequest struct {
Name string
ActorType string
ActorID string
DueTime string `json:"dueTime"`
Period string `json:"period"`
TTL string `json:"ttl"`
Callback string `json:"callback"`
Data interface{} `json:"data"`
}
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package actors
// DeleteReminderRequest is the request object for deleting a reminder.
type DeleteReminderRequest struct {
Name string
ActorType string
ActorID string
}
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package actors
// DeleteStateRequest is the request object for deleting an actor state.
type DeleteStateRequest struct {
ActorID string `json:"actorId"`
ActorType string `json:"actorType"`
Key string `json:"key"`
}
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package actors
// DeleteTimerRequest is a request object for deleting a timer.
type DeleteTimerRequest struct {
Name string
ActorType string
ActorID string
}
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package actors
// GetReminderRequest is the request object to get an existing reminder.
type GetReminderRequest struct {
Name string
ActorType string
ActorID string
}
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package actors
// GetStateRequest is the request object for getting actor state.
type GetStateRequest struct {
ActorID string `json:"actorId"`
ActorType string `json:"actorType"`
Key string `json:"key"`
}
......@@ -12,14 +12,19 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package actors
import (
"context"
"encoding/json"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/dapr/dapr/pkg/actors/reminders"
"github.com/dapr/dapr/pkg/config"
invokev1 "github.com/dapr/dapr/pkg/messaging/v1"
"github.com/dapr/dapr/pkg/resiliency"
......@@ -100,7 +105,6 @@ func newTestActorsRuntimeWithInternalActors(internalActors map[string]InternalAc
config := NewConfig(ConfigOpts{
AppID: TestAppID,
PlacementAddresses: []string{"placement:5050"},
AppConfig: config.ApplicationConfig{},
})
a := NewActors(ActorsOpts{
StateStore: store,
......@@ -135,9 +139,7 @@ func TestInternalActorCall(t *testing.T) {
internalActors := make(map[string]InternalActor)
internalActors[testActorType] = &mockInternalActor{TestOutput: testOutput}
testActorRuntime, err := newTestActorsRuntimeWithInternalActors(internalActors)
if !assert.NoError(t, err) {
return
}
require.NoError(t, err)
req := invokev1.NewInvokeMethodRequest(testMethod).
WithActor(testActorType, testActorID).
......@@ -146,9 +148,7 @@ func TestInternalActorCall(t *testing.T) {
defer req.Close()
resp, err := testActorRuntime.callLocalActor(context.Background(), req)
if !assert.NoError(t, err) {
return
}
require.NoError(t, err)
defer resp.Close()
if assert.NoError(t, err) && assert.NotNil(t, resp) {
......@@ -160,9 +160,8 @@ func TestInternalActorCall(t *testing.T) {
// Verify the actor got all the expected inputs (which are echoed back to us)
info, err := decodeTestResponse(data)
if !assert.NoError(t, err) || !assert.NotNil(t, info) {
return
}
require.NoError(t, err)
require.NotNil(t, info)
assert.Equal(t, testActorID, info.ActorID)
assert.Equal(t, testMethod, info.MethodName)
assert.Equal(t, []byte(testInput), info.Input)
......@@ -178,38 +177,38 @@ func TestInternalActorReminder(t *testing.T) {
internalActors := make(map[string]InternalActor)
internalActors[testActorType] = ia
testActorRuntime, err := newTestActorsRuntimeWithInternalActors(internalActors)
if !assert.NoError(t, err) {
return
}
require.NoError(t, err)
testReminder := &Reminder{
ActorType: testActorType,
ActorID: "myActor",
DueTime: "2s",
Period: "2s",
Name: "reminder1",
Data: testReminderData{
SomeBytes: []byte("こんにちは!"),
SomeInt: 42,
SomeString: "Hello!",
},
}
if err = testActorRuntime.executeReminder(testReminder); !assert.NoError(t, err) {
return
}
if !assert.Len(t, ia.InvokedReminders, 1) {
return
period, _ := reminders.NewReminderPeriod("2s")
data, _ := json.Marshal(testReminderData{
SomeBytes: []byte("こんにちは!"),
SomeInt: 42,
SomeString: "Hello!",
})
testReminder := &reminders.Reminder{
ActorType: testActorType,
ActorID: "myActor",
RegisteredTime: time.Now().Add(2 * time.Second),
DueTime: "2s",
Period: period,
Name: "reminder1",
Data: data,
}
err = testActorRuntime.executeReminder(testReminder, false)
require.NoError(t, err)
require.Len(t, ia.InvokedReminders, 1)
invokedReminder := ia.InvokedReminders[0]
assert.Equal(t, testReminder.ActorID, invokedReminder.ActorID)
assert.Equal(t, testReminder.Name, invokedReminder.Name)
assert.Equal(t, testReminder.DueTime, invokedReminder.DueTime)
assert.Equal(t, testReminder.Period, invokedReminder.Period)
assert.Equal(t, testReminder.Period.String(), invokedReminder.Period)
// Reminder data gets marshaled to JSON and unmarshaled back to map[string]interface{}
var actualData testReminderData
DecodeInternalActorReminderData(invokedReminder.Data, &actualData)
assert.Equal(t, testReminder.Data, actualData)
enc, err := json.Marshal(actualData)
require.NoError(t, err)
assert.Equal(t, []byte(testReminder.Data), enc)
}
func TestInternalActorDeactivation(t *testing.T) {
......@@ -221,9 +220,7 @@ func TestInternalActorDeactivation(t *testing.T) {
internalActors := make(map[string]InternalActor)
internalActors[testActorType] = ia
testActorRuntime, err := newTestActorsRuntimeWithInternalActors(internalActors)
if !assert.NoError(t, err) {
return
}
require.NoError(t, err)
// Call the internal actor to "activate" it
req := invokev1.NewInvokeMethodRequest("Foo").WithActor(testActorType, testActorID)
......@@ -231,24 +228,23 @@ func TestInternalActorDeactivation(t *testing.T) {
var resp *invokev1.InvokeMethodResponse
resp, err = testActorRuntime.callLocalActor(context.Background(), req)
if !assert.NoError(t, err) {
return
}
require.NoError(t, err)
defer resp.Close()
assert.NoError(t, err)
// Deactivate the actor, ensuring no errors and that the correct actor ID was provided.
if err = testActorRuntime.deactivateActor(testActorType, testActorID); assert.NoError(t, err) {
if assert.Len(t, ia.DeactivationCalls, 1) {
assert.Equal(t, testActorID, ia.DeactivationCalls[0])
}
err = testActorRuntime.deactivateActor(testActorType, testActorID)
require.NoError(t, err)
if assert.Len(t, ia.DeactivationCalls, 1) {
assert.Equal(t, testActorID, ia.DeactivationCalls[0])
}
}
func decodeTestResponse(data []byte) (*invokeMethodCallInfo, error) {
info := new(invokeMethodCallInfo)
if err := DecodeInternalActorData(data, info); err != nil {
err := DecodeInternalActorData(data, info)
if err != nil {
return nil, err
}
return info, nil
......@@ -260,9 +256,7 @@ func TestInternalActorsNotCounted(t *testing.T) {
internalActors := make(map[string]InternalActor)
internalActors[InternalActorTypePrefix+"wfengine.workflow"] = &mockInternalActor{}
testActorRuntime, err := newTestActorsRuntimeWithInternalActors(internalActors)
if !assert.NoError(t, err) {
return
}
require.NoError(t, err)
actorCounts := testActorRuntime.GetActiveActorsCount(context.Background())
assert.Empty(t, actorCounts)
}
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package actors
// Reminder represents a persisted reminder for a unique actor.
type Reminder struct {
ActorID string `json:"actorID,omitempty"`
ActorType string `json:"actorType,omitempty"`
Name string `json:"name,omitempty"`
Data interface{} `json:"data"`
Period string `json:"period"`
DueTime string `json:"dueTime"`
RegisteredTime string `json:"registeredTime,omitempty"`
ExpirationTime string `json:"expirationTime,omitempty"`
}
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package actors
// ReminderResponse is the payload that is sent to an Actor SDK API for execution.
type ReminderResponse struct {
Data interface{} `json:"data"`
DueTime string `json:"dueTime"`
Period string `json:"period"`
}
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package reminders
import (
"encoding/json"
"fmt"
"time"
timeutils "github.com/dapr/kit/time"
)
const daprSeparator = "||"
// Reminder represents a reminder or timer for a unique actor.
type Reminder struct {
ActorID string `json:"actorID,omitempty"`
ActorType string `json:"actorType,omitempty"`
Name string `json:"name,omitempty"`
Data json.RawMessage `json:"data,omitempty"`
Period ReminderPeriod `json:"period,omitempty"`
RegisteredTime time.Time `json:"registeredTime,omitempty"`
DueTime string `json:"dueTime,omitempty"` // Exact input value from user
ExpirationTime time.Time `json:"expirationTime,omitempty"`
Callback string `json:"callback,omitempty"` // Used by timers only
}
// ActorKey returns the key of the actor for this reminder.
func (r Reminder) ActorKey() string {
return r.ActorType + daprSeparator + r.ActorID
}
// Key returns the key for this unique reminder.
func (r Reminder) Key() string {
return r.ActorType + daprSeparator + r.ActorID + daprSeparator + r.Name
}
// NextTick returns the time the reminder should tick again next.
func (r Reminder) NextTick() time.Time {
return r.RegisteredTime
}
// HasRepeats returns true if the reminder has repeats left.
func (r Reminder) HasRepeats() bool {
return r.Period.HasRepeats()
}
// RepeatsLeft returns the number of repeats left.
func (r Reminder) RepeatsLeft() int {
return r.Period.repeats
}
// TickExecuted should be called after a reminder has been executed.
// "done" will be true if the reminder is done, i.e. no more executions should happen.
// If the reminder is not done, call "NextTick" to get the time it should tick next.
// Note: this method is not concurrency-safe.
func (r *Reminder) TickExecuted() (done bool) {
if r.Period.repeats > 0 {
r.Period.repeats--
}
if !r.HasRepeats() {
return true
}
r.RegisteredTime = r.Period.GetFollowing(r.RegisteredTime)
return false
}
// UpdateFromTrack updates the reminder with data from the track object.
func (r *Reminder) UpdateFromTrack(track *ReminderTrack) {
if track == nil || track.LastFiredTime.IsZero() {
return
}
r.Period.repeats = track.RepetitionLeft
r.RegisteredTime = r.Period.GetFollowing(track.LastFiredTime)
}
func (r *Reminder) MarshalJSON() ([]byte, error) {
type reminderAlias Reminder
// Custom serializer that encodes times (RegisteredTime and ExpirationTime) in the RFC3339 format.
// Also adds a custom serializer for Period to omit empty strings.
// This is for backwards-compatibility and also because we don't need to store precision with less than seconds
m := struct {
RegisteredTime string `json:"registeredTime,omitempty"`
ExpirationTime string `json:"expirationTime,omitempty"`
Period string `json:"period,omitempty"`
Data *json.RawMessage `json:"data,omitempty"`
*reminderAlias
}{
reminderAlias: (*reminderAlias)(r),
}
if !r.RegisteredTime.IsZero() {
m.RegisteredTime = r.RegisteredTime.Format(time.RFC3339)
}
if !r.ExpirationTime.IsZero() {
m.ExpirationTime = r.ExpirationTime.Format(time.RFC3339)
}
m.Period = r.Period.String()
if len(r.Data) > 0 {
m.Data = &r.Data
}
return json.Marshal(m)
}
func (r *Reminder) UnmarshalJSON(data []byte) error {
type reminderAlias Reminder
*r = Reminder{
Period: NewEmptyReminderPeriod(),
}
// Parse RegisteredTime and ExpirationTime as dates in the RFC3339 format
m := &struct {
ExpirationTime string `json:"expirationTime"`
RegisteredTime string `json:"registeredTime"`
*reminderAlias
}{
reminderAlias: (*reminderAlias)(r),
}
err := json.Unmarshal(data, &m)
if err != nil {
return err
}
if m.RegisteredTime != "" {
r.RegisteredTime, err = time.Parse(time.RFC3339, m.RegisteredTime)
if err != nil {
return fmt.Errorf("failed to parse RegisteredTime: %w", err)
}
r.RegisteredTime = r.RegisteredTime.Truncate(time.Second)
}
if m.ExpirationTime != "" {
r.ExpirationTime, err = time.Parse(time.RFC3339, m.ExpirationTime)
if err != nil {
return fmt.Errorf("failed to parse ExpirationTime: %w", err)
}
r.ExpirationTime = r.ExpirationTime.Truncate(time.Second)
}
return nil
}
// String implements fmt.Stringer and is used for debugging.
func (r Reminder) String() string {
hasData := r.Data != nil
dueTime := "nil"
if !r.RegisteredTime.IsZero() {
dueTime = "'" + r.RegisteredTime.Format(time.RFC3339) + "'"
}
expirationTime := "nil"
if !r.ExpirationTime.IsZero() {
expirationTime = "'" + r.ExpirationTime.Format(time.RFC3339) + "'"
}
return fmt.Sprintf(
"name='%s' hasData=%v period='%s' dueTime=%s expirationTime=%s",
r.Key(), hasData, r.Period, dueTime, expirationTime,
)
}
// parseTimeTruncateSeconds is a wrapper around timeutils.ParseTime that truncates the time to seconds.
func parseTimeTruncateSeconds(val string, now *time.Time) (time.Time, error) {
t, err := timeutils.ParseTime(val, now)
if err != nil {
return t, err
}
t = t.Truncate(time.Second)
return t, nil
}
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package reminders
import (
"encoding/json"
"errors"
"fmt"
"time"
timeutils "github.com/dapr/kit/time"
)
// ReminderPeriod contains the parsed period for a reminder.
type ReminderPeriod struct {
value string // Raw value as received from the user
years int
months int
days int
period time.Duration
repeats int
}
// NewReminderPeriod parses a reminder period from a string and validates it.
func NewReminderPeriod(val string) (p ReminderPeriod, err error) {
p = NewEmptyReminderPeriod()
if val != "" {
p.value = val
err = parseReminderPeriod(&p)
}
return p, err
}
// NewEmptyReminderPeriod returns an empty ReminderPeriod, which has unlimited repeats.
func NewEmptyReminderPeriod() ReminderPeriod {
return ReminderPeriod{
repeats: -1,
}
}
// HasRepeats returns true if the period will repeat.
func (p ReminderPeriod) HasRepeats() bool {
return p.repeats != 0 &&
(p.years != 0 || p.months != 0 || p.days != 0 || p.period != 0)
}
// GetNext returns the next time the periodic reminder should fire after a given time.
func (p ReminderPeriod) GetFollowing(t time.Time) time.Time {
return t.AddDate(p.years, p.months, p.days).Add(p.period)
}
// String implements fmt.Stringer. It returns the value.
func (p ReminderPeriod) String() string {
return p.value
}
func (p ReminderPeriod) MarshalJSON() ([]byte, error) {
return json.Marshal(p.value)
}
func (p *ReminderPeriod) UnmarshalJSON(data []byte) error {
if len(data) >= 2 && data[0] == '"' && data[len(data)-1] == '"' {
data = data[1 : len(data)-1]
}
*p = ReminderPeriod{
value: string(data),
repeats: -1,
}
if p.value == "" {
return nil
}
return parseReminderPeriod(p)
}
func parseReminderPeriod(p *ReminderPeriod) (err error) {
p.years, p.months, p.days, p.period, p.repeats, err = timeutils.ParseDuration(p.value)
if err != nil {
return fmt.Errorf("parse error: %w", err)
}
// Error on timers with zero repetitions
if p.repeats == 0 {
return errors.New("has zero repetitions")
}
return nil
}
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package reminders
import (
"bytes"
"encoding/json"
"reflect"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestReminderPeriod(t *testing.T) {
t.Run("empty value", func(t *testing.T) {
p, err := NewReminderPeriod("")
require.NoError(t, err)
expect := ReminderPeriod{
value: "",
repeats: -1,
}
assert.Truef(t, reflect.DeepEqual(p, expect), "Got: '%#v' Expected: '%#v'", p, expect)
})
t.Run("interval in Go format", func(t *testing.T) {
p, err := NewReminderPeriod("2s")
require.NoError(t, err)
expect := ReminderPeriod{
value: "2s",
period: 2 * time.Second,
repeats: -1,
}
assert.Truef(t, reflect.DeepEqual(p, expect), "Got: '%#v' Expected: '%#v'", p, expect)
})
t.Run("interval in ISO8601 format", func(t *testing.T) {
p, err := NewReminderPeriod("P2WT1M")
require.NoError(t, err)
expect := ReminderPeriod{
value: "P2WT1M",
days: 14,
period: time.Minute,
repeats: -1,
}
assert.Truef(t, reflect.DeepEqual(p, expect), "Got: '%#v' Expected: '%#v'", p, expect)
})
t.Run("interval in ISO8601 format with repeats", func(t *testing.T) {
p, err := NewReminderPeriod("R3/P2WT1M")
require.NoError(t, err)
expect := ReminderPeriod{
value: "R3/P2WT1M",
days: 14,
period: time.Minute,
repeats: 3,
}
assert.Truef(t, reflect.DeepEqual(p, expect), "Got: '%#v' Expected: '%#v'", p, expect)
})
t.Run("repeats only", func(t *testing.T) {
p, err := NewReminderPeriod("R2")
require.NoError(t, err)
expect := ReminderPeriod{
value: "R2",
repeats: 2,
}
assert.Truef(t, reflect.DeepEqual(p, expect), "Got: '%#v' Expected: '%#v'", p, expect)
})
t.Run("invalid interval", func(t *testing.T) {
_, err := NewReminderPeriod("invalid")
require.Error(t, err)
})
t.Run("invalid with zero repeats", func(t *testing.T) {
_, err := NewReminderPeriod("R0")
require.Error(t, err)
})
}
func TestReminderPeriodJSON(t *testing.T) {
jsonEqual := func(t *testing.T, p ReminderPeriod, wantJSON string) {
// Marshal
got, err := json.Marshal(p)
require.NoError(t, err)
// Compact the JSON before checking for equality
out := &bytes.Buffer{}
err = json.Compact(out, got)
require.NoError(t, err)
assert.Equal(t, wantJSON, out.String())
// Unmarshal
dec := ReminderPeriod{}
err = json.Unmarshal(got, &dec)
require.NoError(t, err)
assert.True(t, reflect.DeepEqual(dec, p), "Got: `%#v`. Expected: `%#v`", dec, p)
}
t.Run("interval in Go format", func(t *testing.T) {
p, err := NewReminderPeriod("2s")
require.NoError(t, err)
jsonEqual(t, p, `"2s"`)
})
t.Run("interval in ISO8601 format", func(t *testing.T) {
p, err := NewReminderPeriod("P2WT1M")
require.NoError(t, err)
jsonEqual(t, p, `"P2WT1M"`)
})
t.Run("interval in ISO8601 format with repeats", func(t *testing.T) {
p, err := NewReminderPeriod("R3/P2WT1M")
require.NoError(t, err)
jsonEqual(t, p, `"R3/P2WT1M"`)
})
t.Run("empty JSON value", func(t *testing.T) {
expect := ReminderPeriod{
value: "",
repeats: -1,
}
dec := ReminderPeriod{}
err := dec.UnmarshalJSON([]byte{}) // Note this is an empty value
require.NoError(t, err)
assert.True(t, reflect.DeepEqual(dec, expect), "Got: `%#v`. Expected: `%#v`", dec, expect)
})
t.Run("empty JSON string", func(t *testing.T) {
expect := ReminderPeriod{
value: "",
repeats: -1,
}
dec := ReminderPeriod{}
err := json.Unmarshal([]byte(`""`), &dec) // Note this is an empty string
require.NoError(t, err)
assert.True(t, reflect.DeepEqual(dec, expect), "Got: `%#v`. Expected: `%#v`", dec, expect)
})
t.Run("not a JSON string", func(t *testing.T) {
dec := ReminderPeriod{}
err := json.Unmarshal([]byte("[]"), &dec)
require.Error(t, err)
})
t.Run("invalid JSON string", func(t *testing.T) {
dec := ReminderPeriod{}
err := json.Unmarshal([]byte(`"invalid"`), &dec)
require.Error(t, err)
})
}
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package reminders
import (
"bytes"
"encoding/json"
"reflect"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestReminderProperties(t *testing.T) {
time1, _ := time.Parse(time.RFC3339, "2023-03-07T18:29:04Z")
r := Reminder{
ActorID: "id",
ActorType: "type",
Name: "name",
RegisteredTime: time1,
}
t.Run("ActorKey", func(t *testing.T) {
require.Equal(t, r.ActorKey(), "type||id")
})
t.Run("Key", func(t *testing.T) {
require.Equal(t, r.Key(), "type||id||name")
})
t.Run("NextTick", func(t *testing.T) {
require.Equal(t, time1, r.NextTick())
})
t.Run("without repeats", func(t *testing.T) {
require.False(t, r.HasRepeats())
require.Equal(t, 0, r.RepeatsLeft())
require.Equal(t, 0, r.Period.repeats)
require.True(t, r.TickExecuted()) // It's done, no more repeats
require.Equal(t, 0, r.Period.repeats)
})
// Update the object to add a period
var err error
r.Period, err = NewReminderPeriod("2s")
require.NoError(t, err)
t.Run("with unlimited repeats", func(t *testing.T) {
require.True(t, r.HasRepeats())
require.Equal(t, -1, r.RepeatsLeft())
require.Equal(t, -1, r.Period.repeats)
require.Equal(t, time1, r.NextTick())
// Execute the tick
require.False(t, r.TickExecuted()) // Will repeat
require.Equal(t, -1, r.Period.repeats)
require.Equal(t, time1.Add(2*time.Second), r.NextTick())
})
// Update the object to add limited repeats
r.RegisteredTime = time1
r.Period, err = NewReminderPeriod("R4/PT2S")
require.NoError(t, err)
t.Run("with limited repeats", func(t *testing.T) {
require.True(t, r.HasRepeats())
// Execute the tick 4 times
for i := 4; i > 0; i-- {
require.Equal(t, i, r.RepeatsLeft())
require.Equal(t, i, r.Period.repeats)
require.Equal(t, time1.Add(2*time.Second*time.Duration(4-i)), r.NextTick())
if i == 1 {
require.True(t, r.TickExecuted()) // Done, won't repeat
} else {
require.False(t, r.TickExecuted()) // Will repeat
}
}
})
}
func TestReminderJSON(t *testing.T) {
time1, _ := time.Parse(time.RFC3339, "2023-03-07T18:29:04Z")
time2, _ := time.Parse(time.RFC3339, "2023-02-01T11:02:01Z")
type fields struct {
ActorID string
ActorType string
Name string
Data any
Period string
RegisteredTime time.Time
DueTime string
ExpirationTime time.Time
}
tests := []struct {
name string
fields fields
want string
}{
{name: "base test", fields: fields{ActorID: "id", ActorType: "type", Name: "name"}, want: `{"actorID":"id","actorType":"type","name":"name"}`},
{name: "with data", fields: fields{ActorID: "id", ActorType: "type", Name: "name", Data: "hi"}, want: `{"data":"hi","actorID":"id","actorType":"type","name":"name"}`},
{name: "with period", fields: fields{ActorID: "id", ActorType: "type", Name: "name", Period: "2s"}, want: `{"period":"2s","actorID":"id","actorType":"type","name":"name"}`},
{name: "with due time", fields: fields{ActorID: "id", ActorType: "type", Name: "name", Period: "2s", DueTime: "2m", RegisteredTime: time1}, want: `{"registeredTime":"2023-03-07T18:29:04Z","period":"2s","actorID":"id","actorType":"type","name":"name","dueTime":"2m"}`},
{name: "with expiration time", fields: fields{ActorID: "id", ActorType: "type", Name: "name", Period: "2s", ExpirationTime: time2}, want: `{"expirationTime":"2023-02-01T11:02:01Z","period":"2s","actorID":"id","actorType":"type","name":"name"}`},
{name: "with data as JSON object", fields: fields{ActorID: "id", ActorType: "type", Name: "name", Data: json.RawMessage(`{ "foo": [ 12, 4 ] } `)}, want: `{"data":{"foo":[12,4]},"actorID":"id","actorType":"type","name":"name"}`},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var err error
r := Reminder{
ActorID: tt.fields.ActorID,
ActorType: tt.fields.ActorType,
Name: tt.fields.Name,
RegisteredTime: tt.fields.RegisteredTime,
DueTime: tt.fields.DueTime,
ExpirationTime: tt.fields.ExpirationTime,
}
if tt.fields.Data != nil {
if j, ok := tt.fields.Data.(json.RawMessage); ok {
r.Data = compactJSON(t, j)
} else {
r.Data, _ = json.Marshal(tt.fields.Data)
}
}
r.Period, err = NewReminderPeriod(tt.fields.Period)
require.NoError(t, err)
// Marshal
got, err := json.Marshal(&r)
require.NoError(t, err)
// Compact the JSON before checking for equality
got = compactJSON(t, got)
assert.Equal(t, tt.want, string(got))
// Unmarshal
dec := Reminder{}
err = json.Unmarshal(got, &dec)
require.NoError(t, err)
assert.True(t, reflect.DeepEqual(dec, r), "Got: `%#v`. Expected: `%#v`", dec, r)
})
}
t.Run("slice", func(t *testing.T) {
const payload = `[{"data":{"foo":[12,4]},"actorID":"id","actorType":"type","name":"name"},{"registeredTime":"2023-03-07T18:29:04Z","period":"2s","actorID":"id","actorType":"type","name":"name","dueTime":"2m"}]`
dec := []Reminder{}
err := json.Unmarshal([]byte(payload), &dec)
require.NoError(t, err)
// Marshal
enc, err := json.Marshal(dec)
require.NoError(t, err)
require.Equal(t, payload, string(enc))
})
t.Run("failed to marshal", func(t *testing.T) {
t.Run("invalid JSON in data", func(t *testing.T) {
r := Reminder{
ActorID: "id",
ActorType: "type",
Name: "name",
Data: json.RawMessage("invalid_json"),
}
res, err := json.Marshal(r)
require.Error(t, err)
require.ErrorContains(t, err, "json: error calling MarshalJSON for type json.RawMessage")
assert.Empty(t, res)
})
})
t.Run("failed to unmarshal", func(t *testing.T) {
t.Run("cannot decode RegisteredTime", func(t *testing.T) {
const enc = `{"registeredTime":"invalid date","period":"2s","actorID":"id","actorType":"type","name":"name","dueTime":"2m"}`
err := json.Unmarshal([]byte(enc), &Reminder{})
require.Error(t, err)
require.ErrorContains(t, err, "failed to parse RegisteredTime")
})
t.Run("cannot decode ExpirationTime", func(t *testing.T) {
const enc = `{"expirationTime":"invalid date","period":"2s","actorID":"id","actorType":"type","name":"name","dueTime":"2m"}`
err := json.Unmarshal([]byte(enc), &Reminder{})
require.Error(t, err)
require.ErrorContains(t, err, "failed to parse ExpirationTime")
})
})
}
func TestReminderString(t *testing.T) {
time1, _ := time.Parse(time.RFC3339, "2023-03-07T18:29:04Z")
time2, _ := time.Parse(time.RFC3339, "2023-02-01T11:02:01Z")
type fields struct {
ActorID string
ActorType string
Name string
Data json.RawMessage
Period string
DueTime time.Time
DueTimeReq string
ExpirationTime time.Time
}
tests := []struct {
name string
fields fields
want string
}{
{name: "base test", fields: fields{ActorID: "id", ActorType: "type", Name: "name"}, want: `name='type||id||name' hasData=false period='' dueTime=nil expirationTime=nil`},
{name: "with data", fields: fields{ActorID: "id", ActorType: "type", Name: "name", Data: json.RawMessage(`"hi"`)}, want: `name='type||id||name' hasData=true period='' dueTime=nil expirationTime=nil`},
{name: "with period", fields: fields{ActorID: "id", ActorType: "type", Name: "name", Period: "2s"}, want: `name='type||id||name' hasData=false period='2s' dueTime=nil expirationTime=nil`},
{name: "with due time", fields: fields{ActorID: "id", ActorType: "type", Name: "name", Period: "2s", DueTimeReq: "2m", DueTime: time1}, want: `name='type||id||name' hasData=false period='2s' dueTime='2023-03-07T18:29:04Z' expirationTime=nil`},
{name: "with expiration time", fields: fields{ActorID: "id", ActorType: "type", Name: "name", Period: "2s", ExpirationTime: time2}, want: `name='type||id||name' hasData=false period='2s' dueTime=nil expirationTime='2023-02-01T11:02:01Z'`},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var err error
r := Reminder{
ActorID: tt.fields.ActorID,
ActorType: tt.fields.ActorType,
Name: tt.fields.Name,
RegisteredTime: tt.fields.DueTime,
DueTime: tt.fields.DueTimeReq,
ExpirationTime: tt.fields.ExpirationTime,
Data: tt.fields.Data,
}
r.Period, err = NewReminderPeriod(tt.fields.Period)
require.NoError(t, err)
// Encode to string
got := r.String()
assert.Equal(t, tt.want, got)
})
}
}
func compactJSON(t *testing.T, data []byte) []byte {
out := &bytes.Buffer{}
err := json.Compact(out, data)
require.NoError(t, err)
return out.Bytes()
}
......@@ -11,11 +11,61 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package actors
package reminders
import (
"encoding/json"
"fmt"
"time"
)
// ReminderTrack is a persisted object that keeps track of the last time a reminder fired.
type ReminderTrack struct {
LastFiredTime string `json:"lastFiredTime"`
RepetitionLeft int `json:"repetitionLeft"`
Etag *string
LastFiredTime time.Time `json:"lastFiredTime"`
RepetitionLeft int `json:"repetitionLeft"`
Etag *string `json:",omitempty"`
}
func (r *ReminderTrack) MarshalJSON() ([]byte, error) {
type reminderTrackAlias ReminderTrack
// Custom serializer that encodes times (LastFiredTime) in the RFC3339 format, for backwards-compatibility
m := &struct {
LastFiredTime string `json:"lastFiredTime,omitempty"`
*reminderTrackAlias
}{
reminderTrackAlias: (*reminderTrackAlias)(r),
}
if !r.LastFiredTime.IsZero() {
m.LastFiredTime = r.LastFiredTime.Format(time.RFC3339)
}
return json.Marshal(m)
}
func (r *ReminderTrack) UnmarshalJSON(data []byte) error {
type reminderTrackAlias ReminderTrack
// Parse RegisteredTime and ExpirationTime as dates in the RFC3339 format
m := &struct {
LastFiredTime string `json:"lastFiredTime"`
*reminderTrackAlias
}{
reminderTrackAlias: (*reminderTrackAlias)(r),
}
err := json.Unmarshal(data, &m)
if err != nil {
return err
}
if m.LastFiredTime != "" {
r.LastFiredTime, err = time.Parse(time.RFC3339, m.LastFiredTime)
if err != nil {
return fmt.Errorf("failed to parse LastFiredTime: %w", err)
}
r.LastFiredTime = r.LastFiredTime.Truncate(time.Second)
}
return nil
}
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package reminders
import (
"encoding/json"
"reflect"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/dapr/kit/ptr"
)
func TestReminderTrackJSON(t *testing.T) {
time1, _ := time.Parse(time.RFC3339, "2023-03-07T18:29:04Z")
type fields struct {
LastFiredTime time.Time
RepetitionLeft int
Etag *string
}
tests := []struct {
name string
fields fields
want string
}{
{
name: "basic test",
fields: fields{LastFiredTime: time1, RepetitionLeft: 2},
want: `{"lastFiredTime":"2023-03-07T18:29:04Z","repetitionLeft":2}`,
},
{
name: "has etag",
fields: fields{LastFiredTime: time1, RepetitionLeft: 2, Etag: ptr.Of("foo")},
want: `{"lastFiredTime":"2023-03-07T18:29:04Z","repetitionLeft":2,"Etag":"foo"}`,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := ReminderTrack{
LastFiredTime: tt.fields.LastFiredTime,
RepetitionLeft: tt.fields.RepetitionLeft,
Etag: tt.fields.Etag,
}
// Marshal
got, err := json.Marshal(&r)
require.NoError(t, err)
// Compact the JSON before checking for equality
got = compactJSON(t, got)
assert.Equal(t, tt.want, string(got))
// Unmarshal
dec := ReminderTrack{}
err = json.Unmarshal(got, &dec)
require.NoError(t, err)
assert.True(t, reflect.DeepEqual(dec, r), "Got: `%#v`. Expected: `%#v`", dec, r)
})
}
}
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package reminders
import (
"bytes"
"encoding/json"
"fmt"
"time"
)
// CreateReminderRequest is the request object to create a new reminder.
type CreateReminderRequest struct {
Name string
ActorType string
ActorID string
Data json.RawMessage `json:"data"`
DueTime string `json:"dueTime"`
Period string `json:"period"`
TTL string `json:"ttl"`
}
// CreateTimerRequest is the request object to create a new timer.
type CreateTimerRequest struct {
Name string
ActorType string
ActorID string
DueTime string `json:"dueTime"`
Period string `json:"period"`
TTL string `json:"ttl"`
Callback string `json:"callback"`
Data json.RawMessage `json:"data"`
}
// NewReminderFromCreateReminderRequest returns a new Reminder from a CreateReminderRequest object.
func NewReminderFromCreateReminderRequest(req *CreateReminderRequest, now time.Time) (reminder *Reminder, err error) {
reminder = &Reminder{
ActorID: req.ActorID,
ActorType: req.ActorType,
Name: req.Name,
}
err = setReminderData(reminder, req.Data, "reminder")
if err != nil {
return nil, err
}
err = setReminderTimes(reminder, req.DueTime, req.Period, req.TTL, now, "reminder")
if err != nil {
return nil, err
}
return reminder, nil
}
// NewReminderFromCreateTimerRequest returns a new Timer from a CreateTimerRequest object.
func NewReminderFromCreateTimerRequest(req *CreateTimerRequest, now time.Time) (reminder *Reminder, err error) {
reminder = &Reminder{
ActorID: req.ActorID,
ActorType: req.ActorType,
Name: req.Name,
Callback: req.Callback,
}
err = setReminderData(reminder, req.Data, "timer")
if err != nil {
return nil, err
}
err = setReminderTimes(reminder, req.DueTime, req.Period, req.TTL, now, "timer")
if err != nil {
return nil, err
}
return reminder, nil
}
func setReminderData(reminder *Reminder, data json.RawMessage, logMsg string) error {
if len(data) == 0 {
return nil
}
// Compact the data before setting it
buf := &bytes.Buffer{}
err := json.Compact(buf, data)
if err != nil {
return fmt.Errorf("failed to compact %s data: %w", logMsg, err)
}
if buf.Len() > 0 {
reminder.Data = buf.Bytes()
}
return nil
}
func setReminderTimes(reminder *Reminder, dueTime string, period string, ttl string, now time.Time, logMsg string) (err error) {
// Due time and registered time
reminder.RegisteredTime = now
reminder.DueTime = dueTime
if dueTime != "" {
reminder.RegisteredTime, err = parseTimeTruncateSeconds(dueTime, &now)
if err != nil {
return fmt.Errorf("error parsing %s due time: %w", logMsg, err)
}
}
// Parse period and check correctness
reminder.Period, err = NewReminderPeriod(period)
if err != nil {
return fmt.Errorf("invalid %s period: %w", logMsg, err)
}
// Set expiration time if configured
if ttl != "" {
reminder.ExpirationTime, err = parseTimeTruncateSeconds(ttl, &reminder.RegisteredTime)
if err != nil {
return fmt.Errorf("error parsing %s TTL: %w", logMsg, err)
}
// check if already expired
if now.After(reminder.ExpirationTime) || reminder.RegisteredTime.After(reminder.ExpirationTime) {
return fmt.Errorf("%s %s has already expired: dueTime: %s TTL: %s",
logMsg, reminder.Key(), reminder.RegisteredTime, ttl)
}
}
return nil
}
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package reminders
import (
"encoding/json"
"reflect"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestNewReminderFromCreateReminderRequest(t *testing.T) {
now := time.Now().UTC().Truncate(time.Second)
tests := []struct {
name string
req func(r *CreateReminderRequest)
wantReminder func(r *Reminder)
wantErr bool
}{
{
name: "base test",
req: func(r *CreateReminderRequest) { return },
wantReminder: func(r *Reminder) { return },
},
{
name: "with data",
req: func(r *CreateReminderRequest) {
r.Data = json.RawMessage(`"hi"`)
},
wantReminder: func(r *Reminder) {
r.Data = json.RawMessage(`"hi"`)
},
},
{
name: "with data as JSON object",
req: func(r *CreateReminderRequest) {
r.Data = json.RawMessage(`{ "foo": [ 12, 4 ] } `)
},
wantReminder: func(r *Reminder) {
// Gets compacted automatically
r.Data = json.RawMessage(`{"foo":[12,4]}`)
},
},
{
name: "with period",
req: func(r *CreateReminderRequest) {
r.Period = "2s"
},
wantReminder: func(r *Reminder) {
r.Period, _ = NewReminderPeriod("2s")
},
},
{
name: "with due time as duration",
req: func(r *CreateReminderRequest) {
r.DueTime = "2m"
},
wantReminder: func(r *Reminder) {
r.DueTime = "2m"
r.RegisteredTime = r.RegisteredTime.Add(2 * time.Minute)
},
},
{
name: "with due time as absolute",
req: func(r *CreateReminderRequest) {
r.DueTime = now.Add(10 * time.Minute).Format(time.RFC3339)
},
wantReminder: func(r *Reminder) {
r.DueTime = now.Add(10 * time.Minute).Format(time.RFC3339)
r.RegisteredTime = now.Add(10 * time.Minute)
},
},
{
name: "with TTL as duration",
req: func(r *CreateReminderRequest) {
r.TTL = "10m"
},
wantReminder: func(r *Reminder) {
r.ExpirationTime = now.Add(10 * time.Minute)
},
},
{
name: "with TTL as absolute",
req: func(r *CreateReminderRequest) {
r.TTL = now.Add(10 * time.Minute).Format(time.RFC3339)
},
wantReminder: func(r *Reminder) {
r.ExpirationTime = now.Add(10 * time.Minute)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Base request and wantReminder, to keep it DRY
req := &CreateReminderRequest{
ActorID: "id",
ActorType: "type",
Name: "name",
}
tt.req(req)
wantReminder := &Reminder{
ActorID: "id",
ActorType: "type",
Name: "name",
Period: NewEmptyReminderPeriod(),
RegisteredTime: now,
}
tt.wantReminder(wantReminder)
// Run tests
gotReminder, err := NewReminderFromCreateReminderRequest(req, now)
if (err != nil) != tt.wantErr {
t.Errorf("NewReminderFromCreateReminderRequest() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(gotReminder, wantReminder) {
t.Errorf("NewReminderFromCreateReminderRequest() = %#v, want %#v", gotReminder, wantReminder)
}
})
}
t.Run("TTL in the past", func(t *testing.T) {
req := &CreateReminderRequest{
ActorID: "id",
ActorType: "type",
Name: "name",
TTL: "2002-02-02T12:00:02Z", // In the past
}
_, err := NewReminderFromCreateReminderRequest(req, now)
require.Error(t, err)
require.ErrorContains(t, err, "has already expired")
})
}
func TestNewReminderFromCreateTimerRequest(t *testing.T) {
now := time.Now().UTC().Truncate(time.Second)
tests := []struct {
name string
req func(r *CreateTimerRequest)
wantReminder func(r *Reminder)
wantErr bool
}{
{
name: "base test",
req: func(r *CreateTimerRequest) { return },
wantReminder: func(r *Reminder) { return },
},
{
name: "with data",
req: func(r *CreateTimerRequest) {
r.Data = json.RawMessage(`"hi"`)
},
wantReminder: func(r *Reminder) {
r.Data = json.RawMessage(`"hi"`)
},
},
{
name: "with data as JSON object",
req: func(r *CreateTimerRequest) {
r.Data = json.RawMessage(`{ "foo": [ 12, 4 ] } `)
},
wantReminder: func(r *Reminder) {
// Gets compacted automatically
r.Data = json.RawMessage(`{"foo":[12,4]}`)
},
},
{
name: "with period",
req: func(r *CreateTimerRequest) {
r.Period = "2s"
},
wantReminder: func(r *Reminder) {
r.Period, _ = NewReminderPeriod("2s")
},
},
{
name: "with due time as duration",
req: func(r *CreateTimerRequest) {
r.DueTime = "2m"
},
wantReminder: func(r *Reminder) {
r.DueTime = "2m"
r.RegisteredTime = r.RegisteredTime.Add(2 * time.Minute)
},
},
{
name: "with due time as absolute",
req: func(r *CreateTimerRequest) {
r.DueTime = now.Add(10 * time.Minute).Format(time.RFC3339)
},
wantReminder: func(r *Reminder) {
r.DueTime = now.Add(10 * time.Minute).Format(time.RFC3339)
r.RegisteredTime = now.Add(10 * time.Minute)
},
},
{
name: "with TTL as duration",
req: func(r *CreateTimerRequest) {
r.TTL = "10m"
},
wantReminder: func(r *Reminder) {
r.ExpirationTime = now.Add(10 * time.Minute)
},
},
{
name: "with TTL as absolute",
req: func(r *CreateTimerRequest) {
r.TTL = now.Add(10 * time.Minute).Format(time.RFC3339)
},
wantReminder: func(r *Reminder) {
r.ExpirationTime = now.Add(10 * time.Minute)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Base request and wantReminder, to keep it DRY
req := &CreateTimerRequest{
ActorID: "id",
ActorType: "type",
Name: "name",
}
tt.req(req)
wantReminder := &Reminder{
ActorID: "id",
ActorType: "type",
Name: "name",
Period: NewEmptyReminderPeriod(),
RegisteredTime: now,
}
tt.wantReminder(wantReminder)
// Run tests
gotReminder, err := NewReminderFromCreateTimerRequest(req, now)
if (err != nil) != tt.wantErr {
t.Errorf("NewReminderFromCreateTimerRequest() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(gotReminder, wantReminder) {
t.Errorf("NewReminderFromCreateTimerRequest() = %#v, want %#v", gotReminder, wantReminder)
}
})
}
t.Run("TTL in the past", func(t *testing.T) {
req := &CreateTimerRequest{
ActorID: "id",
ActorType: "type",
Name: "name",
TTL: "2002-02-02T12:00:02Z", // In the past
}
_, err := NewReminderFromCreateTimerRequest(req, now)
require.Error(t, err)
require.ErrorContains(t, err, "has already expired")
})
}
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package actors
// RenameReminderRequest is the request object for rename a reminder.
type RenameReminderRequest struct {
OldName string
ActorType string
ActorID string
NewName string
}
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package actors
import (
"github.com/dapr/dapr/pkg/actors/reminders"
)
// ActorHostedRequest is the request object for checking if an actor is hosted on this instance.
type ActorHostedRequest struct {
ActorID string `json:"actorId"`
ActorType string `json:"actorType"`
}
// CreateReminderRequest is the request object to create a new reminder.
type CreateReminderRequest = reminders.CreateReminderRequest
// CreateTimerRequest is the request object to create a new timer.
type CreateTimerRequest = reminders.CreateTimerRequest
// DeleteReminderRequest is the request object for deleting a reminder.
type DeleteReminderRequest struct {
Name string
ActorType string
ActorID string
}
// DeleteStateRequest is the request object for deleting an actor state.
type DeleteStateRequest struct {
ActorID string `json:"actorId"`
ActorType string `json:"actorType"`
Key string `json:"key"`
}
// DeleteTimerRequest is a request object for deleting a timer.
type DeleteTimerRequest struct {
Name string
ActorType string
ActorID string
}
// GetReminderRequest is the request object to get an existing reminder.
type GetReminderRequest struct {
Name string
ActorType string
ActorID string
}
// GetStateRequest is the request object for getting actor state.
type GetStateRequest struct {
ActorID string `json:"actorId"`
ActorType string `json:"actorType"`
Key string `json:"key"`
}
// ActorKey returns the key of the actor for this request.
func (r GetStateRequest) ActorKey() string {
return r.ActorType + daprSeparator + r.ActorID
}
// ReminderResponse is the payload that is sent to an Actor SDK API for execution.
type ReminderResponse struct {
Data any `json:"data"`
DueTime string `json:"dueTime"`
Period string `json:"period"`
}
// RenameReminderRequest is the request object for rename a reminder.
type RenameReminderRequest struct {
OldName string
ActorType string
ActorID string
NewName string
}
// SaveStateRequest is the request object for saving an actor state.
type SaveStateRequest struct {
ActorID string `json:"actorId"`
ActorType string `json:"actorType"`
Key string `json:"key"`
Value any `json:"value"`
}
// StateResponse is the response returned from getting an actor state.
type StateResponse struct {
Data []byte `json:"data"`
}
// TimerResponse is the response object send to an Actor SDK API when a timer fires.
type TimerResponse struct {
Callback string `json:"callback"`
Data any `json:"data"`
DueTime string `json:"dueTime"`
Period string `json:"period"`
}
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package actors
// SaveStateRequest is the request object for saving an actor state.
type SaveStateRequest struct {
ActorID string `json:"actorId"`
ActorType string `json:"actorType"`
Key string `json:"key"`
Value interface{} `json:"value"`
}
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package actors
// StateResponse is the response returned from getting an actor state.
type StateResponse struct {
Data []byte `json:"data"`
}
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package actors
// TimerResponse is the response object send to an Actor SDK API when a timer fires.
type TimerResponse struct {
Callback string `json:"callback"`
Data interface{} `json:"data"`
DueTime string `json:"dueTime"`
Period string `json:"period"`
}
......@@ -50,6 +50,7 @@ import (
"github.com/dapr/components-contrib/secretstores"
"github.com/dapr/components-contrib/state"
"github.com/dapr/dapr/pkg/actors"
"github.com/dapr/dapr/pkg/actors/reminders"
componentsV1alpha1 "github.com/dapr/dapr/pkg/apis/components/v1alpha1"
"github.com/dapr/dapr/pkg/apis/resiliency/v1alpha1"
"github.com/dapr/dapr/pkg/channel/http"
......@@ -1822,7 +1823,7 @@ func TestV1ActorEndpoints(t *testing.T) {
Name: "reminder1",
ActorType: "fakeActorType",
ActorID: "fakeActorID",
Data: nil,
Data: json.RawMessage("null"),
DueTime: "0h0m3s0ms",
Period: "0h0m7s0ms",
}
......@@ -1852,7 +1853,7 @@ func TestV1ActorEndpoints(t *testing.T) {
Name: "reminder1",
ActorType: "fakeActorType",
ActorID: "fakeActorID",
Data: nil,
Data: json.RawMessage("null"),
DueTime: "0h0m3s0ms",
Period: "0h0m7s0ms",
}
......@@ -2026,9 +2027,9 @@ func TestV1ActorEndpoints(t *testing.T) {
ActorID: "fakeActorID",
}
reminderResponse := actors.Reminder{
// Functions are not JSON encodable. This will force the error condition
Data: func() {},
reminderResponse := reminders.Reminder{
// This is not valid JSON
Data: json.RawMessage(`foo`),
}
mockActors := new(actors.MockActors)
......@@ -2053,7 +2054,7 @@ func TestV1ActorEndpoints(t *testing.T) {
Name: "timer1",
ActorType: "fakeActorType",
ActorID: "fakeActorID",
Data: nil,
Data: json.RawMessage("null"),
DueTime: "0h0m3s0ms",
Period: "0h0m7s0ms",
Callback: "",
......@@ -2084,7 +2085,7 @@ func TestV1ActorEndpoints(t *testing.T) {
Name: "timer1",
ActorType: "fakeActorType",
ActorID: "fakeActorID",
Data: nil,
Data: json.RawMessage("null"),
DueTime: "0h0m3s0ms",
Period: "0h0m7s0ms",
}
......
......@@ -287,10 +287,14 @@ func getActivityInvocationKey(generation uint64) string {
func (a *activityActor) createReliableReminder(ctx context.Context, actorID string, data any) error {
const reminderName = "run-activity"
wfLogger.Debugf("%s: creating '%s' reminder for immediate execution", actorID, reminderName)
dataEnc, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("failed to encode data as JSON: %w", err)
}
return a.actorRuntime.CreateReminder(ctx, &actors.CreateReminderRequest{
ActorType: ActivityActorType,
ActorID: actorID,
Data: data,
Data: dataEnc,
DueTime: "0s",
Name: reminderName,
Period: a.reminderInterval.String(),
......
......@@ -16,6 +16,7 @@ package wfengine
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
......@@ -448,10 +449,14 @@ func (wf *workflowActor) createReliableReminder(ctx context.Context, actorID str
// Reminders need to have unique names or else they may not fire in certain race conditions.
reminderName := fmt.Sprintf("%s-%s", namePrefix, uuid.NewString()[:8])
wfLogger.Debugf("%s: creating '%s' reminder with DueTime = %s", actorID, reminderName, delay)
dataEnc, err := json.Marshal(data)
if err != nil {
return reminderName, fmt.Errorf("failed to encode data as JSON: %w", err)
}
return reminderName, wf.actors.CreateReminder(ctx, &actors.CreateReminderRequest{
ActorType: WorkflowActorType,
ActorID: actorID,
Data: data,
Data: dataEnc,
DueTime: delay.String(),
Name: reminderName,
Period: wf.reminderInterval.String(),
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册