Unreal Engine ControlFlows插件深度剖析:现代C++异步编程范式的工程实现
文章末尾有代码示例
前言:异步编程的复杂性挑战
在现代游戏引擎架构中,异步操作已成为性能优化和用户体验的核心要素。从GPU资源流送、网络通信到复杂的游戏状态机,传统的回调地狱(Callback Hell)和状态管理复杂性一直困扰着开发者。Epic Games的ControlFlows插件 不仅仅是一个异步管理工具,更是一个体现现代C++设计哲学的工程杰作,它通过精巧的类型系统、SFINAE技术和委托机制,构建了一套声明式的异步流程编程范式。
本文将从编译器层面、内存模型、并发理论和设计模式等多个维度,深度剖析ControlFlows的技术内核,揭示其背后的工程智慧。
说明:本文包含三类代码片段——(1) 源码摘录 (会尽量标注对应的头文件/实现文件);(2) 简化示意 (用于解释机制,可能不可直接编译);(3) 扩展设想 (帮助理解工程化方向,当前插件未必实现)。如需精确接口与行为,请以 Engine/Plugins/Experimental/ControlFlows/Source/ControlFlows 下最新源码为准。
核心架构:基于类型擦除的多态委托系统
类型系统设计哲学
ControlFlows的核心创新在于其类型感知的函数签名推断系统 。通过C++模板元编程和SFINAE(Substitution Failure Is Not An Error)技术,实现了编译时的函数类型自动识别:
cpp
template <typename BindingObjectClassT, typename ...ArgsT>
void QueueStep_Internal_DeduceBindingObject (
const FString& InDebugName,
typename TEnableIf<IsDerivedFromSharedFromThis<BindingObjectClassT>(),
BindingObjectClassT*>::Type InBindingObject,
ArgsT...Params)
{
if (InBindingObject->DoesSharedInstanceExist ())
{
QueueStep_Internal_TSharedFromThis (InDebugName,
StaticCastSharedRef <BindingObjectClassT>(InBindingObject->AsShared ()),
Params...);
}
}
template <typename BindingObjectClassT, typename ...ArgsT>
void QueueStep_Internal_DeduceBindingObject (
const FString& InDebugName,
typename TEnableIf<TIsDerivedFrom<BindingObjectClassT, UObject>::IsDerived,
BindingObjectClassT*>::Type InBindingObject,
ArgsT...Params)
{
QueueStep_Internal_UObject <BindingObjectClassT>(InDebugName, InBindingObject, Params...);
}
函数签名到执行策略的映射矩阵
┌─────────────────────────────────────────────────────────────────────────────┐
│ 函数签名 → 执行策略映射表 │
├─────────────────────────────────────────────────────────────────────────────┤
│ 函数签名 │ 节点类型 │ 执行模式 │
├───────────────────────────────────────────────────────┼───────────────┼─────────┤
│ void Function () │ SelfCompleting │ 同步 │
│ void Function (FControlFlowNodeRef) │ RequiresCallback │ 异步 │
│ void Function (TSharedRef<FControlFlow>) │ Task (SubFlow) │ 子流程 │
│ int32 Function (TSharedRef<FControlFlowBranch>) │ Task (Branch) │ 分支 │
│ void Function (TSharedRef<FConcurrentControlFlows>) │ Task (Concurrent) │ 并发 │
│ EConditionalLoopResult Function (TSharedRef<FConditionalLoop>) │ Task (Loop) │ 循环 │
└───────────────────────────────────────────────────────┴───────────────┴─────────┘
注:以上类型名与返回值以 UE 5.5 插件源码为准(例如 FConcurrentControlFlows、FConditionalLoop、EConditionalLoopResult)。
节点继承层次与多态执行机制
cpp
class FControlFlowNode : public TSharedFromThis<FControlFlowNode>
{
protected :
TWeakPtr<FControlFlow> Parent;
FString NodeName;
bool bCancelled = false ;
bool bWasBoundOnExecution = true ;
public :
virtual void Execute () {}
virtual void CancelFlow () ;
void ContinueFlow () ;
};
class FControlFlowNode_SelfCompleting : public FControlFlowNode
{
private :
FSimpleDelegate Process;
protected :
virtual void Execute () override
{
LogExecution ();
if (Parent.IsValid ())
{
Parent.Pin ()->ExecuteNode (SharedThis (this ));
}
}
};
class FControlFlowNode_RequiresCallback : public FControlFlowNode
{
private :
FControlFlowWaitDelegate Process;
protected :
virtual void Execute () override
{
LogExecution ();
if (Process.IsBound ())
{
Process.Execute (SharedThis (this ));
}
else
{
bWasBoundOnExecution = false ;
ContinueFlow ();
}
}
};
执行引擎:状态机驱动的流程调度器
核心执行循环的设计精髓
cpp
void FControlFlow::HandleControlFlowNodeCompleted (TSharedRef<const FControlFlowNode> NodeCompleted)
{
if (!ensure (SubFlowStack_ForDebugging.Num () < MAX_FLOW_LOOPS))
{
UE_LOG (LogControlFlows, Error, TEXT ("Hit maximum Flow loops. Infinite recursion detected?" ));
return ;
}
const bool bCancelRequested = !bInterpretCancelledNodeAsComplete &&
NodeCompleted->HasCancelBeenRequested ();
CurrentNode.Reset ();
CurrentlyRunningTask.Reset ();
if (FlowQueue.Num () > 0 )
{
if (bCancelRequested)
{
CurrentNode = FlowQueue[0 ];
FlowQueue.RemoveAt (0 );
CurrentNode->CancelFlow ();
}
else
{
bBroadcastingStepComplete = true ;
OnStepCompletedDelegate.Broadcast ();
bBroadcastingStepComplete = false ;
ExecuteNextNodeInQueue ();
}
}
else
{
if (bCancelRequested)
{
BroadcastCancellation ();
}
else
{
OnFlowCompleteDelegate.Broadcast ();
OnCompleteDelegate_Internal.ExecuteIfBound ();
}
FControlFlowStatics::HandleControlFlowFinishedNotification ();
}
}
FControlFlow 生命周期详解
理解 FControlFlow 的完整生命周期对于正确使用该系统至关重要。以下从状态机视角详细剖析流程对象从创建到销毁的全过程。
生命周期状态机
┌─────────────────────────────────────────────────────────────────────────────────┐
│ FControlFlow 生命周期状态机 │
├─────────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ QueueXxx () ┌──────────┐ ExecuteFlow () ┌──────────┐ │
│ │ Created │ ─────────────→ │ Queuing │ ───────────────→│ Running │ │
│ └─────────┘ └──────────┘ └────┬─────┘ │
│ │ │ │ │
│ │ │ ┌─────┴─────┐ │
│ │ │ ↓ ↓ │
│ │ │ ┌──────────┐ ┌──────────┐ │
│ │ │ │Completing│ │Cancelling│ │
│ │ │ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │ │
│ │ │ ↓ ↓ │
│ │ │ ┌──────────────────────┐ │
│ │ └──────────────→│ Finished │ │
│ │ (空队列直接完成) │ (Completed/Cancelled)│ │
│ │ └──────────────────────┘ │
│ ↓ │ │
│ ┌─────────┐ │ │
│ │ Reset │←───────────────────────────────────────────────┘ │
│ └─────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────────┘
阶段1:创建(Construction)
cpp
FControlFlow::FControlFlow (const FString& FlowDebugName)
{
if (FlowDebugName.IsEmpty ())
{
DebugName = FString::Format (TEXT ("ControlFlow_{0}" ),
{ FString::FormatAsNumber (UnnamedControlFlowCounter) });
UnnamedControlFlowCounter++;
}
else
{
DebugName = FlowDebugName;
}
}
创建方式有两种:
方式
代码示例
生命周期管理
直接 MakeShared
TSharedRef<FControlFlow> Flow = MakeShared<FControlFlow>("MyFlow");
由调用者持有 TSharedRef/TSharedPtr 控制
通过 FControlFlowStatics::Create
FControlFlow& Flow = FControlFlowStatics::Create(this, "MyFlow");
由 Manager 托管,自动清理
FControlFlowStatics 托管机制(摘录:ControlFlowManager.h):
cpp
template <typename OwningObjectT>
static FControlFlow& Create_Internal (OwningObjectT* OwningObject, const FString& FlowId, bool bIsPersistent)
{
TSharedRef<TControlFlowContainer<OwningObjectT>> NewContainer =
MakeShared<TControlFlowContainer<OwningObjectT>>(OwningObject, MakeShared <FControlFlow>(FlowId), FlowId);
if (bIsPersistent)
GetPersistentFlows ().Add (NewContainer);
else
GetNewlyCreatedFlows ().Add (NewContainer);
NewContainer->GetControlFlow ()->Reset ();
CheckNewlyCreatedFlows ();
return NewContainer->GetControlFlow ().Get ();
}
初始状态:
CurrentNode = nullptr(未运行)
FlowQueue 为空
bInterpretCancelledNodeAsComplete = false
各种计数器归零
阶段2:队列构建(Queuing)
通过链式调用 QueueXxx() 或 .QueueStep() 向 FlowQueue 添加节点:
cpp
FSimpleDelegate& FControlFlow::QueueFunction (const FString& FlowNodeDebugName)
{
TSharedRef<FControlFlowNode_SelfCompleting> NewNode =
MakeShared <FControlFlowNode_SelfCompleting>(SharedThis (this ), FormatOrGetNewNodeDebugName (FlowNodeDebugName));
FlowQueue.Add (NewNode);
return NewNode->Process;
}
FControlFlowWaitDelegate& FControlFlow::QueueWait (const FString& FlowNodeDebugName)
{
TSharedRef<FControlFlowNode_RequiresCallback> NewNode =
MakeShared <FControlFlowNode_RequiresCallback>(SharedThis (this ), FormatOrGetNewNodeDebugName (FlowNodeDebugName));
FlowQueue.Add (NewNode);
return NewNode->Process;
}
节点类型与创建:
队列方法
创建的节点类型
特点
QueueFunction()
FControlFlowNode_SelfCompleting
同步执行,自动完成
QueueWait()
FControlFlowNode_RequiresCallback
异步,需手动 ContinueFlow()
QueueControlFlow()
FControlFlowNode_Task + FControlFlowSimpleSubTask
子流程
QueueControlFlowBranch()
FControlFlowNode_Task + FControlFlowTask_Branch
分支
QueueConcurrentFlows()
FControlFlowNode_Task + FControlFlowTask_ConcurrentFlows
并发
QueueConditionalLoop()
FControlFlowNode_Task + FControlFlowTask_ConditionalLoop
循环
阶段3:启动执行(ExecuteFlow)
cpp
void FControlFlow::ExecuteFlow ()
{
if (ensureAlwaysMsgf (!CurrentNode.IsValid (),
TEXT ("Flow is already running! Or perhaps there are multiple instances of owning class?" )))
{
if (!ensure (SubFlowStack_ForDebugging.Num () < MAX_FLOW_LOOPS))
{
UE_LOG (LogControlFlows, Error, TEXT ("Hit maximum Flow loops. Infinite recursion?" ));
}
else
{
FControlFlowStatics::HandleControlFlowStartedNotification (AsShared ());
SubFlowStack_ForDebugging.Add (SharedThis (this ));
if (FlowQueue.Num () > 0 )
{
ExecuteNextNodeInQueue ();
}
else
{
OnFlowCompleteDelegate.Broadcast ();
OnExecutedWithoutAnyNodesDelegate_Internal.ExecuteIfBound ();
FControlFlowStatics::HandleControlFlowFinishedNotification ();
}
SubFlowStack_ForDebugging.Pop ();
}
}
}
Manager 状态转移(摘录:ControlFlowManager.cpp):
cpp
void FControlFlowStatics::HandleControlFlowStartedNotification (TSharedRef<const FControlFlow> InFlow)
{
TArray<TSharedRef<FControlFlowContainerBase>>& NewFlows = GetNewlyCreatedFlows ();
for (size_t Idx = 0 ; Idx < NewFlows.Num (); ++Idx)
{
if (InFlow == NewFlows[Idx]->GetControlFlow ())
{
GetExecutingFlows ().Add (NewFlows[Idx]);
}
NewFlows.RemoveAtSwap (Idx);
--Idx;
}
CheckForInvalidFlows ();
}
阶段4:节点执行循环(Node Execution Loop)
执行下一个节点:
cpp
void FControlFlow::ExecuteNextNodeInQueue ()
{
CurrentNode = FlowQueue[0 ];
if (Activity)
Activity->Update (*CurrentNode->GetNodeName ());
FlowQueue.RemoveAt (0 );
CurrentNode->Execute ();
}
不同节点类型的执行逻辑:
SelfCompleting(同步节点):
cpp
void FControlFlowNode_SelfCompleting::Execute ()
{
LogExecution ();
if (ensureAlways (Parent.IsValid ()))
{
Parent.Pin ()->ExecuteNode (SharedThis (this ));
}
}
void FControlFlow::ExecuteNode (TSharedRef<FControlFlowNode_SelfCompleting> SelfCompletingNode)
{
if (SelfCompletingNode->Process.IsBound ())
{
SelfCompletingNode->Process.Execute ();
}
else
{
SelfCompletingNode->bWasBoundOnExecution = false ;
}
if (SelfCompletingNode->HasCancelBeenRequested ())
{
FlowQueue.Reset ();
BroadcastCancellation ();
FControlFlowStatics::HandleControlFlowFinishedNotification ();
}
else
{
SelfCompletingNode->ContinueFlow ();
}
}
RequiresCallback(异步节点):
cpp
void FControlFlowNode_RequiresCallback::Execute ()
{
LogExecution ();
if (Process.IsBound ())
{
Process.Execute (SharedThis (this ));
}
else
{
bWasBoundOnExecution = false ;
ContinueFlow ();
}
}
用户代码需要调用:
cpp
void MyAsyncStep (FControlFlowNodeRef FlowHandle)
{
SomeAsyncOperation ([FlowHandle]() {
FlowHandle->ContinueFlow ();
});
}
阶段5:节点完成与流程推进
cpp
void FControlFlowNode::ContinueFlow ()
{
if (ensure (Parent.IsValid ()))
{
Parent.Pin ()->HandleControlFlowNodeCompleted (SharedThis (this ));
}
}
HandleControlFlowNodeCompleted 核心逻辑(摘录:ControlFlow.cpp):
cpp
void FControlFlow::HandleControlFlowNodeCompleted (TSharedRef<const FControlFlowNode> NodeCompleted)
{
if (ensureMsgf (CurrentNode.IsValid (), ...) &&
ensureMsgf (&NodeCompleted.Get () == CurrentNode.Get (), ...))
{
const bool bCancelRequested = !bInterpretCancelledNodeAsComplete &&
NodeCompleted->HasCancelBeenRequested ();
CurrentNode.Reset ();
CurrentlyRunningTask.Reset ();
if (FlowQueue.Num () > 0 )
{
if (bCancelRequested)
{
CurrentNode = FlowQueue[0 ];
FlowQueue.RemoveAt (0 );
CurrentNode->CancelFlow ();
}
else
{
bBroadcastingStepComplete = true ;
OnStepCompletedDelegate.Broadcast ();
bBroadcastingStepComplete = false ;
ExecuteNextNodeInQueue ();
}
}
else
{
if (bCancelRequested)
BroadcastCancellation ();
else
{
OnStepCompletedDelegate.Broadcast ();
OnFlowCompleteDelegate.Broadcast ();
OnCompleteDelegate_Internal.ExecuteIfBound ();
}
FControlFlowStatics::HandleControlFlowFinishedNotification ();
}
}
}
阶段6:取消流程(CancelFlow)
cpp
void FControlFlow::CancelFlow ()
{
if (CurrentNode.IsValid ())
{
CurrentNode->CancelFlow ();
}
else
{
BroadcastCancellation ();
FControlFlowStatics::HandleControlFlowFinishedNotification ();
}
}
void FControlFlowNode::CancelFlow ()
{
if (ensure (Parent.IsValid ()))
{
bCancelled = true ;
Parent.Pin ()->HandleControlFlowNodeCompleted (SharedThis (this ));
}
}
取消传播机制:
当节点完成时检测到 bCancelled = true 且 bInterpretCancelledNodeAsComplete = false
会对队列中剩余的每个节点调用 CancelFlow()
形成级联取消直到队列清空
SetCancelledNodeAsComplete(true) 详解:
该方法用于控制取消是否级联传播。核心逻辑在 HandleControlFlowNodeCompleted 中:
cpp
const bool bCancelRequested = !bInterpretCancelledNodeAsComplete && NodeCompleted->HasCancelBeenRequested ();
这是一个与运算 ,bCancelRequested = true 需要同时满足 :
bInterpretCancelledNodeAsComplete == false(默认值)
NodeCompleted->HasCancelBeenRequested() == true(节点被取消)
设置
节点被取消后
后续节点
SetCancelledNodeAsComplete(false)(默认)
bCancelRequested = true
级联取消所有后续节点
SetCancelledNodeAsComplete(true)
bCancelRequested = false
正常继续执行后续节点
行为对比图解:
默认行为 (bInterpretCancelledNodeAsComplete = false):
┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐
│ Step1 │───→│ Step2 │───→│ Step3 │───→│ Step4 │
└────────┘ └───┬────┘ └────────┘ └────────┘
│ CancelFlow ()
↓
Step2 取消 → Step3 取消 → Step4 取消 → OnFlowCancelledDelegate
设置 SetCancelledNodeAsComplete (true):
┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐
│ Step1 │───→│ Step2 │───→│ Step3 │───→│ Step4 │
└────────┘ └───┬────┘ └────────┘ └────────┘
│ CancelFlow ()
↓
Step2 "完成" → Step3 执行 → Step4 执行 → OnFlowCompleteDelegate
使用场景示例: 当希望某个节点的取消不影响后续流程时使用:
cpp
Flow.SetCancelledNodeAsComplete (true )
.QueueStep (TEXT ("可选步骤" ), [](FControlFlowNodeRef Node) {
if (UserClickedSkip)
Node->CancelFlow ();
else
Node->ContinueFlow ();
})
.QueueStep (TEXT ("必须执行" ), []() {
})
.ExecuteFlow ();
简单理解 :把"取消"当作"完成"来处理,不触发级联取消。
阶段7:流程结束与清理
委托广播顺序:
场景
广播顺序
正常完成
OnStepCompletedDelegate → OnFlowCompleteDelegate → OnCompleteDelegate_Internal
取消
OnFlowCancelledDelegate → OnCancelledDelegate_Internal
Manager 清理流程(摘录:ControlFlowManager.cpp):
cpp
bool FControlFlowStatics::IterateForInvalidFlows (float DeltaTime)
{
for (auto & PersistentFlow : GetPersistentFlows ())
{
if (PersistentFlow->IsRunning ())
{
GetExecutingFlows ().Add (PersistentFlow);
}
}
for (auto & ExecutingFlow : GetExecutingFlows ())
{
if (!ExecutingFlow->IsRunning () && ExecutingFlow->NumInQueue () == 0 )
{
ExecutingFlow->Activity = nullptr ;
GetFinishedFlows ().Add (ExecutingFlow);
}
}
for (auto & CompletedFlow : GetFinishedFlows ())
{
}
return false ;
}
阶段8:重置(Reset)
cpp
void FControlFlow::Reset ()
{
CurrentNode.Reset ();
CurrentlyRunningTask.Reset ();
FlowQueue.Reset ();
SubFlowStack_ForDebugging.Reset ();
UnnamedNodeCounter = 0 ;
UnnamedBranchCounter = 0 ;
}
Reset 后可以重新 Queue + ExecuteFlow。
QueueDelay 与 LastZeroSecondDelay 机制
QueueDelay 用于在流程中插入延迟,其中 LastZeroSecondDelay 是一个防止同一帧内多个 0 秒延迟嵌套执行的节流机制。
问题背景: 当使用 QueueDelay(0.0f) 时,虽然延迟时间是 0 秒,但它仍然会通过 FTSTicker 推迟到帧末尾执行。如果在同一帧内有多个连续的 0 秒延迟,可能导致 Ticker 嵌套回调问题。
核心数据结构:
cpp
TPair<double , float > LastZeroSecondDelay;
QueueDelay 源码解析(摘录:ControlFlow.cpp):
cpp
FControlFlow& FControlFlow::QueueDelay (const float InSeconds, const FString& NodeName)
{
QueueWait (NodeName).BindLambda ([InSeconds](FControlFlowNodeRef FlowHandle)
{
TSharedRef<FControlFlow> OwningFlow = FlowHandle->Parent.Pin ().ToSharedRef ();
const float AdditionalDelay = (InSeconds == 0.0f &&
OwningFlow->LastZeroSecondDelay.Key == FApp::GetCurrentTime ())
? OwningFlow->LastZeroSecondDelay.Value : 0.0f ;
const float SecondsToDelay = InSeconds + AdditionalDelay;
FTSTicker::GetCoreTicker ().AddTicker (FTickerDelegate::CreateLambda (
[FlowHandle, bIsZeroSecondDelay = SecondsToDelay == 0.0f ](float DeltaTime)
{
if (FlowHandle->Parent.IsValid () && !FlowHandle->HasCancelBeenRequested ())
{
if (bIsZeroSecondDelay)
{
TSharedPtr<FControlFlow> FlowIter = FlowHandle->Parent.Pin ().ToSharedRef ();
while (FlowIter.IsValid ())
{
FlowIter->LastZeroSecondDelay = { FApp::GetCurrentTime (), DeltaTime };
FlowIter = FlowIter->ParentFlow.Pin ();
}
}
FlowHandle->ContinueFlow ();
}
return false ;
}), SecondsToDelay);
});
return *this ;
}
执行流程示例:
假设以下流程在同一帧内执行:
cpp
Flow.QueueStep (TEXT ("Step1" ), []() { })
.QueueDelay (0.0f , TEXT ("Delay1" ))
.QueueStep (TEXT ("Step2" ), []() { })
.QueueDelay (0.0f , TEXT ("Delay2" ))
.QueueStep (TEXT ("Step3" ), []() { })
.ExecuteFlow ();
帧 N(时间 = 100.0,DeltaTime = 0.016):
步骤
操作
LastZeroSecondDelay
说明
1
Step1 执行完成
{0.0, 0.0}
初始状态
2
Delay1 入队执行
-
AdditionalDelay = 0(时间戳不匹配),SecondsToDelay = 0
3
Ticker 回调(Delay1)
{100.0, 0.016}
记录本帧时间戳和 DeltaTime
4
Step2 执行完成
{100.0, 0.016}
-
5
Delay2 入队执行
-
AdditionalDelay = 0.016(时间戳匹配),SecondsToDelay = 0.016
6
流程暂停
-
等待下一帧
帧 N+1(时间 = 100.016):
步骤
操作
说明
7
Ticker 回调(Delay2)
延迟到期
8
Step3 执行
流程完成
时间线对比:
没有 LastZeroSecondDelay 机制(假设):
帧 N : Step1 → Delay1 → [Ticker] → Step2 → Delay2 → [Ticker] → Step3
↑ 全部在同一帧,可能导致 Ticker 嵌套调用问题
有 LastZeroSecondDelay 机制(实际):
帧 N : Step1 → Delay1 → [Ticker] → Step2 → Delay2 → [等待...]
↓
帧 N +1 : [Ticker] → Step3
↑ 第二个 0 秒延迟被推迟到下一帧
核心逻辑:
┌─────────────────────────────────────────────────────────────────────────┐
│ QueueDelay (0.0 f) 执行逻辑 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 本帧是否已有 0 秒延迟? │
│ (LastZeroSecondDelay.Key == FApp ::GetCurrentTime ()) │
│ │
│ ├── 否 → SecondsToDelay = 0.0 f(本帧末尾执行) │
│ │ Ticker 回调时记录 LastZeroSecondDelay │
│ │ │
│ └── 是 → SecondsToDelay = 0.0 f + LastDeltaTime │
│ (推迟到下一帧执行) │
│ │
└─────────────────────────────────────────────────────────────────────────┘
为什么要向上传播到父流程? 子流程、分支、并发流程都会继承父流程的 LastZeroSecondDelay,确保嵌套流程中的 0 秒延迟也能正确累积。
简单理解: 每帧只允许一个 QueueDelay(0.0f) 立即执行,后续的 0 秒延迟会被推迟到下一帧。这是一种节流(throttling)机制,避免同一帧内多个 Ticker 嵌套回调。
子流程/并发流程的生命周期
子流程(SubFlow):
cpp
FControlFlowPopulator& FControlFlow::QueueControlFlow (const FString& TaskName, const FString& FlowNodeDebugName)
{
TSharedRef<FControlFlowSimpleSubTask> NewTask = MakeShared <FControlFlowSimpleSubTask>(TaskName);
TSharedRef<FControlFlowNode_Task> NewNode = MakeShared <FControlFlowNode_Task>(...);
NewTask->GetTaskFlow ()->ParentFlow = AsWeak ();
NewTask->GetTaskFlow ()->Activity = Activity;
...
}
void FControlFlowSimpleSubTask::Execute ()
{
if (TaskPopulator.IsBound () && GetTaskFlow ().IsValid ())
{
GetTaskFlow ()->LastZeroSecondDelay = GetOwningFlowForTaskNode ().Pin ()->LastZeroSecondDelay;
GetTaskFlow ()->OnCompleteDelegate_Internal.BindSP (SharedThis (this ), &FControlFlowSimpleSubTask::CompletedSubTask);
GetTaskFlow ()->OnExecutedWithoutAnyNodesDelegate_Internal.BindSP (...);
GetTaskFlow ()->OnCancelledDelegate_Internal.BindSP (...);
TaskPopulator.Execute (GetTaskFlow ().ToSharedRef ());
ensureAlwaysMsgf (!GetTaskFlow ()->IsRunning (),
TEXT ("Did you call ExecuteFlow() on a SubFlow? You don't need to." ));
GetTaskFlow ()->ExecuteFlow ();
}
}
关键点:
子流程的 ExecuteFlow() 由 Task 内部调用,用户不应手动调用
子流程完成后通过 OnCompleteDelegate_Internal 通知父流程继续
生命周期时序图
用户代码 FControlFlow FControlFlowNode Manager
│ │ │ │
│ MakeShared/Create │ │ │
├───────────────────────────→│ │ │
│ │ ctor: 初始化 DebugName │ │
│ │ │ │
│ .QueueStep ()/QueueWait () │ │ │
├───────────────────────────→│ │ │
│ │ 创建 Node, 加入 FlowQueue │ │
│ │ │ │
│ .ExecuteFlow () │ │ │
├───────────────────────────→│ │ │
│ │ HandleControlFlowStartedNotification ─────────────────────→│
│ │ │ NewlyCreated→Executing │
│ │ ExecuteNextNodeInQueue () │ │
│ ├───────────────────────────────→│ │
│ │ │ Execute () │
│ │ │ (执行绑定函数) │
│ │ │ │
│ │←───────────────────────────────┤ ContinueFlow () │
│ │ HandleControlFlowNodeCompleted │ │
│ │ OnStepCompletedDelegate.Broadcast () │
│ │ │ │
│ │ (队列非空) ExecuteNextNodeInQueue () │
│ │ ...循环... │ │
│ │ │ │
│ │ (队列空) OnFlowCompleteDelegate.Broadcast () │
│ │ HandleControlFlowFinishedNotification ─────────────────────→│
│ │ │ Executing→Finished │
│ │ │ │
│ │ │ (下一帧清理) │
关键成员变量与状态判断
cpp
class FControlFlow
{
private :
FString DebugName;
bool bInterpretCancelledNodeAsComplete = false ;
bool bBroadcastingStepComplete = false ;
TSharedPtr<FControlFlowNode_Task> CurrentlyRunningTask = nullptr ;
TSharedPtr<FControlFlowNode> CurrentNode = nullptr ;
TWeakPtr<FControlFlow> ParentFlow;
TArray<TSharedRef<FControlFlowNode>> FlowQueue;
};
bool FControlFlow::IsRunning () const
{
return CurrentNode.IsValid () || bBroadcastingStepComplete;
}
注意事项与最佳实践
不要重复调用 ExecuteFlow() :源码有 ensureAlwaysMsgf(!CurrentNode.IsValid(), ...) 检查
子流程不要手动 ExecuteFlow() :由 Task 内部驱动
异步节点必须调用 ContinueFlow() 或 CancelFlow() :否则流程会永久挂起
使用 SetCancelledNodeAsComplete(true) 可以让取消不级联
FControlFlowStatics::Create 创建的流程会在下一帧检查是否已执行 :未执行会自动触发 ExecuteFlow()
内存管理:智能指针与生命周期控制
ControlFlows采用了精密的内存管理策略,通过智能指针系统确保对象生命周期的安全性:
┌─────────────────────────────────────────────────────────────────────────────┐
│ 内存引用关系图 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────┐ TSharedRef ┌─────────────────┐ │
│ │ FControlFlow │◄─────────────────┤ FControlFlowNode │ │
│ │ │ │ │ │
│ │ • FlowQueue │ │ • Parent (Weak) │ │
│ │ • CurrentNode │ │ • NodeName │ │
│ │ • ParentFlow │ │ • bCancelled │ │
│ └─────────────────┘ └─────────────────┘ │
│ │ │ │
│ │ TWeakPtr │ TSharedRef │
│ ▼ ▼ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ ParentFlow │ │ FControlFlowTask │ │
│ │ (Optional) │ │ │ │
│ └─────────────────┘ └─────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
并发执行模型:从单线程到真并行
ControlFlows的并发系统展现了从简单到复杂的渐进式设计:
cpp
enum class EConcurrentExecution
{
Default,
Random,
Parallel,
};
void FConcurrentControlFlows::Execute ()
{
ensureAlwaysMsgf (!HasAnySubFlowBeenExecuted (),
TEXT ("Duplicate execution detected! Call ExecuteFlow() only once per flow!" ));
if (ExecutionBehavior == EConcurrentExecution::Parallel)
{
ParallelFor (FlowsToExecute.Num (), [&FlowsToExecute](int32 Index)
{
UE_LOG (LogControlFlows, Verbose, TEXT ("Parallel execution: %s" ),
*FlowsToExecute[Index]->GetDebugName ());
FlowsToExecute[Index]->ExecuteFlow ();
});
}
else
{
if (ExecutionBehavior == EConcurrentExecution::Random)
{
for (int32 Idx = 0 ; Idx < FlowsToExecute.Num (); ++Idx)
{
Swap (FlowsToExecute[Idx],
FlowsToExecute[FMath::RandRange (0 , FlowsToExecute.Num () - 1 )]);
}
}
for (const auto & Flow : FlowsToExecute)
{
Flow->ExecuteFlow ();
if (bCancelAllHasBegun) break ;
}
}
}
重要提示:并发/子流程的 ExecuteFlow() 由 ForkFlow(...) / QueueConcurrentFlows(...) 内部调度触发 (例如并行策略下会通过 ParallelFor 批量启动子流程)。一般不应在外部对这些 subflow 再次手动调用 ExecuteFlow();源码中有 ensureAlwaysMsgf 用于检测重复执行(提示语类似 “Call ExecuteFlow() only once per flow!”)。
高级设计模式:观察者模式与责任链模式的融合
ControlFlows插件的核心创新在于巧妙地融合了观察者模式 与责任链模式 ,创造出一套既具备流程控制灵活性又具备事件响应解耦性的异步编程框架。
融合模式的核心理念
在传统的设计模式应用中,观察者模式和责任链模式通常独立使用。ControlFlows的突破性在于将两者有机结合:
责任链模式 :负责流程的顺序执行与传递
观察者模式 :负责状态变化的事件通知与响应
cpp
┌─────────────────────────────────────────────────────────────────────────────┐
│ 观察者 + 责任链融合架构 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ Node1 ──ContinueFlow ()──→ Flow ──Broadcast ()──→ Observers │
│ │ │ ↓ │
│ │ │ ┌─────────────┐ │
│ └──责任传递──→ Node2 ──ContinueFlow ()──→ │ 事件处理器 │ │
│ │ └─────────────┘ │
│ │ │
│ └──责任传递──→ Node3 ──→ Complete │
│ │
│ 🔄 责任链:节点间的顺序执行传递 │
│ 🔔 观察者:状态变化的事件通知 │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
观察者模式的实现层次
cpp
DECLARE_MULTICAST_DELEGATE (FOnFlowComplete);
DECLARE_MULTICAST_DELEGATE (FOnStepCompleted);
DECLARE_MULTICAST_DELEGATE (FOnFlowCancelled);
class FControlFlow : public TSharedFromThis<FControlFlow>
{
private :
FOnFlowComplete OnFlowCompleteDelegate;
FOnStepCompleted OnStepCompletedDelegate;
FOnFlowCancelled OnFlowCancelledDelegate;
FSimpleDelegate OnCompleteDelegate_Internal;
FSimpleDelegate OnCancelledDelegate_Internal;
public :
FOnFlowComplete& OnFlowComplete ()
{
return OnFlowCompleteDelegate;
}
FOnStepCompleted& OnStepCompleted ()
{
return OnStepCompletedDelegate;
}
FOnFlowCancelled& OnFlowCancelled ()
{
return OnFlowCancelledDelegate;
}
};
观察者模式特征:
主题(Subject) :FControlFlow 流程对象
观察者(Observer) :注册的委托函数
通知机制 :状态改变时自动广播给所有观察者
松耦合 :观察者与主题之间无直接依赖
责任链模式的执行机制
cpp
void FControlFlow::HandleControlFlowNodeCompleted (TSharedRef<const FControlFlowNode> NodeCompleted)
{
CurrentNode.Reset ();
CurrentlyRunningTask.Reset ();
if (FlowQueue.Num () > 0 )
{
if (!bCancelRequested)
{
bBroadcastingStepComplete = true ;
OnStepCompletedDelegate.Broadcast ();
bBroadcastingStepComplete = false ;
ExecuteNextNodeInQueue ();
}
}
else
{
OnStepCompletedDelegate.Broadcast ();
OnFlowCompleteDelegate.Broadcast ();
OnCompleteDelegate_Internal.ExecuteIfBound ();
}
}
void FControlFlow::ExecuteNextNodeInQueue ()
{
CurrentNode = FlowQueue[0 ];
FlowQueue.RemoveAt (0 );
CurrentNode->Execute ();
}
责任链模式特征:
处理者(Handler) :每个 FControlFlowNode
请求传递 :通过 ContinueFlow() → HandleControlFlowNodeCompleted() → ExecuteNextNodeInQueue()
链式结构 :FlowQueue 维护处理者队列
职责单一 :每个节点只处理自己的逻辑
融合机制的实现细节
cpp
void FControlFlowNode::ContinueFlow ()
{
if (ensure (Parent.IsValid ()))
{
Parent.Pin ()->HandleControlFlowNodeCompleted (SharedThis (this ));
}
}
void FControlFlowNode_SelfCompleting::Execute ()
{
LogExecution ();
if (ensureAlways (Parent.IsValid ()))
{
Parent.Pin ()->ExecuteNode (SharedThis (this ));
}
}
void FControlFlowNode_RequiresCallback::Execute ()
{
LogExecution ();
if (Process.IsBound ())
{
Process.Execute (SharedThis (this ));
}
else
{
bWasBoundOnExecution = false ;
ContinueFlow ();
}
}
实际应用场景示例
cpp
class FAdvancedWorkflow : public TSharedFromThis<FAdvancedWorkflow>
{
public :
void ExecuteComplexWorkflow ()
{
TSharedRef<FControlFlow> MainFlow = MakeShared <FControlFlow>("ComplexWorkflow" );
MainFlow->OnStepCompleted ().AddSP (this , &FAdvancedWorkflow::OnStepCompleted);
MainFlow->OnFlowComplete ().AddSP (this , &FAdvancedWorkflow::OnWorkflowComplete);
MainFlow->OnFlowCancelled ().AddSP (this , &FAdvancedWorkflow::OnWorkflowCancelled);
MainFlow->QueueStep ("数据预处理" , this , &FAdvancedWorkflow::PreprocessData)
.QueueStep ("业务逻辑执行" , this , &FAdvancedWorkflow::ExecuteBusinessLogic)
.BranchFlow ([this ](TSharedRef<FControlFlowBranch> Branch)
{
EBusinessState State = EvaluateBusinessState ();
Branch->AddOrGetBranch (static_cast <int32>(EBusinessState::Success))
.QueueStep ("成功处理" , this , &FAdvancedWorkflow::HandleSuccess)
.QueueStep ("结果持久化" , this , &FAdvancedWorkflow::PersistResults);
Branch->AddOrGetBranch (static_cast <int32>(EBusinessState::Retry))
.QueueStep ("重试逻辑" , this , &FAdvancedWorkflow::HandleRetry)
.Loop ([this ](TSharedRef<FConditionalLoop> RetryLoop)
{
RetryLoop->CheckConditionFirst ()
.QueueStep ("重新执行" , this , &FAdvancedWorkflow::RetryExecution);
return ShouldContinueRetry () ? EConditionalLoopResult::RunLoop
: EConditionalLoopResult::LoopFinished;
});
Branch->AddOrGetBranch (static_cast <int32>(EBusinessState::Failed))
.QueueStep ("错误处理" , this , &FAdvancedWorkflow::HandleError)
.QueueStep ("清理资源" , this , &FAdvancedWorkflow::CleanupResources);
return static_cast <int32>(State);
})
.QueueStep ("工作流完成" , this , &FAdvancedWorkflow::FinalizeWorkflow)
.ExecuteFlow ();
}
private :
void OnStepCompleted ()
{
UE_LOG (LogWorkflow, Log, TEXT ("步骤完成 - 更新进度条" ));
UpdateProgressBar ();
NotifyStakeholders ();
}
void OnWorkflowComplete ()
{
UE_LOG (LogWorkflow, Log, TEXT ("工作流完成 - 触发后续处理" ));
TriggerPostProcessing ();
SendCompletionNotification ();
}
void OnWorkflowCancelled ()
{
UE_LOG (LogWorkflow, Warning, TEXT ("工作流取消 - 执行清理" ));
PerformCleanup ();
NotifyFailure ();
}
};
融合模式的优势分析
方面
观察者模式贡献
责任链模式贡献
融合效果
解耦性
流程状态变化与响应逻辑解耦
节点处理逻辑与流程控制解耦
高度模块化的异步系统
扩展性
可动态添加/移除观察者
可动态构建处理链
灵活的流程定制能力
可维护性
状态变化通知集中管理
处理逻辑职责单一
清晰的代码结构
性能
事件驱动,按需响应
顺序执行,避免轮询
高效的异步执行
可测试性
可独立测试事件通知
可独立测试处理逻辑
单元测试友好
设计哲学的深层理解
cpp
template <typename T>
concept FlowNodeConcept = requires (T node)
{
{ node.ContinueFlow () } -> std::same_as<void >;
{ node.CancelFlow () } -> std::same_as<void >;
{ node.GetNodeName () } -> std::convertible_to<FString>;
{ node.HasCancelBeenRequested () } -> std::same_as<bool >;
};
static_assert (FlowNodeConcept<FControlFlowNode>,
"Flow nodes must support both chain-of-responsibility and observer patterns" );
这种融合设计的核心价值在于:
单一职责原则 :
开闭原则 :
可以扩展新的节点类型(责任链)
可以添加新的观察者(观察者模式)
依赖倒置原则 :
高层模块(Flow)不依赖低层模块(Node)
都依赖于抽象(委托接口)
组合优于继承 :
委托系统的多层抽象
ControlFlows的委托系统展现了现代C++的类型安全设计:
cpp
DECLARE_DELEGATE_OneParam (FControlFlowWaitDelegate, FControlFlowNodeRef)
DECLARE_DELEGATE_OneParam (FControlFlowPopulator, TSharedRef<FControlFlow>)
DECLARE_DELEGATE_RetVal_OneParam (int32, FControlFlowBranchDefiner, TSharedRef<FControlFlowBranch>)
DECLARE_DELEGATE_RetVal_OneParam (EConditionalLoopResult, FControlFlowConditionalLoopDefiner, TSharedRef<FConditionalLoop>)
DECLARE_DELEGATE_OneParam (FConcurrentFlowsDefiner, TSharedRef<FConcurrentControlFlows>)
template <typename BindingObjectClassT, typename ...PayloadParamsT>
void QueueStep_Internal_TSharedFromThis (
const FString& InDebugName,
TSharedRef<BindingObjectClassT> InBindingObject,
typename TMemFunPtrType<false , BindingObjectClassT,
EConditionalLoopResult(TSharedRef<FConditionalLoop>, PayloadParamsT...)>::Type InFunction,
PayloadParamsT...Params)
{
QueueConditionalLoop (InDebugName).BindSP (InBindingObject, InFunction, Params...);
}
template <typename BindingObjectClassT, typename ...PayloadParamsT>
void QueueStep_Internal_TSharedFromThis (
const FString& InDebugName,
TSharedRef<BindingObjectClassT> InBindingObject,
typename TMemFunPtrType<false , BindingObjectClassT,
void (FControlFlowNodeRef, PayloadParamsT...)>::Type InFunction,
PayloadParamsT...Params)
{
QueueWait (InDebugName).BindSP (InBindingObject, InFunction, Params...);
}
条件循环:函数式编程范式的体现
cpp
class FConditionalLoop : public TSharedFromThis<FConditionalLoop>
{
private :
TOptional<bool > CheckConditionalFirst;
TSharedRef<FControlFlow> FlowLoop;
public :
FControlFlow& CheckConditionFirst ()
{
CheckConditionalFirst = true ;
return FlowLoop.Get ();
}
FControlFlow& RunLoopFirst ()
{
CheckConditionalFirst = false ;
return FlowLoop.Get ();
}
FControlFlow& ExecuteAtLeastOnce ()
{
return RunLoopFirst ();
}
};
MyFlow.Loop ([this ](TSharedRef<FConditionalLoop> OuterLoop)
{
OuterLoop->CheckConditionFirst ()
.QueueStep ("外层循环体开始" , this , &MyClass::OuterLoopStart)
.Loop ([this ](TSharedRef<FConditionalLoop> InnerLoop)
{
InnerLoop->RunLoopFirst ()
.QueueStep ("内层处理" , this , &MyClass::InnerProcess)
.BranchFlow ([this ](TSharedRef<FControlFlowBranch> Branch)
{
int32 decision = EvaluateInnerCondition ();
Branch->AddOrGetBranch (0 )
.QueueStep ("分支A处理" , this , &MyClass::ProcessBranchA);
Branch->AddOrGetBranch (1 )
.QueueStep ("分支B处理" , this , &MyClass::ProcessBranchB);
return decision;
});
return ShouldContinueInner () ? EConditionalLoopResult::RunLoop
: EConditionalLoopResult::LoopFinished;
})
.QueueStep ("外层循环体结束" , this , &MyClass::OuterLoopEnd);
return ShouldContinueOuter () ? EConditionalLoopResult::RunLoop
: EConditionalLoopResult::LoopFinished;
})
并发控制:从理论到实践
并发模型的三种执行策略
ControlFlows提供了三种并发执行策略,通过 EConcurrentExecution 枚举定义:
cpp
enum class EConcurrentExecution
{
Default,
Random,
Parallel,
};
实际执行逻辑(摘录自源码):
cpp
class FConcurrentControlFlows : public TSharedFromThis<FConcurrentControlFlows>
{
public :
CONTROLFLOWS_API FControlFlow& AddOrGetFlow (int32 InIdentifier, const FString& DebugSubFlowName = TEXT("" )) ;
CONTROLFLOWS_API FConcurrentControlFlows& SetExecution (const EConcurrentExecution InBehavior) ;
private :
void Execute ()
{
ensureAlwaysMsgf (!this ->HasAnySubFlowBeenExecuted (),
TEXT ("Did you call ExecuteFlow() on a SubFlow? Do not do this!" ));
if (ConcurrentFlows.Num () == 0 )
{
OnAllCompleted ();
return ;
}
TArray<TSharedRef<FConcurrencySubFlowContainer>> FlowsToExecute;
for (const TPair<int32, TSharedRef<FConcurrencySubFlowContainer>>& PairIt : ConcurrentFlows)
{
FlowsToExecute.Add (PairIt.Value);
PairIt.Value->GetControlFlow ()->LastZeroSecondDelay =
OwningTask.Pin ()->GetOwningFlowForTaskNode ().Pin ()->LastZeroSecondDelay;
}
if (ExecutionBehavior == EConcurrentExecution::Parallel)
{
ParallelFor (FlowsToExecute.Num (), [&FlowsToExecute](int32 Index)
{
UE_LOG (LogControlFlows, Verbose, TEXT ("Executing Subflow %s" ),
*FlowsToExecute[Index]->GetDebugName ());
FlowsToExecute[Index]->Execute ();
});
}
else
{
if (ExecutionBehavior == EConcurrentExecution::Random)
{
for (int32 Idx = 0 ; Idx < FlowsToExecute.Num (); ++Idx)
{
Swap (FlowsToExecute[Idx],
FlowsToExecute[FMath::RandRange (0 , FlowsToExecute.Num () - 1 )]);
}
}
for (const TSharedRef<FConcurrencySubFlowContainer>& Flow : FlowsToExecute)
{
UE_LOG (LogControlFlows, Verbose, TEXT ("Executing Subflow %s" ),
*Flow->GetDebugName ());
Flow->Execute ();
if (bCancelAllHasBegun)
{
break ;
}
}
}
}
EConcurrentExecution ExecutionBehavior = EConcurrentExecution::Default;
TMap<int32, TSharedRef<FConcurrencySubFlowContainer>> ConcurrentFlows;
bool bCancelAllHasBegun = false ;
TWeakPtr<FControlFlowSubTaskBase> OwningTask;
};
同步原语与完成检测
cpp
void FConcurrentControlFlows::CheckToBroadcastComplete ()
{
if (GetConcurrencyBehavior ().GetContinueCondition () == FConcurrentControlFlowBehavior::EContinueConditions::Default)
{
if (AreAllSubFlowsCompletedOrCancelled ())
{
OnConcurrencyCompleted.ExecuteIfBound ();
}
else
{
UE_LOG (LogControlFlows, Verbose, TEXT ("Other flows are still running" ));
}
}
else
{
checkf (false , TEXT ("Unhandled Continue Condition" ));
}
}
bool FConcurrentControlFlows::AreAllSubFlowsCompletedOrCancelled () const
{
for (const TPair<int32, TSharedRef<FConcurrencySubFlowContainer>>& PairIt : ConcurrentFlows)
{
if (!PairIt.Value->IsCompleteOrCancelled ())
{
return false ;
}
}
return true ;
}
并发行为的前瞻性设计:FConcurrentControlFlowBehavior
ControlFlows插件展现了Epic Games对复杂并发语义的前瞻性思考。虽然当前只实现了基础的同步模式,但预留了丰富的并发控制扩展空间:
cpp
class FConcurrentControlFlowBehavior
{
friend class FConcurrentControlFlows ;
private :
enum class EContinueConditions
{
Default,
};
EContinueConditions GetContinueCondition () const
{
return EContinueConditions::Default;
}
};
并发语义的三大类设计
1. Sync 语义(Default - 当前实现)
┌─────────────────────────────────────────────────────────────┐
│ Sync 模式 │
├─────────────────────────────────────────────────────────────┤
│ │
│ SubFlow1 ────────────────────────────────────────────────┐ │
│ SubFlow2 ──────────────────────────────────────────────┐ │ │
│ SubFlow3 ────────────────────────────────────────────┐ │ │ │
│ │ │ │ │
│ ↓ ↓ ↓ │
│ 等待所有完成 │
│ ↓ │
│ 继续主流程 │
│ │
└─────────────────────────────────────────────────────────────┘
特点:
等待所有 子流程完成或取消
提供最强的同步保证
类似于 std::thread::join() 的语义
2. Race 语义(未来扩展)
┌─────────────────────────────────────────────────────────────┐
│ Race 模式 │
├─────────────────────────────────────────────────────────────┤
│ │
│ SubFlow1 ────────────────────────────────────────────────┐ │
│ SubFlow2 ──────────────────────────────────X (取消) │ │
│ SubFlow3 ────────────────────X (取消) │ │
│ │ │
│ ↓ │
│ 第一个完成立即继续 │
│ 其他流程被取消 │
│ │
└─────────────────────────────────────────────────────────────┘
变体:
Race :第一个完成或取消就继续,其他取消
Race_Complete :第一个完成就继续(忽略取消),其他取消
Race_Cancel :第一个取消就继续(忽略完成),其他取消
3. Rush 语义(未来扩展)
┌─────────────────────────────────────────────────────────────┐
│ Rush 模式 │
├─────────────────────────────────────────────────────────────┤
│ │
│ SubFlow1 ────────────────────────────────────────────────┐ │
│ SubFlow2 ──────────────────────────────────────────────→ │ │
│ SubFlow3 ────────────────────────────────────────────────→ │
│ │ │
│ ↓ │
│ 第一个完成立即继续 │
│ 其他流程继续执行 │
│ │
└─────────────────────────────────────────────────────────────┘
变体:
Rush :第一个完成或取消就继续,其他继续执行
Rush_Complete :第一个完成就继续,其他继续执行
Rush_Cancel :第一个取消就继续,其他继续执行
设计哲学的深层理解
cpp
为什么不需要 Sync_Complete 和 Sync_Cancel?
Sync 本质是等待所有流程结束(无论完成还是取消)
正在运行的流程必须等到它完成或取消,才能满足同步条件
区分完成和取消就不是真正的"同步"语义了
实际应用场景展望
cpp
Flow.ForkFlow ([](TSharedRef<FConcurrentControlFlows> Concurrent)
{
FConcurrentControlFlowBehavior RaceBehavior;
RaceBehavior.SetContinueCondition (EContinueConditions::Race_Complete);
Concurrent->SetBehavior (RaceBehavior);
Concurrent->AddOrGetFlow (0 , "CDN服务器A" );
Concurrent->AddOrGetFlow (1 , "CDN服务器B" );
Concurrent->AddOrGetFlow (2 , "CDN服务器C" );
});
Flow.ForkFlow ([](TSharedRef<FConcurrentControlFlows> Concurrent)
{
FConcurrentControlFlowBehavior RushBehavior;
RushBehavior.SetContinueCondition (EContinueConditions::Rush_Complete);
Concurrent->SetBehavior (RushBehavior);
Concurrent->AddOrGetFlow (0 , "路径规划算法" );
Concurrent->AddOrGetFlow (1 , "行为树决策" );
Concurrent->AddOrGetFlow (2 , "神经网络推理" );
});
这种前瞻性设计体现了现代游戏引擎架构的可扩展性 和前瞻性 思维,为复杂的异步场景预留了丰富的控制语义。
性能分析与优化策略
内存分配模式分析
┌─────────────────────────────────────────────────────────────────────────────┐
│ 内存分配热点分析 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ 高频分配区域: │
│ ┌─────────────────┐ 频率: 每个QueueStep调用 │
│ │ FControlFlowNode │ ◄─── 使用MakeShared减少分配开销 │
│ └─────────────────┘ │
│ │
│ 中频分配区域: │
│ ┌─────────────────┐ 频率: 每个子流程/分支 │
│ │ FControlFlow │ ◄─── 对象池化可能的优化点 │
│ └─────────────────┘ │
│ │
│ 低频分配区域: │
│ ┌─────────────────┐ 频率: 流程管理器级别 │
│ │ FControlFlowStatics │ ◄─── 单例模式,生命周期管理 │
│ └─────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
执行性能优化技术
cpp
class FControlFlow
{
private :
static int32 UnnamedControlFlowCounter;
int32 UnnamedNodeCounter = 0 ;
mutable TOptional<FString> CachedFlowPath;
TPair<double , float > LastZeroSecondDelay;
public :
FString FormatOrGetNewNodeDebugName (const FString& FlowNodeDebugName = TEXT("" ))
{
if (FlowNodeDebugName.IsEmpty ())
{
return FString::Printf (TEXT ("UnnamedNode_%d" ), UnnamedNodeCounter++);
}
return FlowNodeDebugName;
}
FString GetFlowPath () const
{
if (!CachedFlowPath.IsSet ())
{
CachedFlowPath = ComputeFlowPath ();
}
return CachedFlowPath.GetValue ();
}
private :
FString ComputeFlowPath () const
{
TStringBuilder<256 > PathBuilder;
int32 DuplicateCount = 0 ;
for (int32 StackIndex = 0 ; StackIndex < SubFlowStack_ForDebugging.Num () - 1 ; ++StackIndex)
{
if (SubFlowStack_ForDebugging[StackIndex] == SubFlowStack_ForDebugging[StackIndex + 1 ])
{
++DuplicateCount;
}
else
{
PathBuilder.Appendf (TEXT ("%s(x%d)." ),
*SubFlowStack_ForDebugging[StackIndex]->DebugName,
DuplicateCount + 1 );
DuplicateCount = 0 ;
}
}
return PathBuilder.ToString ();
}
};
工程级应用案例:复杂系统的实战分析
案例1:分布式资源加载系统
cpp
class FAdvancedResourceLoader : public TSharedFromThis<FAdvancedResourceLoader>
{
private :
TSharedRef<FControlFlow> LoadingPipeline;
TMap<FString, FStreamableHandle> ActiveHandles;
FCriticalSection LoadingMutex;
std::atomic<int32> LoadedResourceCount{0 };
std::atomic<int32> TotalResourceCount{0 };
public :
FAdvancedResourceLoader ()
: LoadingPipeline (MakeShared <FControlFlow>("AdvancedResourceLoader" ))
{}
void StartComplexLoadingSequence (const TArray<FAssetLoadingGroup>& LoadingGroups)
{
LoadingPipeline
.QueueStep ("资源预分析" , this , &FAdvancedResourceLoader::AnalyzeResourceDependencies)
.QueueStep ("内存预分配" , this , &FAdvancedResourceLoader::PreallocateMemoryPools)
.ForkFlow ([this , LoadingGroups](TSharedRef<FConcurrentControlFlows> ConcurrentLoader)
{
ConcurrentLoader->SetExecution (EConcurrentExecution::Parallel);
for (int32 GroupIndex = 0 ; GroupIndex < LoadingGroups.Num (); ++GroupIndex)
{
ConcurrentLoader->AddOrGetFlow (GroupIndex,
FString::Printf (TEXT ("LoadingGroup_%d" ), GroupIndex))
.QueueStep ("组预处理" , this , &FAdvancedResourceLoader::PreprocessGroup, GroupIndex)
.Loop ([this , GroupIndex](TSharedRef<FConditionalLoop> ResourceLoop)
{
ResourceLoop->CheckConditionFirst ()
.QueueStep ("加载单个资源" , this , &FAdvancedResourceLoader::LoadSingleResource, GroupIndex)
.BranchFlow ([this , GroupIndex](TSharedRef<FControlFlowBranch> LoadBranch)
{
EResourceLoadResult Result = EvaluateLoadResult (GroupIndex);
LoadBranch->AddOrGetBranch (static_cast <int32>(EResourceLoadResult::Success))
.QueueStep ("成功后处理" , this , &FAdvancedResourceLoader::HandleLoadSuccess, GroupIndex);
LoadBranch->AddOrGetBranch (static_cast <int32>(EResourceLoadResult::Retry))
.QueueStep ("重试逻辑" , this , &FAdvancedResourceLoader::HandleLoadRetry, GroupIndex);
LoadBranch->AddOrGetBranch (static_cast <int32>(EResourceLoadResult::Failed))
.QueueStep ("失败处理" , this , &FAdvancedResourceLoader::HandleLoadFailure, GroupIndex);
return static_cast <int32>(Result);
});
return HasMoreResourcesToLoad (GroupIndex) ? EConditionalLoopResult::RunLoop
: EConditionalLoopResult::LoopFinished;
})
.QueueStep ("组后处理" , this , &FAdvancedResourceLoader::PostprocessGroup, GroupIndex);
}
})
.QueueStep ("资源验证" , this , &FAdvancedResourceLoader::ValidateAllResources)
.QueueStep ("缓存优化" , this , &FAdvancedResourceLoader::OptimizeResourceCache)
.QueueStep ("完成回调" , this , &FAdvancedResourceLoader::NotifyLoadingComplete)
.ExecuteFlow ();
}
private :
void AnalyzeResourceDependencies (FControlFlowNodeRef FlowHandle)
{
AsyncTask (ENamedThreads::AnyBackgroundThreadNormalTask, [this , FlowHandle]()
{
PerformDependencyAnalysis ();
AsyncTask (ENamedThreads::GameThread, [FlowHandle]()
{
FlowHandle->ContinueFlow ();
});
});
}
void PreallocateMemoryPools ()
{
size_t EstimatedMemoryNeed = CalculateMemoryRequirement ();
ResourceMemoryPool.Reserve (EstimatedMemoryNeed);
UE_LOG (LogResourceLoader, Log, TEXT ("预分配内存池: %zu bytes" ), EstimatedMemoryNeed);
}
void LoadSingleResource (FControlFlowNodeRef FlowHandle, int32 GroupIndex)
{
FAssetLoadingGroup& Group = LoadingGroups[GroupIndex];
if (Group.HasMoreResources ())
{
FStreamableManager& StreamableManager = UAssetManager::GetStreamableManager ();
TSharedPtr<FStreamableHandle> Handle = StreamableManager.RequestAsyncLoad (
Group.GetNextResourcePath (),
FStreamableDelegate::CreateLambda ([this , FlowHandle, GroupIndex]()
{
LoadedResourceCount.fetch_add (1 , std::memory_order_relaxed);
UpdateLoadingProgress ();
FlowHandle->ContinueFlow ();
})
);
{
FScopeLock Lock (&LoadingMutex) ;
ActiveHandles.Add (Group.GetCurrentResourceName (), Handle);
}
}
else
{
FlowHandle->ContinueFlow ();
}
}
};
案例2:多阶段AI决策系统
cpp
class FAdvancedAIController : public AAIController, public TSharedFromThis<FAdvancedAIController>
{
private :
TSharedRef<FControlFlow> DecisionPipeline;
TSharedRef<FControlFlow> PerceptionPipeline;
TSharedRef<FControlFlow> ActionPipeline;
FAIWorldState WorldState;
FAIDecisionContext DecisionContext;
TArray<FAIAction> PossibleActions;
public :
void ExecuteAdvancedBehavior ()
{
DecisionPipeline
.QueueStep ("感知更新" , this , &FAdvancedAIController::UpdatePerception)
.QueueStep ("世界状态建模" , this , &FAdvancedAIController::BuildWorldModel)
.BranchFlow ([this ](TSharedRef<FControlFlowBranch> ThreatBranch)
{
EThreatLevel ThreatLevel = EvaluateThreatLevel ();
ThreatBranch->AddOrGetBranch (static_cast <int32>(EThreatLevel::High))
.QueueStep ("紧急威胁响应" , this , &FAdvancedAIController::HandleHighThreat)
.QueueStep ("快速决策" , this , &FAdvancedAIController::MakeEmergencyDecision);
ThreatBranch->AddOrGetBranch (static_cast <int32>(EThreatLevel::Medium))
.QueueStep ("威胁评估" , this , &FAdvancedAIController::EvaluateMediumThreat)
.Loop ([this ](TSharedRef<FConditionalLoop> EvaluationLoop)
{
EvaluationLoop->CheckConditionFirst ()
.QueueStep ("收集额外信息" , this , &FAdvancedAIController::GatherMoreInformation)
.QueueStep ("重新评估" , this , &FAdvancedAIController::ReevaluateThreat);
return HasSufficientInformation () ? EConditionalLoopResult::LoopFinished
: EConditionalLoopResult::RunLoop;
})
.QueueStep ("制定策略" , this , &FAdvancedAIController::FormulateStrategy);
ThreatBranch->AddOrGetBranch (static_cast <int32>(EThreatLevel::Low))
.QueueStep ("常规行为规划" , this , &FAdvancedAIController::PlanRoutineBehavior)
.ForkFlow ([this ](TSharedRef<FConcurrentControlFlows> ConcurrentPlanning)
{
ConcurrentPlanning->SetExecution (EConcurrentExecution::Default);
ConcurrentPlanning->AddOrGetFlow (0 , "路径规划" )
.QueueStep ("计算最优路径" , this , &FAdvancedAIController::CalculateOptimalPath)
.QueueStep ("路径平滑" , this , &FAdvancedAIController::SmoothPath);
ConcurrentPlanning->AddOrGetFlow (1 , "资源管理" )
.QueueStep ("评估资源需求" , this , &FAdvancedAIController::EvaluateResourceNeeds)
.QueueStep ("优化资源分配" , this , &FAdvancedAIController::OptimizeResourceAllocation);
ConcurrentPlanning->AddOrGetFlow (2 , "社交互动" )
.QueueStep ("分析社交关系" , this , &FAdvancedAIController::AnalyzeSocialRelations)
.QueueStep ("规划互动行为" , this , &FAdvancedAIController::PlanSocialInteractions);
});
return static_cast <int32>(ThreatLevel);
})
.QueueStep ("行为执行" , this , &FAdvancedAIController::ExecutePlannedBehavior)
.QueueStep ("结果评估" , this , &FAdvancedAIController::EvaluateResults)
.ExecuteFlow ();
}
private :
void UpdatePerception (FControlFlowNodeRef FlowHandle)
{
PerceptionPipeline
.QueueStep ("视觉感知" , this , &FAdvancedAIController::ProcessVisualPerception)
.QueueStep ("听觉感知" , this , &FAdvancedAIController::ProcessAuditoryPerception)
.QueueStep ("触觉感知" , this , &FAdvancedAIController::ProcessTactilePerception)
.QueueStep ("感知融合" , this , &FAdvancedAIController::FusePerceptionData);
PerceptionPipeline.OnFlowComplete ().AddLambda ([FlowHandle]()
{
FlowHandle->ContinueFlow ();
});
PerceptionPipeline.ExecuteFlow ();
}
void BuildWorldModel ()
{
WorldState.UpdateFromPerception (GetPerceptionData ());
WorldState.PredictFutureStates (PredictionTimeHorizon);
WorldState.IdentifyKeyEntitiesAndRelationships ();
UE_LOG (LogAI, Verbose, TEXT ("世界模型更新完成,识别到 %d 个关键实体" ),
WorldState.GetKeyEntities ().Num ());
}
};
调试与性能监控:生产级工具链
说明:本章节更多是在说明“如果要把 ControlFlows 做成一套生产级工具链,通常会怎么补齐”,其中的 Debugger/MemoryTracker 代码以工程化示意 为主,并非插件当前默认自带功能。
高级调试系统
cpp
class FControlFlowDebugger
{
private :
struct FFlowExecutionTrace
{
FString FlowName;
FString NodeName;
double StartTime;
double EndTime;
EFlowNodeResult Result;
TArray<FString> CallStack;
};
TArray<FFlowExecutionTrace> ExecutionHistory;
TMap<FString, FFlowPerformanceMetrics> PerformanceMetrics;
FCriticalSection DebugMutex;
public :
void BeginFlowProfiling (const FString& FlowName)
{
TRACE_CPUPROFILER_EVENT_SCOPE_STR (*FString::Printf (TEXT ("ControlFlow_%s" ), *FlowName));
if (CVarControlFlowDebugEnabled.GetValueOnGameThread ())
{
FScopeLock Lock (&DebugMutex) ;
FFlowExecutionTrace& Trace = ExecutionHistory.AddDefaulted_GetRef ();
Trace.FlowName = FlowName;
Trace.StartTime = FPlatformTime::Seconds ();
CaptureCallStack (Trace.CallStack);
}
}
FString GenerateFlowVisualization (const FControlFlow& Flow)
{
TStringBuilder<1024 > GraphBuilder;
GraphBuilder.Append (TEXT ("digraph ControlFlow {\n" ));
GraphBuilder.Append (TEXT (" rankdir=TB;\n" ));
GraphBuilder.Append (TEXT (" node [shape=box];\n" ));
for (int32 NodeIndex = 0 ; NodeIndex < Flow.FlowQueue.Num (); ++NodeIndex)
{
const FControlFlowNode& Node = *Flow.FlowQueue[NodeIndex];
FString NodeColor = GetNodeColor (Node.GetType ());
GraphBuilder.Appendf (TEXT (" Node%d [label=\"%s\" fillcolor=\"%s\" style=filled];\n" ),
NodeIndex, *Node.GetNodeName (), *NodeColor);
if (NodeIndex > 0 )
{
GraphBuilder.Appendf (TEXT (" Node%d -> Node%d;\n" ), NodeIndex - 1 , NodeIndex);
}
}
GraphBuilder.Append (TEXT ("}\n" ));
return GraphBuilder.ToString ();
}
TArray<FFlowPerformanceHotspot> AnalyzePerformanceHotspots ()
{
TArray<FFlowPerformanceHotspot> Hotspots;
for (const auto & MetricPair : PerformanceMetrics)
{
const FFlowPerformanceMetrics& Metrics = MetricPair.Value;
if (Metrics.AverageExecutionTime > PerformanceThreshold)
{
FFlowPerformanceHotspot& Hotspot = Hotspots.AddDefaulted_GetRef ();
Hotspot.FlowName = MetricPair.Key;
Hotspot.AverageTime = Metrics.AverageExecutionTime;
Hotspot.MaxTime = Metrics.MaxExecutionTime;
Hotspot.ExecutionCount = Metrics.ExecutionCount;
Hotspot.Severity = CalculateHotspotSeverity (Metrics);
}
}
Hotspots.Sort ([](const FFlowPerformanceHotspot& A, const FFlowPerformanceHotspot& B)
{
return A.Severity > B.Severity;
});
return Hotspots;
}
};
内存泄漏检测与预防
cpp
class FControlFlowMemoryTracker
{
private :
struct FAllocationInfo
{
void * Address;
size_t Size;
FString FlowName;
FString NodeName;
double AllocationTime;
TArray<FString> CallStack;
};
TMap<void *, FAllocationInfo> ActiveAllocations;
std::atomic<size_t > TotalAllocatedBytes{0 };
std::atomic<int32> AllocationCount{0 };
public :
void TrackAllocation (void * Address, size_t Size, const FString& Context)
{
if (CVarControlFlowMemoryTrackingEnabled.GetValueOnGameThread ())
{
FAllocationInfo Info;
Info.Address = Address;
Info.Size = Size;
Info.AllocationTime = FPlatformTime::Seconds ();
if (FControlFlow* CurrentFlow = GetCurrentExecutingFlow ())
{
Info.FlowName = CurrentFlow->GetDebugName ();
if (auto CurrentNode = CurrentFlow->GetCurrentNode ())
{
Info.NodeName = CurrentNode->GetNodeName ();
}
}
{
FScopeLock Lock (&AllocationMutex) ;
ActiveAllocations.Add (Address, MoveTemp (Info));
}
TotalAllocatedBytes.fetch_add (Size, std::memory_order_relaxed);
AllocationCount.fetch_add (1 , std::memory_order_relaxed);
}
}
void TrackDeallocation (void * Address)
{
if (CVarControlFlowMemoryTrackingEnabled.GetValueOnGameThread ())
{
FScopeLock Lock (&AllocationMutex) ;
if (FAllocationInfo* Info = ActiveAllocations.Find (Address))
{
TotalAllocatedBytes.fetch_sub (Info->Size, std::memory_order_relaxed);
AllocationCount.fetch_sub (1 , std::memory_order_relaxed);
ActiveAllocations.Remove (Address);
}
}
}
FString GenerateLeakReport ()
{
FScopeLock Lock (&AllocationMutex) ;
if (ActiveAllocations.Num () == 0 )
{
return TEXT ("No memory leaks detected." );
}
TStringBuilder<2048 > ReportBuilder;
ReportBuilder.Appendf (TEXT ("Memory Leak Report - %d active allocations, %zu bytes\n\n" ),
ActiveAllocations.Num (), TotalAllocatedBytes.load ());
TArray<FAllocationInfo> SortedAllocations;
ActiveAllocations.GenerateValueArray (SortedAllocations);
SortedAllocations.Sort ([](const FAllocationInfo& A, const FAllocationInfo& B)
{
return A.AllocationTime < B.AllocationTime;
});
for (const FAllocationInfo& Info : SortedAllocations)
{
double LeakAge = FPlatformTime::Seconds () - Info.AllocationTime;
ReportBuilder.Appendf (TEXT ("Leak: %p (%zu bytes) in %s::%s (age: %.2fs)\n" ),
Info.Address, Info.Size, *Info.FlowName, *Info.NodeName, LeakAge);
}
return ReportBuilder.ToString ();
}
};
编译器优化与代码生成分析
模板实例化的编译时开销分析
cpp
template <typename BindingObjectClassT, typename ...PayloadParamsT>
struct TControlFlowTypeDeduction
{
template <typename T>
static auto TestAsyncSignature (int ) -> decltype (
std::declval<T>().*(static_cast <void (T::*)(FControlFlowNodeRef, PayloadParamsT...)>(nullptr )),
std::true_type{}
) ;
template <typename T>
static std::false_type TestAsyncSignature (...) ;
template <typename T>
static auto TestSyncSignature (int ) -> decltype (
std::declval<T>().*(static_cast <void (T::*)(PayloadParamsT...)>(nullptr )),
std::true_type{}
) ;
template <typename T>
static std::false_type TestSyncSignature (...) ;
static constexpr bool IsAsyncFunction = decltype (TestAsyncSignature <BindingObjectClassT>(0 ))::value;
static constexpr bool IsSyncFunction = decltype (TestSyncSignature <BindingObjectClassT>(0 ))::value;
static_assert (IsAsyncFunction || IsSyncFunction,
"Function signature must match one of the supported patterns" );
static_assert (!(IsAsyncFunction && IsSyncFunction),
"Ambiguous function signature detected" );
};
template <typename BindingObjectClassT, typename ...PayloadParamsT>
FORCEINLINE void DispatchQueueStep (
const FString& InDebugName,
BindingObjectClassT* InBindingObject,
auto InFunction,
PayloadParamsT...Params)
{
using DeductionHelper = TControlFlowTypeDeduction<BindingObjectClassT, PayloadParamsT...>;
if constexpr (DeductionHelper::IsAsyncFunction)
{
QueueWait (InDebugName).BindUObject (InBindingObject, InFunction, Params...);
}
else if constexpr (DeductionHelper::IsSyncFunction)
{
QueueFunction (InDebugName).BindUObject (InBindingObject, InFunction, Params...);
}
}
运行时性能特征分析
┌─────────────────────────────────────────────────────────────────────────────┐
│ 性能特征:测量方法与结果范围(建议) │
├─────────────────────────────────────────────────────────────────────────────┤
│ 如何测: │
│ - 在关键节点/任务处加 Trace/ Stat(例如 TRACE_CPUPROFILER_EVENT_SCOPE) │
│ - 用 Unreal Insights 做对比:DebugGame / Development / Shipping + LTO / PGO 等 │
│ - 分别测:节点创建(MakeShared) 、 委托调用、 队列推进、 并发调度(ParallelFor) │
│ │
│ 影响因素(同一套代码差异可达数倍): │
│ - 平台/CPU/ 缓存、 编译配置、 内存分配器、 日志级别、 调试名/ 字符串路径是否开启 │
│ - 并发策略(Default /Random/ Parallel)与子流程规模 / 粒度 │
│ │
│ 结果范围(数量级参考,不是硬指标): │
│ - 节点创建与销毁:通常为几十到数百 ns/ 次(取决于 allocator 与调试信息) │
│ - 委托绑定与调用:通常为几十 ns 到 ~ 0.1 µs/ 次(取决于绑定方式与捕获大小) │
│ - 流程推进(一次 ContinueFlow → 选择下一个节点):通常为几十 ns 到数百 ns │
│ - 并发扩展:ParallelFor 可能带来加速,但受任务粒度与共享数据竞争限制 │
└─────────────────────────────────────────────────────────────────────────────┘
高级优化技术与最佳实践
说明:本章节讨论的是“围绕 ControlFlows 这种模型可能采取的优化方向”,部分内容属于扩展设想/通用工程手法 (例如对象池、SIMD、异步 I/O 管线),并不等价于插件当前实现。
零拷贝优化策略
cpp
class FControlFlowOptimized : public FControlFlow
{
private :
static constexpr size_t SmallNodeSize = 64 ;
alignas (std::max_align_t ) char SmallNodeBuffer[SmallNodeSize];
bool bUsingSmallNodeOptimization = false ;
TArray<TUniquePtr<FControlFlowNode>> NodePool;
TArray<int32> FreeNodeIndices;
public :
template <typename NodeType, typename ...ArgsT>
TSharedRef<NodeType> CreateOptimizedNode (ArgsT&&...Args)
{
static_assert (std::is_base_of_v<FControlFlowNode, NodeType>,
"NodeType must derive from FControlFlowNode" );
if constexpr (sizeof (NodeType) <= SmallNodeSize &&
std::is_trivially_destructible_v<NodeType>)
{
if (!bUsingSmallNodeOptimization)
{
NodeType* Node = new (SmallNodeBuffer) NodeType (Forward <ArgsT>(Args)...);
bUsingSmallNodeOptimization = true ;
return MakeShareable (Node, [this ](NodeType* Ptr)
{
Ptr->~NodeType ();
bUsingSmallNodeOptimization = false ;
});
}
}
if (FreeNodeIndices.Num () > 0 )
{
int32 Index = FreeNodeIndices.Pop ();
NodeType* Node = static_cast <NodeType*>(NodePool[Index].Release ());
new (Node) NodeType (Forward <ArgsT>(Args)...);
return MakeShareable (Node, [this , Index](NodeType* Ptr)
{
Ptr->~NodeType ();
NodePool[Index] = TUniquePtr <FControlFlowNode>(Ptr);
FreeNodeIndices.Add (Index);
});
}
return MakeShared <NodeType>(Forward <ArgsT>(Args)...);
}
template <typename FunctionT>
FControlFlow& QueueBatch (const TArray<FunctionT>& Functions)
{
FlowQueue.Reserve (FlowQueue.Num () + Functions.Num ());
for (const auto & Function : Functions)
{
auto Node = CreateOptimizedNode <FControlFlowNode_SelfCompleting>(
SharedThis (this ),
FormatOrGetNewNodeDebugName (),
Function
);
FlowQueue.Add (Node);
}
return *this ;
}
};
内存局部性优化
cpp
struct alignas (64 ) FControlFlowNodeCacheFriendly
{
TWeakPtr<FControlFlow> Parent;
FSimpleDelegate Process;
uint32 NodeFlags;
uint32 Reserved;
FString NodeName;
TSharedPtr<FTrackedActivity> Activity;
struct
{
uint32 bCancelled : 1 ;
uint32 bWasBoundOnExecution : 1 ;
uint32 bProfilerEventStarted : 1 ;
uint32 NodeType : 4 ;
uint32 Priority : 8 ;
uint32 Reserved : 17 ;
} Flags;
};
class FControlFlowSIMDOptimizer
{
public :
static bool BatchCheckNodeCompletion (const TArray<TSharedRef<FControlFlowNode>>& Nodes)
{
const int32 NumNodes = Nodes.Num ();
const int32 SIMDWidth = 8 ;
int32 ProcessedCount = 0 ;
for (; ProcessedCount + SIMDWidth <= NumNodes; ProcessedCount += SIMDWidth)
{
__m256i NodeStates = _mm256_setzero_si256();
for (int32 i = 0 ; i < SIMDWidth; ++i)
{
uint32 NodeState = Nodes[ProcessedCount + i]->GetPackedState ();
NodeStates = _mm256_insert_epi32(NodeStates, NodeState, i);
}
__m256i CompletionMask = _mm256_set1_epi32(0x1 );
__m256i CompletedNodes = _mm256_and_si256(NodeStates, CompletionMask);
if (_mm256_testz_si256(CompletedNodes, CompletedNodes) == 0 )
{
return false ;
}
}
for (; ProcessedCount < NumNodes; ++ProcessedCount)
{
if (!Nodes[ProcessedCount]->IsCompleted ())
{
return false ;
}
}
return true ;
}
};
异步I/O与流水线优化
cpp
class FControlFlowAsyncIOOptimizer
{
private :
class FAsyncIOCompletionPort
{
private :
HANDLE CompletionPort;
TArray<OVERLAPPED_ENTRY> CompletionEntries;
static constexpr int32 MaxConcurrentOperations = 64 ;
public :
void ProcessCompletions (FControlFlow& Flow)
{
ULONG NumEntriesRemoved = 0 ;
CompletionEntries.SetNum (MaxConcurrentOperations);
BOOL Result = GetQueuedCompletionStatusEx (
CompletionPort,
CompletionEntries.GetData (),
MaxConcurrentOperations,
&NumEntriesRemoved,
0 ,
FALSE
);
if (Result && NumEntriesRemoved > 0 )
{
for (ULONG i = 0 ; i < NumEntriesRemoved; ++i)
{
FAsyncIOOperation* Operation = static_cast <FAsyncIOOperation*>(
CompletionEntries[i].lpCompletionKey);
if (Operation && Operation->FlowNode.IsValid ())
{
Operation->FlowNode.Pin ()->ContinueFlow ();
}
}
}
}
};
struct FResourceLoadingPipeline
{
enum class EStage : uint8
{
FileRead,
Decompression,
Parsing,
GPUUpload,
Finalization
};
struct FPipelineStage
{
EStage Stage;
TFunction<void (FResourceLoadingContext&)> Processor;
TArray<FResourceLoadingContext> PendingContexts;
std::atomic<int32> ActiveOperations{0 };
};
TArray<FPipelineStage> Stages;
void ProcessPipeline ()
{
ParallelFor (Stages.Num (), [this ](int32 StageIndex)
{
FPipelineStage& Stage = Stages[StageIndex];
while (Stage.PendingContexts.Num () > 0 &&
Stage.ActiveOperations.load () < MaxConcurrentOperationsPerStage)
{
FResourceLoadingContext Context = Stage.PendingContexts.Pop ();
Stage.ActiveOperations.fetch_add (1 );
AsyncTask (ENamedThreads::AnyBackgroundThreadNormalTask,
[this , StageIndex, Context = MoveTemp (Context)]() mutable
{
Stages[StageIndex].Processor (Context);
if (StageIndex + 1 < Stages.Num ())
{
Stages[StageIndex + 1 ].PendingContexts.Add (MoveTemp (Context));
}
else
{
Context.CompletionCallback.ExecuteIfBound ();
}
Stages[StageIndex].ActiveOperations.fetch_sub (1 );
});
}
});
}
};
public :
void LoadFileAsync (const FString& FilePath,
FControlFlowNodeRef FlowNode,
TFunction<void (TArray<uint8>&&)> CompletionCallback)
{
FAsyncFileHandle FileHandle = FPlatformFileManager::Get ().GetPlatformFile ().OpenAsyncRead (*FilePath);
if (FileHandle)
{
int64 FileSize = FileHandle->Size ();
TArray<uint8> Buffer;
Buffer.SetNumUninitialized (FileSize);
FAsyncFileCallBack ReadCallback = [FlowNode, CompletionCallback = MoveTemp (CompletionCallback),
Buffer = MoveTemp (Buffer)](bool bWasSuccessful, IAsyncReadRequest*) mutable
{
if (bWasSuccessful)
{
CompletionCallback (MoveTemp (Buffer));
}
FlowNode->ContinueFlow ();
};
FileHandle->ReadRequest (0 , FileSize, AIOP_Normal, &ReadCallback, Buffer.GetData ());
}
else
{
FlowNode->ContinueFlow ();
}
}
};
总结:现代C++异步编程的工程典范
ControlFlows插件代表了现代C++在游戏引擎中异步编程的最高水准实现。它不仅仅是一个工具,更是一个展现了以下核心设计原则的工程典范:
🎯 技术创新点
类型安全的编译时推断 :通过SFINAE和模板元编程实现零运行时开销的类型推断
内存安全的生命周期管理 :智能指针系统确保复杂异步场景下的内存安全
可扩展的并发模型 :从单线程到真并行的渐进式并发支持
声明式的流程描述 :函数式编程范式与面向对象设计的完美融合
🚀 性能优化策略
编译时优化 :模板重载/SFINAE 让“签名→节点类型”在编译期分发
分配与拷贝控制 :大量节点通过 MakeShared 创建,调试字符串尽量延迟/缓存
并发调度 :并发子流程在 Parallel 策略下由 ParallelFor 驱动执行
可选的工程化优化空间 :对象池化/SIMD/更深的 I/O 管线属于潜在改进方向(非插件默认实现)
💡 工程实践价值
可维护性 :清晰的代码结构降低复杂异步逻辑的维护成本
可测试性 :模块化设计便于单元测试和集成测试
可扩展性 :开放的架构支持自定义节点类型和执行策略
可调试性 :丰富的调试信息和性能监控工具
🔬 理论意义
ControlFlows的设计体现了计算机科学中多个重要概念的实际应用:
状态机理论 :流程执行的状态转换模型
并发理论 :多种并发语义的统一抽象
类型理论 :类型安全的函数签名推断系统
编译器理论 :模板元编程和编译时优化
这个插件不仅解决了游戏开发中的实际问题,更为现代C++在复杂系统中的应用提供了宝贵的参考范例。它证明了通过精心的设计和实现,可以在保持高性能的同时,显著提升代码的可读性和可维护性。
对于希望深入理解现代C++异步编程、游戏引擎架构或系统级性能优化的开发者而言,ControlFlows插件的源码分析将是一次极其宝贵的学习之旅。
本文基于Unreal Engine 5.5版本的ControlFlows插件源码进行深度分析,涵盖了从基础概念到高级优化的完整技术栈。随着引擎版本更新,部分实现细节可能会有所变化,建议读者结合最新源码进行学习。
QueueStep QueueWait QueueControlFlow QueueControlFlowBranch QueueConcurrentFlows QueueConditionalLoop
cpp
FControlFlow& Flow = FControlFlowStatics::Create (this , TEXT ("TestQueueControlFlowBranch" ))
.QueueStep (TEXT ("ConstructNode" ), this , &ThisClass::Construct)
.BranchFlow ([this ](TSharedRef<FControlFlowBranch> Branch)
{
Branch->AddOrGetBranch (1 ).QueueStep (TEXT ("QueueControlFlowBranch1" ), this , &ThisClass::QueueControlFlowBranch1, 1 , FString ("TestQueueControlFlowBranch1" ));
Branch->AddOrGetBranch (2 ).QueueStep (TEXT ("QueueControlFlowBranch2" ), this , &ThisClass::QueueControlFlowBranch2, 2 , FString ("TestQueueControlFlowBranch2" ));
return FMath::RandBool () ? 1 : 2 ;
})
.QueueStep (TEXT ("DestructNode" ), this , &ThisClass::Destruct);
Flow.ExecuteFlow ();
cpp
FControlFlow& Flow = FControlFlowStatics::Create (this , TEXT ("TestQueueControlFlowBranch" ))
.QueueStep (TEXT ("ConstructNode" ), this , &ThisClass::Construct)
.ForkFlow ([this ](TSharedRef<FConcurrentControlFlows> ConcurrentFlows)
{
ConcurrentFlows->AddOrGetFlow (0 ).QueueStep (TEXT ("QueueConcurrentFlows1" ), this , &ThisClass::QueueConcurrentFlows1, 1 , FString ("QueueConcurrentFlows1" ));
ConcurrentFlows->AddOrGetFlow (1 ).QueueStep (TEXT ("QueueConcurrentFlows2" ), this , &ThisClass::QueueConcurrentFlows2, 2 , FString ("QueueConcurrentFlows2" ));
})
.QueueStep (TEXT ("DestructNode" ), this , &ThisClass::Destruct);
Flow.ExecuteFlow ();
cpp
FControlFlow& Flow = FControlFlowStatics::Create (this , TEXT ("TestQueueConditionalLoop" ))
.QueueStep (TEXT ("ConstructNode" ), this , &ThisClass::Construct)
.Loop ([this ](TSharedRef<FConditionalLoop> InnerLoop)
{
UE_LOG (LogTemp, Display, TEXT ("ATestControlFlows::QueueConditionalLoop Loop1" ));
InnerLoop->RunLoopFirst ().QueueStep (TEXT ("QueueConditionalLoop" ), this , &ThisClass::QueueConditionalLoop, 1 , FString ("QueueConditionalLoop" ));
return LoopCounter <= 10 ? EConditionalLoopResult::RunLoop : EConditionalLoopResult::LoopFinished;
})
.Loop ([this ](TSharedRef<FConditionalLoop> InnerLoop)
{
UE_LOG (LogTemp, Display, TEXT ("ATestControlFlows::QueueConditionalLoop Loop2" ));
InnerLoop->CheckConditionFirst ().QueueStep (TEXT ("QueueConditionalLoop" ), this , &ThisClass::QueueConditionalLoop, 1 , FString ("QueueConditionalLoop" ));
return LoopCounter <= 10 ? EConditionalLoopResult::RunLoop : EConditionalLoopResult::LoopFinished;
})
.QueueStep (TEXT ("DestructNode" ), this , &ThisClass::Destruct);
Flow.ExecuteFlow ();