未验证 提交 99f3db72 编写于 作者: A Alessandro (Ale) Segala 提交者: GitHub

Fixed goroutine leak in reminders and timers (#6523) (#6554)

* Fixed goroutine leak in reminders and timers



* Added unit tests + some more tweaks



* Fixed last goroutine leaks



* Comments



---------
Signed-off-by: NItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
Co-authored-by: NArtur Souza <asouza.pro@gmail.com>
Co-authored-by: NDapr Bot <56698301+dapr-bot@users.noreply.github.com>
上级 e00f51dd
......@@ -906,11 +906,11 @@ func (a *actorsRuntime) startReminder(reminder *reminders.Reminder, stopChannel
nextTimer = a.clock.NewTimer(reminder.NextTick().Sub(a.clock.Now()))
defer func() {
if nextTimer.Stop() {
if nextTimer != nil && !nextTimer.Stop() {
<-nextTimer.C()
}
if ttlTimer != nil && ttlTimer.Stop() {
<-ttlTimerC
if ttlTimer != nil && !ttlTimer.Stop() {
<-ttlTimer.C()
}
}()
......@@ -922,6 +922,7 @@ func (a *actorsRuntime) startReminder(reminder *reminders.Reminder, stopChannel
case <-ttlTimerC:
// proceed with reminder deletion
log.Infof("Reminder %s with parameters: dueTime: %s, period: %s has expired", reminderKey, reminder.DueTime, reminder.Period)
ttlTimer = nil
break L
case <-stopChannel:
// reminder has been already deleted
......@@ -932,12 +933,14 @@ func (a *actorsRuntime) startReminder(reminder *reminders.Reminder, stopChannel
_, exists := a.activeReminders.Load(reminderKey)
if !exists {
log.Error("Could not find active reminder with key: " + reminderKey)
nextTimer = nil
return
}
// if all repetitions are completed, proceed with reminder deletion
// If all repetitions are completed, delete the reminder and do not execute it
if reminder.RepeatsLeft() == 0 {
log.Info("Reminder " + reminderKey + " has been completed")
nextTimer = nil
break L
}
......@@ -945,7 +948,8 @@ func (a *actorsRuntime) startReminder(reminder *reminders.Reminder, stopChannel
if err != nil {
if errors.Is(err, ErrReminderCanceled) {
// The handler is explicitly canceling the timer
log.Info("Reminder " + reminderKey + " was canceled by the actor")
log.Debug("Reminder " + reminderKey + " was canceled by the actor")
nextTimer = nil
break L
} else {
log.Errorf("Error while executing reminder %s: %v", reminderKey, err)
......@@ -966,16 +970,15 @@ func (a *actorsRuntime) startReminder(reminder *reminders.Reminder, stopChannel
}
} else {
log.Error("Could not find active reminder with key: " + reminderKey)
nextTimer = nil
return
}
if reminder.TickExecuted() {
nextTimer = nil
break L
}
if nextTimer.Stop() {
<-nextTimer.C()
}
nextTimer.Reset(reminder.NextTick().Sub(a.clock.Now()))
}
......@@ -1269,11 +1272,11 @@ func (a *actorsRuntime) CreateTimer(ctx context.Context, req *CreateTimerRequest
nextTimer = a.clock.NewTimer(reminder.NextTick().Sub(a.clock.Now()))
defer func() {
if nextTimer.Stop() {
if nextTimer != nil && !nextTimer.Stop() {
<-nextTimer.C()
}
if ttlTimer != nil && ttlTimer.Stop() {
<-ttlTimerC
if ttlTimer != nil && !ttlTimer.Stop() {
<-ttlTimer.C()
}
}()
......@@ -1285,6 +1288,7 @@ func (a *actorsRuntime) CreateTimer(ctx context.Context, req *CreateTimerRequest
case <-ttlTimerC:
// timer has expired; proceed with deletion
log.Infof("Timer %s with parameters: dueTime: %s, period: %s, TTL: %s has expired", timerKey, req.DueTime, req.Period, req.TTL)
ttlTimer = nil
break L
case <-stop:
// timer has been already deleted
......@@ -1299,17 +1303,16 @@ func (a *actorsRuntime) CreateTimer(ctx context.Context, req *CreateTimerRequest
}
} else {
log.Errorf("Could not find active timer %s", timerKey)
nextTimer = nil
return
}
if reminder.TickExecuted() {
log.Infof("Timer %s has been completed", timerKey)
nextTimer = nil
break L
}
if nextTimer.Stop() {
<-nextTimer.C()
}
nextTimer.Reset(reminder.NextTick().Sub(a.clock.Now()))
}
......
......@@ -18,6 +18,7 @@ import (
"encoding/json"
"errors"
"fmt"
"runtime"
"strconv"
"strings"
"sync"
......@@ -3078,3 +3079,80 @@ func TestPlacementSwitchIsNotTurnedOn(t *testing.T) {
assert.Nil(t, testActorsRuntime.store)
})
}
func TestCreateTimerReminderGoroutineLeak(t *testing.T) {
testActorsRuntime := newTestActorsRuntime()
defer testActorsRuntime.Stop()
actorType, actorID := getTestActorTypeAndID()
fakeCallAndActivateActor(testActorsRuntime, actorType, actorID, testActorsRuntime.clock)
testFn := func(createFn func(i int, ttl bool) error) func(t *testing.T) {
return func(t *testing.T) {
// Get the baseline goroutines
initialCount := runtime.NumGoroutine()
// Create 10 timers/reminders with unique names
for i := 0; i < 10; i++ {
require.NoError(t, createFn(i, false))
}
// Create 5 timers/reminders that override the first ones
for i := 0; i < 5; i++ {
require.NoError(t, createFn(i, false))
}
// Create 5 timers/reminders that have TTLs
for i := 10; i < 15; i++ {
require.NoError(t, createFn(i, true))
}
// Advance the clock to make the timers/reminders fire
time.Sleep(200 * time.Millisecond)
testActorsRuntime.clock.Sleep(5 * time.Second)
time.Sleep(200 * time.Millisecond)
testActorsRuntime.clock.Sleep(5 * time.Second)
// Sleep to allow for cleanup
time.Sleep(200 * time.Millisecond)
// Get the number of goroutines again, which should be +/- 2 the initial one (we give it some buffer)
currentCount := runtime.NumGoroutine()
if currentCount >= (initialCount+2) || currentCount <= (initialCount-2) {
t.Fatalf("Current number of goroutine %[1]d is outside of range [%[2]d-2, %[2]d+2]", currentCount, initialCount)
}
}
}
t.Run("timers", testFn(func(i int, ttl bool) error {
req := &CreateTimerRequest{
ActorType: actorType,
ActorID: actorID,
Name: fmt.Sprintf("timer%d", i),
Data: json.RawMessage(`"data"`),
DueTime: "2s",
}
if ttl {
req.DueTime = "1s"
req.Period = "1s"
req.TTL = "2s"
}
return testActorsRuntime.CreateTimer(context.Background(), req)
}))
t.Run("reminders", testFn(func(i int, ttl bool) error {
req := &CreateReminderRequest{
ActorType: actorType,
ActorID: actorID,
Name: fmt.Sprintf("reminder%d", i),
Data: json.RawMessage(`"data"`),
DueTime: "2s",
}
if ttl {
req.DueTime = "1s"
req.Period = "1s"
req.TTL = "2s"
}
return testActorsRuntime.CreateReminder(context.Background(), req)
}))
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册