未验证 提交 a17cd2b9 编写于 作者: C Chris Gillum 提交者: GitHub

[Workflow] Add workflow generation to activity actor IDs (#6455)

* [Workflow] Add workflow generation to activity actor IDs

This change ensures uniqueness for activity invocations across
multiple workflow generations. This change will prevent actor
deadlocks caused by multiple workflow generations that schedule
concurrently executing activities with the same task ID.
Signed-off-by: NChris Gillum <cgillum@microsoft.com>

* Fix typos in comment
Co-authored-by: NAlessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com>
Signed-off-by: NChris Gillum <cgillum@gmail.com>
Signed-off-by: NChris Gillum <cgillum@microsoft.com>

---------
Signed-off-by: NChris Gillum <cgillum@microsoft.com>
Signed-off-by: NChris Gillum <cgillum@gmail.com>
Co-authored-by: NAlessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com>
上级 e51ef459
......@@ -146,7 +146,7 @@ func (a *activityActor) executeActivity(ctx context.Context, actorID string, nam
return err
}
endIndex := strings.LastIndex(actorID, "::")
endIndex := strings.Index(actorID, "::")
if endIndex < 0 {
return fmt.Errorf("invalid activity actor ID: %s", actorID)
}
......
......@@ -320,7 +320,7 @@ func (wf *workflowActor) runWorkflow(ctx context.Context, actorID string, remind
}
req := actors.TransactionalRequest{
ActorType: wf.config.activityActorType,
ActorID: getActivityActorID(actorID, taskID),
ActorID: getActivityActorID(actorID, taskID, state.Generation),
Operations: []actors.TransactionalOperation{{
Operation: actors.Delete,
Request: actors.TransactionalDelete{
......@@ -417,7 +417,7 @@ func (wf *workflowActor) runWorkflow(ctx context.Context, actorID string, remind
if err != nil {
return err
}
targetActorID := getActivityActorID(actorID, e.EventId)
targetActorID := getActivityActorID(actorID, e.EventId, state.Generation)
req := invokev1.
NewInvokeMethodRequest("Execute").
......@@ -534,9 +534,9 @@ func getRuntimeState(actorID string, state workflowState) *backend.Orchestration
return backend.NewOrchestrationRuntimeState(api.InstanceID(actorID), state.History)
}
func getActivityActorID(workflowActorID string, taskID int32) string {
// An activity can be identified by it's name followed by it's task ID. Example: SayHello::0, SayHello::1, etc.
return fmt.Sprintf("%s::%d", workflowActorID, taskID)
func getActivityActorID(workflowActorID string, taskID int32, generation uint64) string {
// An activity can be identified by its name followed by its task ID and generation. Example: SayHello::0::1, SayHello::1::1, etc.
return fmt.Sprintf("%s::%d::%d", workflowActorID, taskID, generation)
}
func (wf *workflowActor) removeCompletedStateData(ctx context.Context, state workflowState, actorID string) error {
......@@ -554,7 +554,7 @@ func (wf *workflowActor) removeCompletedStateData(ctx context.Context, state wor
}
req := actors.TransactionalRequest{
ActorType: wf.config.activityActorType,
ActorID: getActivityActorID(actorID, taskID),
ActorID: getActivityActorID(actorID, taskID, state.Generation),
Operations: []actors.TransactionalOperation{{
Operation: actors.Delete,
Request: actors.TransactionalDelete{
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册