管中窥豹从Event看WinNT内核线程唤醒机制
最近在实现Coroutine,在做调度的时候发现需要搓一个协程调度和协程间同步的轮子,一番研究下来基本就是要重新发明一遍内核的线程调度和同步原语了。于是花了一些时间研究了Windows NT内核是如何实现的(为什么不看Linux?因为我觉得NT内核设计比Linux不知道高到哪里去了)。
这里以WaitForMultipleObjects等待Event为例记录下整个流程(代码取自Windows Research Kernel)。
这个流程的核心在wait.c的KeWaitForMultipleObjects之中。
NTSTATUS
KeWaitForMultipleObjects (
__in ULONG Count,
__in_ecount(Count) PVOID Object[],
__in WAIT_TYPE WaitType,
__in KWAIT_REASON WaitReason,
__in KPROCESSOR_MODE WaitMode,
__in BOOLEAN Alertable,
__in_opt PLARGE_INTEGER Timeout,
__out_opt PKWAIT_BLOCK WaitBlockArray
)从Object[]传入的是所有被等待的对象,这些对象有个共同特点,就是有个DISPATCHER_HEADER。WaitType指定等待类型,比如等待所有对象被唤醒才激活线程(WaitAll)或者等待某个对象唤醒就激活线程(WaitAny),其中WaitAny又是WaitForSingleObject的等待方式。
观察DISPATCHER_HEADER的定义如下:
typedef struct _DISPATCHER_HEADER {
union {
struct {
UCHAR Type;
union {
UCHAR Absolute;
UCHAR NpxIrql;
};
union {
UCHAR Size;
UCHAR Hand;
};
union {
UCHAR Inserted;
BOOLEAN DebugActive;
};
};
volatile LONG Lock;
};
LONG SignalState;
LIST_ENTRY WaitListHead;
} DISPATCHER_HEADER;比较重要的,一个是SignalState,指定信号量或者事件的信号状态,小于等于0时线程需要等待。另一个WaitListHead,本质上是个双向循环链表,除了这个WaitListHead是挂在DISPATCHER_HEADER上的,其余的表项都被挂在WaitBlock这个结构上。
这里出现的WaitBlock是另外一个比较重要的结构,其定义如下:
typedef struct _KWAIT_BLOCK {
LIST_ENTRY WaitListEntry;
struct _KTHREAD *Thread;
PVOID Object;
struct _KWAIT_BLOCK *NextWaitBlock;
USHORT WaitKey;
UCHAR WaitType;
UCHAR SpareByte;
#if defined(_AMD64_)
LONG SpareLong;
#endif
} KWAIT_BLOCK, *PKWAIT_BLOCK, *PRKWAIT_BLOCK;其中,WaitListEntry被串在了DISPATCHER_HEADER上;Thread也就是等待某个同步原语的线程;Object是被等待的对象;NextWaitBlock构成一个单向链表,用于线程记录自己等待的所有对象;WaitType指示等待类型,即WaitAny或者WaitAll;WaitKey被同步原语使用,当激活线程时传递WaitKey给Thread,可以表明是哪个同步对象唤醒的线程(带上在Objects中的索引)。
综上,可以得知整个等待过程中的数据结构如下:

可以看出为了实现等待多个对象,每个线程都记下了自己等待了哪些对象。
在厘清基本的数据结构后,我们来看具体的实现过程。
- WAIT_BLOCK初始化
if (ARGUMENT_PRESENT(WaitBlockArray)) {
if (Count > MAXIMUM_WAIT_OBJECTS) {
KeBugCheck(MAXIMUM_WAIT_OBJECTS_EXCEEDED);
}
} else {
if (Count > THREAD_WAIT_OBJECTS) {
KeBugCheck(MAXIMUM_WAIT_OBJECTS_EXCEEDED);
}
WaitBlockArray = &Thread->WaitBlock[0];
}
ASSERT(Count != 0);
//
// If the dispatcher database is already held, then initialize the thread
// local variables. Otherwise, raise IRQL to DPC level, initialize the
// thread local variables, and lock the dispatcher database.
//
if (ReadForWriteAccess(&Thread->WaitNext) == FALSE) {
goto WaitStart;
}
Thread->WaitNext = FALSE;
InitializeWaitMultiple();可以看到WAIT_BLOCK的内存管理由Thread负责。
在等待不超过THREAD_WAIT_OBJECTS(值为3)的时候方法会直接复用KTHREAD上面预分配的WaitBlock,否则需要外部预分配数组。
而后控制线程调度,并初始化这些WAIT_BLOCK。
- 等待循环
限于篇幅,下述代码只保留了最核心的部分,去掉了Irql、定时器等一些情况的处理。
do {
Thread->Preempted = FALSE;
Index = 0;
if (WaitType == WaitAny) {
do {
Objectx = (PKMUTANT)Object[Index];
ASSERT(Objectx->Header.Type != QueueObject);
if (Objectx->Header.Type == MutantObject) {
if ((Objectx->Header.SignalState > 0) ||
(Thread == Objectx->OwnerThread)) {
if (Objectx->Header.SignalState != MINLONG) {
KiWaitSatisfyMutant(Objectx, Thread);
WaitStatus = (NTSTATUS)(Index | Thread->WaitStatus);
goto NoWait;
} else {
KiUnlockDispatcherDatabase(Thread->WaitIrql);
ExRaiseStatus(STATUS_MUTANT_LIMIT_EXCEEDED);
}
}
} else if (Objectx->Header.SignalState > 0) {
KiWaitSatisfyOther(Objectx);
WaitStatus = (NTSTATUS)(Index);
goto NoWait;
}
Index += 1;
} while(Index < Count);
} else {
do {
Objectx = (PKMUTANT)Object[Index];
if (Objectx->Header.Type == MutantObject) {
if ((Thread == Objectx->OwnerThread) &&
(Objectx->Header.SignalState == MINLONG)) {
KiUnlockDispatcherDatabase(Thread->WaitIrql);
ExRaiseStatus(STATUS_MUTANT_LIMIT_EXCEEDED);
} else if ((Objectx->Header.SignalState <= 0) &&
(Thread != Objectx->OwnerThread)) {
break;
}
} else if (Objectx->Header.SignalState <= 0) {
break;
}
Index += 1;
} while(Index < Count);
if (Index == Count) {
WaitBlock = &WaitBlockArray[0];
do {
Objectx = (PKMUTANT)WaitBlock->Object;
KiWaitSatisfyAny(Objectx, Thread);
WaitBlock = WaitBlock->NextWaitBlock;
} while (WaitBlock != &WaitBlockArray[0]);
WaitStatus = (NTSTATUS)Thread->WaitStatus;
goto NoWait;
}
}
WaitBlock = &WaitBlockArray[0];
do {
Objectx = (PKMUTANT)WaitBlock->Object;
InsertTailList(&Objectx->Header.WaitListHead, &WaitBlock->WaitListEntry);
WaitBlock = WaitBlock->NextWaitBlock;
} while (WaitBlock != &WaitBlockArray[0]);
CurrentPrcb = KeGetCurrentPrcb();
Thread->State = Waiting;
if (StackSwappable != FALSE) {
InsertTailList(&CurrentPrcb->WaitListHead, &Thread->WaitListEntry);
}
KiSetContextSwapBusy(Thread);
WaitStatus = (NTSTATUS)KiSwapThread(Thread, CurrentPrcb);
if (WaitStatus != STATUS_KERNEL_APC) {
return WaitStatus;
}
WaitStart:
InitializeWaitMultiple();
} while (TRUE);可以看到整个等待过程是个非常大的while (TRUE)循环,这个循环在每次执行时都会检查自己等待的同步原语是否满足等待条件。针对WaitAny的情况,当发现某一个同步对象有信号时就会直接结束,而针对WaitAll的情况,就需要所有对象都就绪。当线程满足等待条件时,会执行相应的Satisfy函数调整相应的信号量。即,当对象为事件(自动重置)时设置信号为0,阻止其他线程获取这个事件;当对象为信号量时,执行信号量的P操作。
如果线程不满足条件,则会准备好超时计时器,调度到等待状态,直到被唤醒。
注意到唤醒时的判断条件,STATUS_KERNEL_APC是一个特殊的条件,此时线程会重新检查自己等待的对象是否满足条件,否则会直接返回等待结果,结束等待。
因为如果不考虑等待多个对象,单个对象就绪就可以通知线程等待的结果,而需要等待多个对象时,单一对象的就绪并不能代表最终结果,这就要求线程重新检查自己等待的每个对象是否满足要求。此时通过STATUS_KERNEL_APC这个状态码就起到这样一个通知的作用。
- Event唤醒过程
以线程等待Event,并执行了SetEvent后为例。
这一部分的流程在eventobj.c中,核心方法为KeSetEvent。
OldState = ReadForWriteAccess(&Event->Header.SignalState);
Event->Header.SignalState = 1;
if ((OldState == 0) &&
(IsListEmpty(&Event->Header.WaitListHead) == FALSE)) {
if (Event->Header.Type == EventNotificationObject) {
KiWaitTestWithoutSideEffects(Event, Increment);
} else {
KiWaitTestSynchronizationObject(Event, Increment);
}
}可以看到针对非AutoReset类型,会执行KiWaitTestWithoutSideEffects方法,否则会走KiWaitTestSynchronizationObject。
KiWaitTestWithoutSideEffects唤醒过程如下:
WaitEntry = ListHead->Flink;
do {
//
// Get the address of the wait block and the thread doing the wait.
//
WaitBlock = CONTAINING_RECORD(WaitEntry, KWAIT_BLOCK, WaitListEntry);
Thread = WaitBlock->Thread;
//
// If the wait type is wait any, then unwait the thread with the
// wait key status. Otherwise, unwait the thread with a kernel APC
// status.
//
if (WaitBlock->WaitType == WaitAny) {
KiUnwaitThread(Thread, (NTSTATUS)WaitBlock->WaitKey, Increment);
} else {
KiUnwaitThread(Thread, STATUS_KERNEL_APC, Increment);
}
WaitEntry = ListHead->Flink;
} while (WaitEntry != ListHead);而KiWaitTestSynchronizationObject唤醒过程如下:
WaitEntry = ListHead->Flink;
do {
//
// Get the address of the wait block and the thread doing the wait.
//
WaitBlock = CONTAINING_RECORD(WaitEntry, KWAIT_BLOCK, WaitListEntry);
Thread = WaitBlock->Thread;
//
// If the wait type is wait any, then satisfy the wait, unwait the
// thread with the wait key status, and exit loop. Otherwise, unwait
// the thread with a kernel APC status and continue the loop.
//
if (WaitBlock->WaitType == WaitAny) {
Event->Header.SignalState = 0;
KiUnwaitThread(Thread, (NTSTATUS)WaitBlock->WaitKey, Increment);
break;
}
KiUnwaitThread(Thread, STATUS_KERNEL_APC, Increment);
WaitEntry = ListHead->Flink;
} while (WaitEntry != ListHead);两者的差异很显著,针对于非AutoReset的Event,Set操作相当于设置了信号并会通知所有在Event上等待的线程。其中,对于WaitAny的线程,会直接设置返回值,等待的线程将会在收到返回值后直接结束;而对于WaitAll的线程,则会插入APC,等待的线程将会在收到这个状态码后检查是否满足等待要求,被用于上文的WaitForMultipleObjects。
而针对AutoReset的Event,则会在找到第一个WaitAny的线程后立即结束循环,这是因为这个线程必然会拿到这个Event的信号,不需要再激活其他等待的线程(AutoReset的Event会激活一个线程并重置信号)。但是如果线程是等待多个对象的,就不能直接返回,因为这个线程可能不满足其他条件。这样就必须激活所有在这个Event上等待的WaitAll的线程,直到找到像WaitAny这种确保会唤醒的线程。最后,线程在被激活后会去竞争这个Event上的SignalState,这样就保证了只会唤醒一个线程,并且能自动Reset这个Event。
KiUnwaitThread的过程在此不予赘述。
综上,我们可以发现NT内核在处理WaitForSingleObject和WaitForMultipleObjects时的不同表现:
WaitForSingleObject在拿到Event后会直接结束循环返回WaitStatus。WaitForMultipleObjects在拿到Event后不会直接退出,而是去检查是否满足所有被等待对象的条件。
这样就会产生一个很有趣的结果,即WaitForMultipleObjects在WaitAll模式下并不能等价于WaitForSingleObject。
下述的测试很好说明了这个结果:

在线程A上SetEvent后立即ResetEvent,在线程B上等待这个Event,并使用WaitForMultipleObjects,可以看到尽管SetEvent了,但是线程B并不一定会被唤醒,因为Event虽然通知这个线程唤醒,但是状态立即被Reset了。只有当SetEvent后立即调度到线程B,线程B才能被唤醒。
而在WaitForSingleObject时,线程总是能被正常唤醒。

算是Event的一个很有意思的坑了。
Reference:
- 文中的测试用例由小伙提供
- 漫谈兼容内核之十五:Windows线程的等待/唤醒机制