Skip to content
Published at:

第7章:线程同步(进程内)

同步基础

同步的经典目标是防止数据竞争(Data Race)。当两个或更多线程访问同一内存位置,且至少有一个线程执行写操作时,就发生了数据竞争。如果所有线程只对共享内存执行读操作,那么不需要任何同步即可安全地并发执行。但一旦有线程进行写入,其他线程可能读到部分修改前或修改后的数据,导致数据损坏。

回顾第 5 章中计算素数的例子,那种 "fork/join" 模式的算法除了需要等待所有线程完成外,不需要任何同步。然而,同步天生会降低性能,因为某些操作必须串行化执行。阿姆达尔定律(Amdahl's Law) 描述了通过增加线程/CPU 所能获得的理论加速上限:

加速比限制 = 1 / (1 - p)

其中 p 是可以并行化的代码比例。例如,如果 80% 的代码可以并行化,则最大加速比为 5 倍。

大多数同步机制需要线程等待,直到可以安全地继续执行,从而防止数据竞争的发生。

原子操作(Atomic Operations)

即使是 C 语言中看似简单的变量自增操作 x++,在多线程/多处理器环境下也不是线程安全的。下图(图 7-1)展示了两个线程在两个处理器上同时将初始值 0 读入寄存器、各自递增、然后写回的场景——最终结果可能是 1 而不是期望的 2。

简单递增应用程序

Simple Increment 应用程序使用多个线程来递增同一个内存位置。用户可以选择线程数量和每线程的迭代次数。运行后,程序会显示实际结果、期望结果和执行时间。

界面中的"同步"组合框默认为"无"——即直接使用 ++ 操作符,不加任何同步。在这种默认设置下运行,结果远低于期望值,且每次执行的结果各不相同。

未加同步的代码(取自 MainDlg.cpp):

cpp
void CMainDlg::DoSimpleCount() {
    auto handles = std::make_unique<HANDLE[]>(m_Threads);
    for (int i = 0; i < m_Threads; i++) {
        handles[i] = ::CreateThread(nullptr, 0, [](auto param) {
            return ((CMainDlg*)param)->IncSimpleThread();
        }, this, 0, nullptr);
    }

    ::WaitForMultipleObjects(m_Threads, handles.get(), TRUE, INFINITE);
    for (int i = 0; i < m_Threads; i++)
        ::CloseHandle(handles[i]);
}

DWORD CMainDlg::IncSimpleThread() {
    for (int i = 0; i < m_Loops; i++)
        m_Count++;

    return 0;
}

代码通过 CreateThreadPVOID 参数传递 this 指针,使线程函数可以调用实例方法。由于 API 只接受函数指针,必须使用非捕获的 lambda 表达式。这里 m_Threads 是线程数,m_Loops 是每线程的迭代次数,m_Count 是被共享递增的内存位置。这是一个刻意设计、执行数百万次递增以暴露竞争问题的示例;在实际应用中,这种竞争发生的频率要低得多,可能在开发阶段被忽视,直到部署后才会暴露。

Interlocked 函数家族

解决方案是让递增操作变为**原子(Atomic)**的,将其与其他线程的访问隔离开来。Windows 提供了 Interlocked 函数家族:

cpp
unsigned InterlockedIncrement(unsigned volatile *Addend);

该函数"执行原子递增操作"并返回新值。它在底层是编译器内建函数(intrinsic),发出特殊的 CPU 指令——基于硬件实现,因此比软件锁更快。由于没有"锁"对象,也就没有死锁风险。

在 Simple Increment 应用程序中,从组合框选择 Interlocked 后会使用该函数:

cpp
DWORD CMainDlg::IncInterlockedThread() {
    for (int i = 0; i < m_Loops; i++)
        ::InterlockedIncrement((unsigned*)&m_Count);
    return 0;
}

使用此选项运行后,结果与期望值匹配。

其他相关函数包括:

  • InterlockedDecrement — 原子递减
  • InterlockedAdd — 原子加法
  • InterlockedExchange — 原子交换
  • InterlockedAnd / InterlockedOr / InterlockedXor — 原子位运算
  • InterlockedExchangePointer — 原子指针交换
  • InterlockedCompareExchange — 原子比较并交换

64 位和 16 位变体在函数名末尾追加 6416 后缀(如 InterlockedIncrement64)。

还有带有 acquire/release 语义的扩展变体,如 InterlockedAndAcquireInterlockedAndReleaseInterlockedAndNoFence。除非你确切了解这些语义的含义,否则建议使用"标准"版本。推荐参考 Herb Sutter 的演讲 Atomic<> Weapons 以及相关的 C++ 内存模型课程。

InterlockedCompareExchange无锁编程(Lock-Free Programming) 中被大量使用,这种编程方式利用 CPU 指令来避免使用软件锁。Windows 使用 SLIST_HEADERSLIST_ENTRY 提供了一个无锁的单向链表实现:

cpp
typedef struct DECLSPEC_ALIGN(16) _SLIST_ENTRY {
    struct _SLIST_ENTRY *Next;
} SLIST_ENTRY, *PSLIST_ENTRY;

这两种类型必须是 16 字节对齐的。动态分配时需要使用 C 运行时库的 _aligned_malloc

典型的数据项将 SLIST_ENTRY 嵌入为第一个成员:

cpp
struct MyDataItem {
    SLIST_ENTRY Entry;
    int MyValue;
    // ...
};

由于操作是无锁的且链表是单向的,它实现的是栈(Stack) 的语义(仅在链表头部进行 push/pop 操作)。相关函数如下:

函数描述
InitializeSListHead将链表头初始化为空
InterlockedPushEntrySList在链表头部插入一个节点
InterlockedPopEntrySList从链表头部移除一个节点
InterlockedPushListSListEx在链表头部插入多个节点
InterlockedFlushSList移除所有节点,返回头指针
QueryDepthSList返回节点数量(非线程安全,建议用 InterlockedIncrement/InterlockedDecrement 自行跟踪计数)

临界区(Critical Sections)

Interlocked 函数可以很好地处理简单场景,但更通用的场景需要一个更广泛的同步机制。临界区(Critical Section) 是一种经典的同步原语,其基本原则是:同一时刻最多只有一个线程可以获取锁

获取锁的线程成为锁的所有者(Owner),这意味着:(1) 只有所有者才能释放临界区;(2) 如果所有者重入(递归地再次获取),操作会成功并使内部计数器递增——之后必须释放相同次数。

临界区由 CRITICAL_SECTIONRTL_CRITICAL_SECTION 的 typedef)表示,应视为不透明结构。

初始化函数:

cpp
void InitializeCriticalSection(LPCRITICAL_SECTION lpCriticalSection);

BOOL InitializeCriticalSectionAndSpinCount(
    LPCRITICAL_SECTION lpCriticalSection,
    DWORD              dwSpinCount);

BOOL InitializeCriticalSectionEx(
    LPCRITICAL_SECTION lpCriticalSection,
    DWORD              dwSpinCount,
    DWORD              Flags);

InitializeCriticalSection 返回 void,因为它只是将成员设置为初始值。带 spin count 的变体允许线程在进入内核等待之前短暂自旋——如果锁持有者很快释放,则可以避免昂贵的内核模式转换。"默认的 spin count 是 2000"(由 InitializeCriticalSection 使用)。在单处理器系统上,spin count 始终为零。Flags 参数可传入 CRITICAL_SECTION_NO_DEBUG_INFO(0x01000000)来跳过分配额外的调试结构。

销毁函数:

cpp
void DeleteCriticalSection(LPCRITICAL_SECTION lpCriticalSection);

获取和释放:

cpp
void EnterCriticalSection(LPCRITICAL_SECTION lpCriticalSection);
void LeaveCriticalSection(LPCRITICAL_SECTION lpCriticalSection);

EnterCriticalSection 会阻塞直到获取锁成功。如果调用者已经拥有该锁,则立即返回(递归获取)。值得注意的是,任何线程都可以调用 LeaveCriticalSection——不限于所有者——调用会成功,并将所有者线程 ID 重置为零。

尝试获取(非阻塞):

cpp
BOOL TryEnterCriticalSection(LPCRITICAL_SECTION lpCriticalSection);

Simple Increment 应用程序中使用临界区的示例(在组合框中选择 "Critical Section"):

cpp
// m_CritSection 是 CRITICAL_SECTION 类型
void CMainDlg::DoCriticalSectionCount() {
    auto handles = std::make_unique<HANDLE[]>(m_Threads);
    ::InitializeCriticalSection(&m_CritSection);

    for (int i = 0; i < m_Threads; i++) {
        handles[i] = ::CreateThread(nullptr, 0, [](auto param) {
            return ((CMainDlg*)param)->IncCriticalSectionThread();
        }, this, 0, nullptr);
    }

    ::WaitForMultipleObjects(m_Threads, handles.get(), TRUE, INFINITE);

    for (int i = 0; i < m_Threads; i++)
        ::CloseHandle(handles[i]);

    ::DeleteCriticalSection(&m_CritSection);
}

DWORD CMainDlg::IncCriticalSectionThread() {
    for (int i = 0; i < m_Loops; i++) {
        ::EnterCriticalSection(&m_CritSection);
        m_Count++;
        ::LeaveCriticalSection(&m_CritSection);
    }

    return 0;
}

需要特别注意:每个 EnterCriticalSection 调用必须与同一个函数中的 LeaveCriticalSection 调用配对,调用可能释放锁的外部函数是危险的。

锁和资源获取即初始化

EnterCriticalSectionLeaveCriticalSection 形成天然的配对关系,但开发者很容易忘记释放——例如在 Leave 调用之前提前 return。如果代码能够"自动调用 LeaveCriticalSection",而不需要开发者干预,那就更好了。

对于 C 语言(非 C++),可以使用终止处理程序(Termination Handler),即微软 C 扩展中的 try/finally 语法。对于 C++,更优雅的方式是使用构造函数和析构函数实现的 RAII(Resource Acquisition Is Initialization,资源获取即初始化) 惯用法。

来自 ThreadingHelpers 项目的 AutoCriticalSection

cpp
// AutoCriticalSection.h
struct AutoCriticalSection {
    AutoCriticalSection(CRITICAL_SECTION& cs);
    ~AutoCriticalSection();

    // 删除拷贝/移动构造函数和赋值运算符
    AutoCriticalSection(const AutoCriticalSection&) = delete;
    AutoCriticalSection& operator=(const AutoCriticalSection&) = delete;
    AutoCriticalSection(AutoCriticalSection&&) = delete;
    AutoCriticalSection& operator=(AutoCriticalSection&&) = delete;

private:
    CRITICAL_SECTION& _cs;
};

// AutoCriticalSection.cpp
AutoCriticalSection::AutoCriticalSection(CRITICAL_SECTION& cs) : _cs(cs) {
    ::EnterCriticalSection(&_cs);
}

AutoCriticalSection::~AutoCriticalSection() {
    ::LeaveCriticalSection(&_cs);
}

构造函数获取临界区,析构函数释放它。在 Simple Increment 应用中的用法变得非常简洁:

cpp
DWORD CMainDlg::IncCriticalSectionThread() {
    for (int i = 0; i < m_Loops; i++) {
        AutoCriticalSection locker(m_CritSection);
        m_Count++;
    }

    return 0;
}

进一步的 CriticalSection 包装类(同样是 RAII 风格——包装 CRITICAL_SECTION 对象本身,实现自动初始化和清理):

cpp
// CriticalSection.h
class CriticalSection : public CRITICAL_SECTION {
public:
    CriticalSection(DWORD spinCount = 0, DWORD flags = 0);
    ~CriticalSection();

    void Lock();
    void Unlock();
    bool TryLock();
};

// CriticalSection.cpp
CriticalSection::CriticalSection(DWORD spinCount, DWORD flags) {
    ::InitializeCriticalSectionEx(this, (DWORD)spinCount, flags);
}

CriticalSection::~CriticalSection() {
    ::DeleteCriticalSection(this);
}

void CriticalSection::Lock() {
    ::EnterCriticalSection(this);
}

void CriticalSection::Unlock() {
    ::LeaveCriticalSection(this);
}

bool CriticalSection::TryLock() {
    return ::TryEnterCriticalSection(this);
}

CRITICAL_SECTION 派生的设计使得 CriticalSection 可以传递到任何需要 CRITICAL_SECTION 的地方。另一种做法是将 CRITICAL_SECTION 作为成员嵌入并提供隐式转换运算符,这作为练习留给读者。

死锁(Deadlocks)

死锁的经典场景是:线程 A 持有锁 1,等待锁 2;同时,锁 2 被线程 B 持有,线程 B 又在等待锁 1。两个线程互相等待,永远无法继续执行。

理论上的解决方案很简单:始终以相同的顺序获取锁。任何需要多个锁的线程都应按一致的顺序获取它们,这可以保证不会发生死锁(至少对于这些锁而言)。实际挑战在于强制执行这种规则——这超出了代码本身的范畴,需要通过文档来约束未来的代码遵循相同的规则。另一种方案是编写一个"多锁"包装器,使其总是以相同的顺序获取锁。一种简单的实现方式是按照锁在内存中的地址进行排序,然后按排序后的顺序获取。

练习:为临界区编写一个多锁包装器。

MD5 计算器应用程序

MD5 计算器应用程序计算已加载镜像文件(EXE 和 DLL)的 MD5 哈希值。这个应用涉及后台计算、响应式用户界面、系统级镜像加载通知和哈希缓存。

计算 MD5 哈希值

MD5Calculator 类封装了 Windows Crypto API 来计算文件的 MD5 哈希值:

cpp
// MD5Calculator.h
class MD5Calculator {
public:
    static std::vector<uint8_t> Calculate(PCWSTR path);
};
cpp
// MD5Calculator.cpp
#include <wincrypt.h>
#include "MD5Calculator.h"
#include <wil\resource.h>

std::vector<uint8_t> MD5Calculator::Calculate(PCWSTR path) {
    std::vector<uint8_t> md5;
    wil::unique_hfile hFile(::CreateFile(path, GENERIC_READ, FILE_SHARE_READ,
        nullptr, OPEN_EXISTING, FILE_FLAG_SEQUENTIAL_SCAN, nullptr));
    if (!hFile)
        return md5;

    wil::unique_handle hMemMap(::CreateFileMapping(hFile.get(), nullptr,
        PAGE_READONLY, 0, 0, nullptr));
    if (!hMemMap)
        return md5;

    wil::unique_hcryptprov hProvider;
    if (!::CryptAcquireContext(hProvider.addressof(), nullptr, nullptr,
        PROV_RSA_FULL, CRYPT_VERIFYCONTEXT))
        return md5;

    wil::unique_hcrypthash hHash;
    if (!::CryptCreateHash(hProvider.get(), CALG_MD5, 0, 0, hHash.addressof()))
        return md5;

    wil::unique_mapview_ptr<BYTE> buffer((BYTE*)::MapViewOfFile(hMemMap.get(),
        FILE_MAP_READ, 0, 0, 0));
    if (!buffer)
        return md5;

    auto size = ::GetFileSize(hFile.get(), nullptr);
    if (!::CryptHashData(hHash.get(), buffer.get(), size, 0))
        return md5;

    DWORD hashSize;
    DWORD len = sizeof(DWORD);
    if (!::CryptGetHashParam(hHash.get(), HP_HASHSIZE, (BYTE*)&hashSize,
        &len, 0))
        return md5;

    md5.resize(len = hashSize);
    ::CryptGetHashParam(hHash.get(), HP_HASHVAL, md5.data(), &len, 0);

    return md5;
}

计算过程分为几个步骤:用 CreateFile 打开文件(使用 FILE_FLAG_SEQUENTIAL_SCAN 标志),用 CreateFileMapping 创建文件映射,用 CryptAcquireContext 获取加密服务提供者,用 CryptCreateHash 创建 MD5 哈希句柄(传入 CALG_MD5),用 MapViewOfFile 将文件映射到内存,用 CryptHashData 计算哈希,最后用 CryptGetHashParam 获取哈希大小和值。WIL 的智能指针(unique_hfileunique_handleunique_hcryptprovunique_hcrypthashunique_mapview_ptr)负责自动清理资源。

哈希缓存

HashCache 类将 unordered_map 包装在临界区之后:

cpp
using Hash = std::vector<uint8_t>;
class HashCache {
public:
    HashCache();
    bool Add(PCWSTR path, const Hash& hash);
    const Hash Get(PCWSTR path) const;
    bool Remove(PCWSTR path);
    void Clear();
private:
    mutable CriticalSection _lock;
    std::unordered_map<std::wstring, Hash> _cache;
};

实现:

cpp
HashCache::HashCache() {
    _cache.reserve(512);
}

bool HashCache::Add(PCWSTR path, const Hash& hash) {
    AutoCriticalSection locker(_lock);
    auto it = _cache.find(path);
    if (it == _cache.end()) {
        _cache.insert({ path, hash });
        return true;
    }

    return false;
}

const Hash HashCache::Get(PCWSTR path) const {
    AutoCriticalSection locker(_lock);
    auto it = _cache.find(path);
    return it == _cache.end() ? Hash() : it->second;
}

bool HashCache::Remove(PCWSTR path) {
    AutoCriticalSection locker(_lock);
    auto it = _cache.find(path);
    if (it != _cache.end()) {
        _cache.erase(it);
        return true;
    }

    return false;
}

void HashCache::Clear() {
    AutoCriticalSection locker(_lock);
    _cache.clear();
}

每个操作都受到 RAII 风格的 AutoCriticalSection 保护。主视图类持有一个名为 m_CacheHashCache 实例和一个控制缓存是否启用的 m_UseCache 布尔值。注意 Get 方法声明为 const,但使用的 CriticalSection 被声明为 mutable(可变),以便在逻辑上的只读操作中仍然能获取锁。

图像加载通知

TraceManager 类封装了与 ETW(Event Tracing for Windows,Windows 事件跟踪) 的交互:

cpp
class TraceManager final {
public:
    ~TraceManager();

    bool Start(std::function<void(PEVENT_RECORD)> callback);
    bool Stop();

private:
    void OnEventRecord(PEVENT_RECORD rec);
    DWORD Run();

private:
    TRACEHANDLE _handle{ 0 };
    TRACEHANDLE _hTrace{ 0 };
    EVENT_TRACE_PROPERTIES* _properties;
    std::unique_ptr<BYTE[]> _propertiesBuffer;
    EVENT_TRACE_LOGFILE _traceLog = { 0 };
    wil::unique_handle _hProcessThread;
    std::function<void(PEVENT_RECORD)> _callback;
};

析构函数:

cpp
TraceManager::~TraceManager() {
    Stop();
}

Start 配置会话并开始处理:

cpp
bool TraceManager::Start(std::function<void(PEVENT_RECORD)> cb) {
    _callback = cb;

    if (_handle || _hTrace)
        return true;

    auto size = sizeof(EVENT_TRACE_PROPERTIES) + sizeof(KERNEL_LOGGER_NAME);
    _propertiesBuffer = std::make_unique<BYTE[]>(size);
    ::memset(_propertiesBuffer.get(), 0, size);

    _properties = reinterpret_cast<EVENT_TRACE_PROPERTIES*>(_propertiesBuffer.get());
    _properties->EnableFlags = EVENT_TRACE_FLAG_IMAGE_LOAD;
    _properties->Wnode.BufferSize = (ULONG)size;
    _properties->Wnode.Guid = SystemTraceControlGuid;
    _properties->Wnode.Flags = WNODE_FLAG_TRACED_GUID;
    _properties->Wnode.ClientContext = 1;
    _properties->LogFileMode = EVENT_TRACE_REAL_TIME_MODE;
    _properties->LoggerNameOffset = sizeof(EVENT_TRACE_PROPERTIES);

    auto error = ::StartTrace(&_handle, KERNEL_LOGGER_NAME, _properties);
    if (error != ERROR_SUCCESS && error != ERROR_ALREADY_EXISTS)
        return false;

    // 设置消费者
    _traceLog.Context = this;
    _traceLog.LoggerName = KERNEL_LOGGER_NAME;
    _traceLog.ProcessTraceMode = PROCESS_TRACE_MODE_EVENT_RECORD |
        PROCESS_TRACE_MODE_REAL_TIME;
    _traceLog.EventRecordCallback = [](PEVENT_RECORD record) {
        ((TraceManager*)record->UserContext)->OnEventRecord(record);
    };

    _hTrace = ::OpenTrace(&_traceLog);
    if (!_hTrace)
        return false;

    _hProcessThread.reset(::CreateThread(nullptr, 0, [](auto param) {
        return ((TraceManager*)param)->Run();
    }, this, 0, nullptr));

    return true;
}

RunOnEventRecord

cpp
DWORD TraceManager::Run() {
    auto error = ::ProcessTrace(&_hTrace, 1, nullptr, nullptr);
    return error;
}

void TraceManager::OnEventRecord(PEVENT_RECORD rec) {
    if (_callback)
        _callback(rec);
}

Stop

cpp
bool TraceManager::Stop() {
    if (_hTrace) {
        ::CloseTrace(_hTrace);
        _hTrace = 0;
    }

    if (_handle) {
        ::StopTrace(_handle, KERNEL_LOGGER_NAME, _properties);
        _handle = 0;
    }

    return true;
}

主框架类持有 TraceManager 实例,通过菜单命令启动/停止跟踪:

cpp
LRESULT CMainFrame::OnStartTrace(WORD, WORD, HWND, BOOL&) {
    m_TraceManager.Start([this](auto record) {
        m_view.OnEvent(record);
    });
    // UI 更新省略...
    return 0;
}

LRESULT CMainFrame::OnStopTrace(WORD, WORD, HWND, BOOL&) {
    m_TraceManager.Stop();
    // UI 更新省略
    return 0;
}

事件解析

EventProperty 结构和 EventParser 类从 ETW 事件中提取数据:

cpp
struct EventProperty {
    EventProperty(EVENT_PROPERTY_INFO& info);

    std::wstring Name;
    BYTE* Data;
    ULONG Length;
    EVENT_PROPERTY_INFO& Info;

    template<typename T>
    T GetValue() const {
        static_assert(std::is_pod<T>() && !std::is_pointer<T>());
        return *(T*)Data;
    }

    PCWSTR GetUnicodeString() const;
    PCSTR GetAnsiString() const;
};
cpp
class EventParser {
public:
    EventParser(PEVENT_RECORD record);

    PTRACE_EVENT_INFO GetEventInfo() const;
    PEVENT_RECORD GetEventRecord() const;
    const EVENT_HEADER& GetEventHeader() const;
    const std::vector<EventProperty>& GetProperties() const;
    const EventProperty* GetProperty(PCWSTR name) const;
    DWORD GetProcessId() const;
    static std::wstring GetDosNameFromNtName(PCWSTR name);

private:
    std::unique_ptr<BYTE[]> _buffer;
    PTRACE_EVENT_INFO _info{ nullptr };
    PEVENT_RECORD _record;
    mutable std::vector<EventProperty> _properties;
};

实现:

cpp
EventProperty::EventProperty(EVENT_PROPERTY_INFO& info) : Info(info) {
}

EventParser::EventParser(PEVENT_RECORD record) : _record(record) {
    ULONG size = 0;
    auto error = ::TdhGetEventInformation(record, 0, nullptr, _info, &size);
    if (error == ERROR_INSUFFICIENT_BUFFER) {
        _buffer = std::make_unique<BYTE[]>(size);
        _info = reinterpret_cast<PTRACE_EVENT_INFO>(_buffer.get());
        error = ::TdhGetEventInformation(record, 0, nullptr, _info, &size);
    }
    ::SetLastError(error);
}

PTRACE_EVENT_INFO EventParser::GetEventInfo() const {
    return _info;
}

PEVENT_RECORD EventParser::GetEventRecord() const {
    return _record;
}

const EVENT_HEADER& EventParser::GetEventHeader() const {
    return _record->EventHeader;
}

const std::vector<EventProperty>& EventParser::GetProperties() const {
    if (!_properties.empty())
        return _properties;

    _properties.reserve(_info->TopLevelPropertyCount);
    auto userDataLength = _record->UserDataLength;
    BYTE* data = (BYTE*)_record->UserData;

    for (ULONG i = 0; i < _info->TopLevelPropertyCount; i++) {
        auto& prop = _info->EventPropertyInfoArray[i];

        EventProperty property(prop);
        property.Name.assign((WCHAR*)((BYTE*)_info + prop.NameOffset));
        auto len = prop.length;
        property.Length = len;
        property.Data = data;
        data += len;
        userDataLength -= len;

        _properties.push_back(std::move(property));
    }

    return _properties;
}

const EventProperty* EventParser::GetProperty(PCWSTR name) const {
    for (auto& prop : GetProperties())
        if (prop.Name == name)
            return &prop;
    return nullptr;
}

DWORD EventParser::GetProcessId() const {
    return _record->EventHeader.ProcessId;
}

PCWSTR EventProperty::GetUnicodeString() const {
    return (PCWSTR)Data;
}

PCSTR EventProperty::GetAnsiString() const {
    return (PCSTR)Data;
}

GetDosNameFromNtName 静态函数将 NT 设备路径(如 \Device\HarddiskVolume3\...)转换为 Win32 盘符路径:

cpp
std::wstring EventParser::GetDosNameFromNtName(PCWSTR name) {
    static std::vector<std::pair<std::wstring, std::wstring>> deviceNames;
    static bool first = true;
    if (first) {
        auto drives = ::GetLogicalDrives();
        int drive = 0;
        while (drives) {
            if (drives & 1) {
                WCHAR driveName[] = L"X:";
                driveName[0] = (WCHAR)(drive + 'A');
                WCHAR path[MAX_PATH];
                if (::QueryDosDevice(driveName, path, MAX_PATH)) {
                    deviceNames.push_back({ path, driveName });
                }
            }
            drive++;
            drives >>= 1;
        }
        first = false;
    }

    for (auto& [ntName, dosName] : deviceNames) {
        if (::_wcsnicmp(name, ntName.c_str(), ntName.size()) == 0)
            return dosName + (name + ntName.size());
    }

    return L"";
}

这个函数通过调用 GetLogicalDrivesQueryDosDevice 构建一个静态向量,将 NT 设备名映射到盘符。然后它将传入的路径与已知的设备名进行匹配,并加上对应的盘符前缀。

整合所有内容

EventData 结构体保存每个事件的信息:

cpp
struct EventData {
    CString FileName;
    ULONGLONG Time;
    DWORD ProcessId;
    Hash MD5Hash;
    DWORD CalculatingThreadId;
    DWORD CalculationTime;
    bool Cached : 1;
    bool CalcDone : 1;
};

视图类存储事件、缓存和保护它们的临界区:

cpp
class CView ... {
    // ...
private:
    std::vector<EventData> m_Events;
    HashCache m_Cache;
    CriticalSection m_EventsLock;
    bool m_UseCache{ false };
};

OnEvent 方法过滤并处理传入的 ETW 事件:

cpp
void CView::OnEvent(PEVENT_RECORD record) {
    EventParser parser(record);
    if (parser.GetEventHeader().EventDescriptor.Opcode != 10)
        return;
}

四种可能的镜像事件中,只有 opcode 10 是实际的镜像加载事件。当匹配的事件到达时,提取 FileName 属性并填充 EventData 实例:

cpp
auto fileName = parser.GetProperty(L"FileName");
if (fileName) {
    EventData data;
    data.FileName = parser.GetDosNameFromNtName(
        fileName->GetUnicodeString()).c_str();
    data.ProcessId = parser.GetProcessId();
    data.Time = parser.GetEventHeader().TimeStamp.QuadPart;
    data.CalcDone = false;
    size_t size;
    {
        AutoCriticalSection locker(m_EventsLock);
        m_Events.push_back(std::move(data));
        size = m_Events.size();
    }

    int index = static_cast<int>(size - 1);
    PostMessage(WM_START_CALC, index, size);
}

设计要点:保护事件向量的临界区通过人工块作用域尽可能缩短持有时间。不直接在回调中计算哈希值(这会阻塞 ETW 回调线程),而是通过 PostMessage 异步发送 WM_START_CALC 到 UI 线程,使回调能快速返回,让 ETW 继续处理后续事件。GetDosNameFromNtName 函数将 NT 风格路径转换为标准 Win32 路径,以便 CreateFile 使用。

读写锁(Reader Writer Locks)

使用临界区保护共享数据虽然有效,但这是一种悲观的机制——它最多允许一个线程同时访问共享数据。在某些线程读取而另一些线程写入的场景中,可以进行优化:如果一个线程只是读取数据,没有理由阻止其他只读线程同时执行。这正是**"单写者-多读者"(Single Writer, Multiple Readers)** 机制提供的功能。

Windows 提供了 SRWLOCK 结构体("S" 代表 "Slim")用于此类锁:

cpp
typedef RTL_SRWLOCK SRWLOCK, *PSRWLOCK;

RTL_SRWLOCK 内部定义如下:

cpp
typedef struct _RTL_SRWLOCK {
    PVOID Ptr;
} RTL_SRWLOCK, *PRTL_SRWLOCK;

这是不透明数据,应如此对待。使用 InitializeSRWLock 初始化 SRWLOCK

cpp
void InitializeSRWLock(_Out_ PSRWLOCK SRWLock);

也可以使用静态初始化,赋值 SRWLOCK_INIT(将结构清零)。值得注意的是,SRWLOCK 没有"删除"操作——因为所有内部信息都被压缩到那个指针大小的单元中。

对于已初始化的 SRWLOCK,线程使用以下函数获取共享(读)或独占(写)锁:

cpp
void AcquireSRWLockShared(_InOut_ PSRWLOCK SRWLock);
void AcquireSRWLockExclusive(_InOut_ PSRWLOCK SRWLock);

如果无法获取锁,线程进入等待状态。一旦获取锁,线程可以相应地访问共享资源——这意味着线程有责任避免"错误"的访问方式(例如,持有共享锁的线程绝不能修改共享数据)。

释放函数:

cpp
void ReleaseSRWLockShared(_Inout_ PSRWLOCK SRWLock);
void ReleaseSRWLockExclusive(_Inout_ PSRWLOCK SRWLock);

SRW 锁有一定的局限性:

  • 共享锁的持有者不能直接将锁升级为独占锁。必须先释放共享锁,再竞争获取独占锁。
  • 独占锁的持有者不能递归地获取该锁;这会导致死锁。
  • 不保证第一个请求锁的线程会第一个获取到锁。文档声明:"SRW 锁既不公平也不是 FIFO。"

如果可以接受这些限制,且对数据的大部分操作是读取而非写入,则可以获得更好的性能。

此外也支持尝试获取的变体:

cpp
BOOLEAN TryAcquireSRWLockExclusive(_Inout_ PSRWLOCK SRWLock);
BOOLEAN TryAcquireSRWLockShared(_Inout_ PSRWLOCK SRWLock);

如果获取锁成功则返回 TRUE,否则返回 FALSE。如果获取成功,最终必须调用相应的释放函数。

资源获取即初始化包装器

与临界区类似,为 SRWLOCK 提供 RAII 包装器也是非常便利的。提供了三个类:一个包装 SRWLOCK 本身,另外两个用于获取/释放锁:

cpp
class ReaderWriterLock : public SRWLOCK {
public:
    ReaderWriterLock();
    ReaderWriterLock(const ReaderWriterLock&) = delete;
    ReaderWriterLock& operator=(const ReaderWriterLock&) = delete;
    void LockShared();
    void UnlockShared();
    void LockExclusive();
    void UnlockExclusive();
};

struct AutoReaderWriterLockExclusive {
    AutoReaderWriterLockExclusive(SRWLOCK& lock);
    ~AutoReaderWriterLockExclusive();

private:
    SRWLOCK& _lock;
};

struct AutoReaderWriterLockShared {
    AutoReaderWriterLockShared(SRWLOCK& lock);
    ~AutoReaderWriterLockShared();

private:
    SRWLOCK& _lock;
};

实现:

cpp
ReaderWriterLock::ReaderWriterLock() {
    ::InitializeSRWLock(this);
}

void ReaderWriterLock::LockShared() {
    ::AcquireSRWLockShared(this);
}

void ReaderWriterLock::UnlockShared() {
    ::ReleaseSRWLockShared(this);
}

void ReaderWriterLock::LockExclusive() {
    ::AcquireSRWLockExclusive(this);
}

void ReaderWriterLock::UnlockExclusive() {
    ::ReleaseSRWLockExclusive(this);
}

AutoReaderWriterLockExclusive::AutoReaderWriterLockExclusive(SRWLOCK& lock)
    : _lock(lock) {
    ::AcquireSRWLockExclusive(&_lock);
}

AutoReaderWriterLockExclusive::~AutoReaderWriterLockExclusive() {
    ::ReleaseSRWLockExclusive(&_lock);
}

AutoReaderWriterLockShared::AutoReaderWriterLockShared(SRWLOCK& lock)
    : _lock(lock) {
    ::AcquireSRWLockShared(&_lock);
}

AutoReaderWriterLockShared::~AutoReaderWriterLockShared() {
    ::ReleaseSRWLockShared(&_lock);
}

这些包装器均来自 ThreadingHelpers 项目。

MD5 计算器 2

在 MD5 计算器中,可以用 SRW 锁替换哈希缓存中的临界区,以潜在提升并发性——因为多个读操作可以同时进行:

cpp
class HashCache {
public:
    HashCache();
    bool Add(PCWSTR path, const Hash& hash);
    const Hash Get(PCWSTR path) const;
    bool Remove(PCWSTR path);
    void Clear();

private:
    mutable ReaderWriterLock _lock;
    std::unordered_map<std::wstring, Hash> _cache;
};

实现:

cpp
bool HashCache::Add(PCWSTR path, const Hash& hash) {
    AutoReaderWriterLockExclusive locker(_lock);
    auto it = _cache.find(path);
    if (it == _cache.end()) {
        _cache.insert({ path, hash });
        return true;
    }
    return false;
}

const Hash HashCache::Get(PCWSTR path) const {
    AutoReaderWriterLockShared locker(_lock);
    auto it = _cache.find(path);
    return it == _cache.end() ? Hash() : it->second;
}

bool HashCache::Remove(PCWSTR path) {
    AutoReaderWriterLockExclusive locker(_lock);
    auto it = _cache.find(path);
    if (it != _cache.end()) {
        _cache.erase(it);
        return true;
    }
    return false;
}

void HashCache::Clear() {
    AutoReaderWriterLockExclusive locker(_lock);
    _cache.clear();
}

Get 使用 AutoReaderWriterLockShared(共享/读锁),而 AddRemoveClear 使用 AutoReaderWriterLockExclusive(独占/写锁)。类似的修改也可以应用到 CView 类中。这些改动位于 MD5Calculator2 项目中。

条件变量(Condition Variables)

条件变量(Condition Variable) 是一种同步机制,允许等待在临界区或 SRW 锁上的线程暂停执行,直到某个特定条件发生。一个经典的场景是生产者-消费者(Producer-Consumer) 模式:一些线程生成数据项并将其放入队列,而消费者线程从队列中取出数据项并处理它们。

如果数据项的产生速度快于消费者的处理速度,队列不会为空,消费者将继续工作。反之,如果消费者处理完了所有内容,它们应该进入等待状态直到有新数据项产生。这正是条件变量所提供的功能。空闲的消费者线程(队列为空)不应该浪费 CPU 时间进行轮询——条件变量允许高效等待,完全不消耗 CPU,直到被唤醒(通常由生产者唤醒)。

条件变量由不透明的 CONDITION_VARIABLE 结构表示,与 SRWLOCK 非常相似。必须通过以下函数初始化:

cpp
void InitializeConditionVariable(_Out_ PCONDITION_VARIABLE ConditionVariable);

也可以通过 CONDITION_VARIABLE_INIT 进行静态初始化。

条件变量总是与临界区或 SRW 锁关联。当线程需要在条件变量上等待信号时,必须先获取临界区/SRW 锁,然后调用以下睡眠函数之一:

cpp
BOOL SleepConditionVariableCS(
    _Inout_ PCONDITION_VARIABLE ConditionVariable,
    _Inout_ PCRITICAL_SECTION CriticalSection,
    _In_ DWORD dwMilliseconds
);

BOOL SleepConditionVariableSRW(
    _Inout_ PCONDITION_VARIABLE ConditionVariable,
    _Inout_ PSRWLOCK SRWLock,
    _In_ DWORD dwMilliseconds,
    _In_ ULONG Flags
);

调用这些 Sleep* 函数的线程必须事先恰好获取了关联的同步对象一次。该函数会原子地释放同步对象并在条件变量上等待。在等待期间,线程可以被以下唤醒函数唤醒:

cpp
VOID WakeConditionVariable(_Inout_ PCONDITION_VARIABLE ConditionVariable);
VOID WakeAllConditionVariable(_Inout_ PCONDITION_VARIABLE ConditionVariable);

WakeConditionVariable 唤醒单个线程(不保证是哪个),而 WakeAllConditionVariable 唤醒所有等待中的线程。

唤醒后,线程重新获取同步对象并继续执行。线程随后应重新检查它等待的条件——如果条件未满足,应再次调用 Sleep*。这是因为另一个线程可能先被唤醒,执行了使条件再次变为 false 的操作。这种现象称为虚假唤醒(Spurious Wakeup)

使用临界区的消费者线程流程如下:

  1. 获取临界区
  2. 检查是否有工作可用(如队列非空)
  3. 如果队列为空,调用 SleepConditionVariableCS——释放临界区并进入等待状态
  4. 生产者最终在添加数据项后调用 WakeConditionVariable
  5. SleepConditionVariableCS 返回,重新获取临界区,重新检查条件
  6. 如果有工作可用,线程在持有临界区的同时完成工作
  7. 工作完成,释放临界区

如果 Sleep* 返回 TRUE,说明线程被唤醒并已获取同步原语。返回 FALSE 表示错误(或者如果 dwMilliseconds 不是 INFINITE,超时会导致 GetLastError 返回 ERROR_TIMEOUT)。

对于 SRW 锁,Flags 参数指示所需的获取模式:0 表示独占访问,CONDITION_VARIABLE_LOCKMODE_SHARED 表示共享访问。

队列演示应用程序

Queue Demo 应用程序演示了条件变量如何唤醒访问共享队列的生产者和消费者线程。启动后可以选择生产者和消费者线程的数量。

点击 "Run" 开始运行。生产者线程生成数字数据项并将其推入队列。消费者线程取出数据项并检查数字是否为素数。如果队列为空,消费者在条件变量上睡眠,直到被生产者唤醒。底部的状态栏会定期更新当前队列大小。

如果生产者生成数据项的速度快于消费者的处理速度,队列大小会增长。点击 "Stop" 停止生产者,让消费者追赶并清空队列。如果消费者更快(可能由于数量更多),队列大小保持在接近零的水平。运行期间会显示消费者线程的统计信息。

CMainDlg 类定义了以下嵌套类型:

cpp
struct ConsumerThreadData {
    unsigned ItemsProcessed{ 0 };
    unsigned Primes{ 0 };
    wil::unique_handle hThread;
};

struct WorkItem {
    unsigned Data;
    bool IsPrime;
};

ConsumerThreadData 保存每个消费者线程的状态:线程句柄、已处理项数量和找到的素数数量。每个存储在队列中的 WorkItem 包含一个待检查素数的数字和一个结果字段。

基于这些结构和应用需求的数据成员:

cpp
std::queue<WorkItem> m_Queue;        // 队列
CriticalSection m_QueueLock;         // 保护队列的临界区
CONDITION_VARIABLE m_QueueCondVar;
std::vector<wil::unique_handle> m_ProducerThreads;
std::vector<ConsumerThreadData> m_ConsumerThreads;
wil::unique_handle m_hAbortEvent;
static CMainDlg* m_pThis;            // 简化对 'this' 的访问

CriticalSection 是来自 ThreadingHelpers 的包装类。m_hAbortEvent 是一个事件内核对象句柄,用于向生产者和消费者线程发出停止信号。静态 m_pThis 保存对话框实例指针,供线程函数访问。

CMainDlg::OnInitDialog 初始化:

cpp
LRESULT CMainDlg::OnInitDialog(UINT, WPARAM, LPARAM, BOOL&) {
    m_pThis = this;
    m_hAbortEvent.reset(::CreateEvent(nullptr, TRUE, FALSE, nullptr));
    // ...
}

点击 "Run" 调用 OnRunOnRun 再调用 RunRun 函数首先获取线程数量:

cpp
void CMainDlg::Run() {
    int consumers = GetDlgItemInt(IDC_CONSUMERS);
    if (consumers < 1 || consumers > 64) {
        DisplayError(L"Consumer threads must be between 1 and 64");
        return;
    }

    int producers = GetDlgItemInt(IDC_PRODUCERS);
    if (producers < 1 || producers > 64) {
        DisplayError(L"Producer threads must be between 1 and 64");
        return;
    }

接下来进行本次运行的初始化:

cpp
    bool abort = false;
    ::ResetEvent(m_hAbortEvent.get());
    ::InitializeConditionVariable(&m_QueueCondVar);
    m_ThreadList.DeleteAllItems();

创建消费者线程:

cpp
    m_ConsumerThreads.clear();
    m_ConsumerThreads.reserve(consumers);

    for (int i = 0; i < consumers; i++) {
        ConsumerThreadData data;
        data.hThread.reset(::CreateThread(nullptr, 0, [](auto p) {
            return m_pThis->ConsumerThread(PtrToLong(p));
        }, LongToPtr(i), 0, nullptr));

        if (!data.hThread) {
            abort = true;
            break;
        }

        m_ConsumerThreads.push_back(std::move(data));
    }

    if (abort) {
        ::SetEvent(m_hAbortEvent.get());
        return;
    }

类似地创建生产者线程:

cpp
    m_ProducerThreads.clear();
    m_ProducerThreads.reserve(producers);

    for (int i = 0; i < producers; i++) {
        wil::unique_handle hThread(::CreateThread(nullptr, 0, [](auto p) {
            return m_pThis->ProducerThread();
        }, this, 0, nullptr));

        if (!hThread) {
            DisplayError(L"Failed to create producer thread. Aborting");
            abort = true;
            break;
        }
    }

    if (abort) {
        ::SetEvent(m_hAbortEvent.get());
        return;
    }

Run 的最后一部分填充列表视图并启动定时器以更新队列大小显示:

cpp
    CString text;
    for (int i = 0; i < (int)m_ConsumerThreads.size(); i++) {
        const auto& t = m_ConsumerThreads[i];
        text.Format(L"%2d", i);
        int n = m_ThreadList.InsertItem(i, text);
        m_ThreadList.SetItemText(n, 1,
            std::to_wstring(::GetThreadId(t.hThread.get())).c_str());
    }

    GetDlgItem(IDC_RUN).EnableWindow(FALSE);
    GetDlgItem(IDC_STOP).EnableWindow(TRUE);

    SetTimer(1, 500, nullptr);
}

生产者线程代码:

cpp
DWORD CMainDlg::ProducerThread() {
    for (;;) {
        if (::WaitForSingleObject(m_hAbortEvent.get(), 0) == WAIT_OBJECT_0)
            break;

        WorkItem item;
        item.IsPrime = false;
        LARGE_INTEGER li;
        ::QueryPerformanceCounter(&li);
        item.Data = li.LowPart;
        {
            AutoCriticalSection locker(m_QueueLock);
            m_Queue.push(item);
        }
        ::WakeConditionVariable(&m_QueueCondVar);

        // 偶尔小睡一下
        if ((item.Data & 0x7f) == 0)
            ::Sleep(1);
    }
    return 0;
}

消费者线程代码:

cpp
DWORD CMainDlg::ConsumerThread(int index) {
    auto& data = m_ConsumerThreads[index];
    auto tick = ::GetTickCount64();
    for (;;) {
        WorkItem value;
        {
            bool abort = false;
            AutoCriticalSection locker(m_QueueLock);
            while (m_Queue.empty()) {
                if (::WaitForSingleObject(m_hAbortEvent.get(), 0) == WAIT_OBJECT_0) {
                    abort = true;
                    break;
                }
                ::SleepConditionVariableCS(&m_QueueCondVar, &m_QueueLock, INFINITE);
            }
            if (abort)
                break;
            ATLASSERT(!m_Queue.empty());
            value = m_Queue.front();
            m_Queue.pop();
        }

        {
            // 执行实际工作
            bool isPrime = IsPrime(value.Data);
            if (isPrime) {
                value.IsPrime = true;
                ::InterlockedIncrement(&data.Primes);
            }
            ::InterlockedIncrement(&data.ItemsProcessed);
        }

        auto current = ::GetTickCount64();
        if (current - tick > 600) {
            PostMessage(WM_UPDATE_THREAD, index);
            tick = current;
        }
    }

    PostMessage(WM_UPDATE_THREAD, index);
    return 0;
}

WM_UPDATE_THREAD 消息处理器更新消费者线程统计信息:

cpp
LRESULT CMainDlg::OnUpdateThread(UINT, WPARAM index, LPARAM, BOOL&) {
    auto& data = m_ConsumerThreads[index];
    int n = (int)index;
    CString text;
    text.Format(L"%u", ::InterlockedAdd((LONG*)&data.ItemsProcessed, 0));
    m_ThreadList.SetItemText(n, 2, text);
    text.Format(L"%u", ::InterlockedAdd((LONG*)&data.Primes, 0));
    m_ThreadList.SetItemText(n, 3, text);
    return 0;
}

定时器处理器更新队列大小显示:

cpp
LRESULT CMainDlg::OnTimer(UINT, WPARAM id, LPARAM, BOOL&) {
    if (id == 1) {
        size_t size;
        {
            AutoCriticalSection locker(m_QueueLock);
            size = m_Queue.size();
        }

        SetDlgItemInt(IDC_QUEUE_SIZE, (unsigned)size, FALSE);
    }

    return 0;
}

最后,点击 "Stop" 调用 OnStopOnStop 再调用 Stop

cpp
void CMainDlg::Stop() {
    // 向线程发出终止信号
    ::SetEvent(m_hAbortEvent.get());
    ::WakeAllConditionVariable(&m_QueueCondVar);
    // 更新 UI
    GetDlgItem(IDC_RUN).EnableWindow(TRUE);
    GetDlgItem(IDC_STOP).EnableWindow(FALSE);
}

这里设置了终止事件(使所有生产者退出),并通过条件变量唤醒所有消费者,使它们能够排空队列中剩余的数据项。

等待地址

WaitOnAddress 是 Windows 8 / Server 2012 引入的同步机制,允许线程高效等待某个内存地址的值变为期望值。它比条件变量更高效,且"由于不直接使用临界区(或其他软件同步原语),所以不易出现死锁"。

函数签名:

cpp
BOOL WaitOnAddress(
    _In_ volatile VOID* Address,
    _In_ PVOID CompareAddress,
    _In_ SIZE_T AddressSize,
    _In_opt_ DWORD dwMilliseconds);

注意:本节中的函数需要链接到 synchronization.lib 导入库。

该函数检查 Address 指向的值是否与 CompareAddress 指向的值相同。如果不同,立即返回 TRUE;如果相同,线程进入等待状态。AddressSize 必须是 1、2、4 或 8。

唤醒函数:

cpp
VOID WakeByAddressSingle(_In_ PVOID Address);
VOID WakeByAddressAll(_In_ PVOID Address);

WakeByAddressSingle 唤醒单个在指定地址上等待的线程,WakeByAddressAll 唤醒所有等待线程。同样存在虚假唤醒的可能,因此需要在循环中重新检查条件。

典型用法示例:

cpp
DWORD undesiredValue = 0;
DWORD actualValue = 0;

void Thread1() {
    // 根据需要设置 undesiredValue
    while (actualValue == undesiredValue) {
        ::WaitOnAddress(&actualValue, &undesiredValue, sizeof(DWORD), INFINITE);
    }

    // actualValue != undesiredValue
}

void Thread2() {
    // ...
    actualValue++;
    ::WakeByAddressSingle(&actualValue);
}

线程 1 在 while 循环中等待 actualValue 不等于 undesiredValue。线程 2 修改 actualValue 后调用 WakeByAddressSingle,唤醒线程 1 重新检查条件。

同步屏障(Synchronization Barriers)

同步屏障(Synchronization Barrier) 是 Windows 8 引入的原语,用于协调多个线程在工作的特定点上相互等待,直到所有线程都到达该点后才一起继续执行。一个典型的场景是多阶段初始化:每个子系统必须在所有子系统完成第一阶段初始化后,才能进入第二阶段。

屏障由 SYNCHRONIZATION_BARRIER 结构表示:

cpp
BOOL InitializeSynchronizationBarrier(
    _Out_ LPSYNCHRONIZATION_BARRIER lpBarrier,
    _In_ LONG lTotalThreads,
    _In_ LONG lSpinCount);
  • lTotalThreads:必须到达屏障的线程总数
  • lSpinCount:传入 -1 使用默认值

进入屏障:

cpp
BOOL EnterSynchronizationBarrier(
    _Inout_ LPSYNCHRONIZATION_BARRIER lpBarrier,
    _In_ DWORD dwFlags);

dwFlags 可以取以下值:

  • SYNCHRONIZATION_BARRIER_FLAGS_SPIN_ONLY:仅自旋等待
  • SYNCHRONIZATION_BARRIER_FLAGS_BLOCK_ONLY:仅阻塞等待
  • SYNCHRONIZATION_BARRIER_FLAGS_NO_DELETE:优化标志,所有线程必须一致使用

该函数对恰好一个线程返回 TRUE(屏障释放时),对其他所有线程返回 FALSE

下面是一个四子系统两阶段初始化的示例:

cpp
DWORD WINAPI InitSubSystem1(PVOID p) {
    auto barrier = (PSYNCHRONIZATION_BARRIER)p;
    // 阶段1
    printf("Subsystem 1: Starting phase 1 initialization (TID: %u)...\n",
        ::GetCurrentThreadId());

    // 执行工作...
    printf("Subsystem 1: Ended phase 1 initialization...\n");
    ::EnterSynchronizationBarrier(barrier, 0);
    printf("Subsystem 1: Starting phase 2 initialization...\n");

    // 执行工作
    printf("Subsystem 1: Ended phase 2 initialization...\n");
    return 0;
}

主函数调用方式:

cpp
SYNCHRONIZATION_BARRIER sb;
InitializeSynchronizationBarrier(&sb, 4, -1);
LPTHREAD_START_ROUTINE functions[] = {
    InitSubSystem1, InitSubSystem2, InitSubSystem3, InitSubSystem4
};

printf("System initialization started\n");
HANDLE hThread[4];
int i = 0;
for (auto f : functions) {
    hThread[i++] = ::CreateThread(nullptr, 0, f, &sb, 0, nullptr);
}

::WaitForMultipleObjects(_countof(hThread), hThread, TRUE, INFINITE);
printf("System initialization complete\n");
// 关闭线程句柄...

示例输出:

System initialization started
Subsystem 1: Starting phase 1 initialization (TID: 79480)...
Subsystem 2: Starting phase 1 initialization (TID: 104836)...
Subsystem 3: Starting phase 1 initialization (TID: 32556)...
Subsystem 4: Starting phase 1 initialization (TID: 86268)...
Subsystem 2: Ended phase 1 initialization...
Subsystem 3: Ended phase 1 initialization...
Subsystem 1: Ended phase 1 initialization...
Subsystem 4: Ended phase 1 initialization...
Subsystem 4: Starting phase 2 initialization...
Subsystem 3: Starting phase 2 initialization...
Subsystem 1: Starting phase 2 initialization...
Subsystem 2: Starting phase 2 initialization...
Subsystem 3: Ended phase 2 initialization...
Subsystem 1: Ended phase 2 initialization...
Subsystem 4: Ended phase 2 initialization...
Subsystem 2: Ended phase 2 initialization...
System initialization complete

可以看到,所有线程都完成了阶段 1 后,阶段 2 才开始执行。

删除屏障:

cpp
BOOL DeleteSynchronizationBarrier(_Inout_ LPSYNCHRONIZATION_BARRIER lpBarrier);

在调用 EnterSynchronizationBarrier 之后立即调用 DeleteSynchronizationBarrier 是可行的,因为该函数会等待所有线程到达屏障后才进行删除,除非所有线程都使用了 SYNCHRONIZATION_BARRIER_FLAGS_NO_DELETE 标志。

C++ 标准库呢?

C++ 标准库提供的同步原语可作为 Windows API 的跨平台替代方案。对应的类型如下:

  • std::mutex:类似临界区,但不支持递归获取(不可重入)
  • std::recursive_mutex:行为与临界区完全一样(支持递归获取)
  • std::shared_mutex:类似 SRW 锁,支持共享(读)和独占(写)两种模式
  • std::condition_variable:相当于 Windows 的条件变量
  • 以及 std::timed_mutexstd::lock_guardstd::unique_lock 等其他类型

显然,C++ 标准库中可能缺少一些 Windows 特有的功能,比如等待地址变化(WaitOnAddress)和同步屏障(Synchronization Barriers)。不过,这些功能可能会在未来的标准中添加。所有 C++ 标准库的同步类型都仅在同一进程内有效,无法跨进程使用。

练习

  1. 创建一个线程安全的栈数据结构实现。如果出栈操作无法成功,阻塞线程直到有数据可用(使用条件变量)。

总结

在本章中,我们介绍了通过 Windows API 可用的各种同步机制。所有这些机制的共同点是能够在同一进程中的线程之间进行某种意义上的同步。在下一章中,我们将利用内核对象扩展可用的同步原语,这些内核对象还可以在不同进程中的线程之间进行同步。