Skip to content
Published at:

第10章:高级线程处理

本章深入探讨 Windows 系统编程中与线程相关的高级主题,包括线程本地存储、远程线程、线程枚举、CPU 缓存效应、等待链遍历、用户模式调度、一次性初始化以及多线程调试技术。

线程本地存储(Thread Local Storage, TLS)

概念与动机

在多线程程序中,每个线程都可以访问栈上的局部变量和进程范围内的全局变量。但有时我们希望某个数据在每个线程中各有一份独立的副本,互不干扰。一个经典例子是 GetLastError():每个线程调用后都能获取自己最近一次的错误码,而不会被其他线程改写。

一种朴素的实现方案是用线程 ID 作为键的哈希表,但这种方式需要同步且查找可能较慢。Windows 提供了线程本地存储(Thread Local Storage, TLS)——一种用户模式机制,允许按线程进行数据存储。

注意:GetLastError 的实际值存储在 TEB(Thread Environment Block)结构中而非 TLS 槽中,但原理相同。

另一个经典案例是 C 运行时库的 errno。原始 C 标准库设计于多线程概念出现之前,errno 原本是一个全局变量。在多线程环境中,线程 1 检查 errno 前可能被线程 2 的 I/O 操作覆盖。如今 errno 是一个宏,通过 TLS 获取当前线程的值。

动态 TLS

Windows API 提供四个 TLS 函数:

函数说明
TlsAlloc()分配一个 TLS 槽索引,将所有现有线程的对应单元格清零。失败返回 TLS_OUT_OF_INDEXES0xffffffff),保证至少有 TLS_MINIMUM_AVAILABLE(64)个槽可用
TlsSetValue(DWORD index, PVOID value)在当前线程的指定槽中存储指针大小的值
TlsGetValue(DWORD index)检索当前线程在指定槽中存储的值
TlsFree(DWORD index)释放由 TlsAlloc 分配的索引

关键特性:

  • 调用线程只能访问自己的 TLS 值,没有直接的方法访问另一个线程的 TLS 槽
  • 访问 TLS 时无需同步,因为每个线程只读写自己的数据
  • 实际可用槽数远大于 64。作者在 Windows 10 2004 上测试获得了 1084 个槽

测试槽数量的代码:

cpp
int slots = 0;
while (true) {
    DWORD slot = ::TlsAlloc();
    if (slot == TLS_OUT_OF_INDEXES) {
        printf("Out of TLS indices!\n");
        break;
    }
    slots++;
}
printf("Allocated: %d\n", slots);

最佳实践:使用单个槽,动态分配所需信息的结构体,将结构体指针存储在槽中。这样可以减少 TLS 槽的消耗,并将所有线程相关数据集中管理。

TLS 的非标准用法:环境事务(Ambient Transaction)

TLS 的一个巧妙用法是实现"环境事务"——在不修改函数签名的情况下传递事务上下文。下面是一个 Transaction 类的示例。

类声明:

cpp
class Transaction {
public:
    Transaction();
    ~Transaction();
    static Transaction* GetCurrent();
    void AddLog(PCWSTR text);
    void AddError(PCWSTR text);

private:
    int _errors = 0;
    inline static DWORD _tlsIndex = TLS_OUT_OF_INDEXES;
};

实现:

cpp
Transaction::Transaction() {
    if (_tlsIndex == TLS_OUT_OF_INDEXES)
        _tlsIndex = ::TlsAlloc();
    ::TlsSetValue(_tlsIndex, this);
}

Transaction::~Transaction() {
    if (_errors == 0) {
        // 提交事务
    }
    else {
        // 中止/回滚事务
    }
    ::TlsSetValue(_tlsIndex, nullptr);
}

Transaction* Transaction::GetCurrent() {
    if (_tlsIndex == TLS_OUT_OF_INDEXES)
        return nullptr;
    return static_cast<Transaction*>(::TlsGetValue(_tlsIndex));
}

void Transaction::AddError(PCWSTR) {
    _errors++;
}

void Transaction::AddLog(PCWSTR) {
}

使用示例:

cpp
void f1() {
    auto tn = Transaction::GetCurrent();
    if (tn)
        tn->AddLog(L"f1 working");
    if (!DoWork()) {
        if (tn)
            tn->AddError(L"Failed in DoWork");
        else
            printf("Failed in DoWork");
    }
}

void do_something() {
    Transaction t;
    f1();
}

构造函数分配 TLS 索引(仅首次)并将 this 存入 TLS;析构函数根据错误计数决定提交或回滚,然后清空 TLS。静态方法 GetCurrent() 在任何被调用函数中获取当前线程的事务对象,无需通过参数层层传递。

这种做法被称为"环境事务(ambient transaction)",.NET Framework 中通过 TransactionScope 类使用相同的模式。

静态线程本地存储(Static TLS)

微软扩展 __declspec(thread) 可将全局或静态变量声明为线程本地:

cpp
__declspec(thread) int counter;

C++11 及更高版本可使用跨平台的 thread_local 关键字:

cpp
thread_local int counter;

称为"静态"是因为无需手动分配和销毁。编译器将所有线程本地变量整合到 PE 文件的 .tls 节中。进程启动时,NTDLL 加载器调用 TlsAlloc 分配槽,并为每个线程动态分配包含所有线程本地变量的内存块。在 PE 文件的 .tls 节中可以看到编译时存储的初始值(如 thread_local int counter = 5; 时二进制数据中存储的数值"5")。

远程线程(Remote Threads)

CreateRemoteThreadCreateRemoteThreadEx 用于在另一个进程中创建线程。与 CreateThread 相比,增加了目标进程句柄参数 hProcess

cpp
HANDLE CreateRemoteThread(
    HANDLE hProcess,
    LPSECURITY_ATTRIBUTES lpThreadAttributes,
    SIZE_T dwStackSize,
    LPTHREAD_START_ROUTINE lpStartAddress,
    LPVOID lpParameter,
    DWORD dwCreationFlags,
    LPDWORD lpThreadId);

需要的权限包括:PROCESS_CREATE_THREADPROCESS_QUERY_INFORMATIONPROCESS_VM_OPERATIONPROCESS_VM_WRITEPROCESS_VM_READ

最关键的参数是 lpStartAddress——线程函数的地址是相对于目标进程的。这意味着目标进程中必须已存在该代码。由于 Windows 子系统 DLL(如 kernel32.dll)在所有进程中被映射到相同的虚拟地址,因此本进程中获取的函数地址可以直接在目标进程中使用。

CreateRemoteThreadEx 增加了属性列表参数,可以指定线程相关属性(如理想处理器、优先级等)。

Breakin 应用程序

Breakin 示例应用程序通过远程线程调用 DebugBreak 函数,模拟调试器的入侵行为(Windows 已有 DebugBreakProcess 函数可执行同样的操作)。

主要步骤:

  1. 从命令行获取进程 ID,用 OpenProcess 打开句柄
  2. 利用 kernel32.dll 在所有进程中被映射到相同地址的特性,在本进程中定位 DebugBreak 函数地址
  3. 调用 CreateRemoteThread,将函数地址作为线程起始地址
  4. 关闭句柄

完整代码:

cpp
int main(int argc, const char* argv[]) {
    if (argc < 2) {
        printf("Usage: breakin <pid>\n");
        return 0;
    }

    int pid = atoi(argv[1]);
    auto hProcess = ::OpenProcess(PROCESS_CREATE_THREAD | PROCESS_QUERY_INFORMATION |
                                  PROCESS_VM_OPERATION | PROCESS_VM_READ | PROCESS_VM_WRITE,
                                  FALSE, pid);
    if (!hProcess)
        return Error("Failed to open process");

    auto hThread = ::CreateRemoteThread(hProcess, nullptr, 0,
        (LPTHREAD_START_ROUTINE)::GetProcAddress(
            ::GetModuleHandle(L"kernel32"), "DebugBreak"),
        nullptr, 0, nullptr);
    if (!hThread)
        return Error("Failed to create remote thread");

    printf("Remote thread created successfully!\n");

    ::CloseHandle(hThread);
    ::CloseHandle(hProcess);

    return 0;
}

代码假设 DebugBreak 符合线程函数的 DWORD WINAPI ThreadFunction(PVOID param) 原型——它不接受参数且返回类型不被使用,因此"足够相似"即可工作。可以用记事本附加调试器后测试 Breakin。

CreateRemoteThreadEx 更有用的场景是 DLL 注入,将在第 15 章详细讲解。

线程枚举(Thread Enumeration)

使用工具帮助函数 CreateToolhelp32Snapshot 配合 TH32CS_SNAPTHREAD 标志枚举系统中的线程。

cpp
HANDLE CreateToolhelp32Snapshot(DWORD dwFlags, DWORD th32ProcessID);
BOOL Thread32First(HANDLE hSnapshot, LPTHREADENTRY32 lpte);
BOOL Thread32Next(HANDLE hSnapshot, LPTHREADENTRY32 lpte);

快照包含所有进程中的所有线程——无法指定特定进程 ID 来仅枚举该进程的线程,第二个参数在枚举线程时无用。创建快照后,先调用 Thread32First,再循环调用 Thread32Next 直到返回 FALSE

THREADENTRY32 结构包含:

  • dwSize:调用前必须设置为 sizeof(THREADENTRY32)
  • cntUsage:引用计数
  • th32ThreadID:线程 ID
  • th32OwnerProcessID:所属进程 ID
  • tpBasePri:基础优先级
  • tpDeltaPri:优先级增量(未使用)
  • dwFlags:标志(未使用)

thlist 应用程序

thlist 是一个线程列表工具,核心辅助函数 EnumThreads 返回 ThreadInfo 结构体向量。

ThreadInfo 结构定义:

cpp
struct ThreadInfo {
    DWORD Id;
    DWORD Pid;
    int Priority;
    FILETIME CreateTime;
    DWORD CPUTime;
    std::wstring ProcessName;
};

EnumThreads 完整实现:

cpp
std::vector<ThreadInfo> EnumThreads(int pid) {
    std::vector<ThreadInfo> threads;
    HANDLE hSnapshot = ::CreateToolhelp32Snapshot(
        TH32CS_SNAPPROCESS | TH32CS_SNAPTHREAD, 0);
    if (hSnapshot == INVALID_HANDLE_VALUE)
        return threads;

    PROCESSENTRY32 pe;
    pe.dwSize = sizeof(pe);
    std::unordered_map<DWORD, PROCESSENTRY32> processes;
    processes.reserve(512);

    ::Process32First(hSnapshot, &pe);
    while (::Process32Next(hSnapshot, &pe)) {
        processes.insert({ pe.th32ProcessID, pe });
    }

    threads.reserve(4096);
    THREADENTRY32 te;
    te.dwSize = sizeof(te);

    ::Thread32First(hSnapshot, &te);
    do {
        if (te.th32OwnerProcessID > 0 &&
            (pid == 0 || te.th32OwnerProcessID == pid)) {
            ThreadInfo ti;
            ti.Id = te.th32ThreadID;
            ti.Pid = te.th32OwnerProcessID;
            ti.Priority = te.tpBasePri;
            ti.ProcessName = processes[ti.Pid].szExeFile;

            auto hThread = ::OpenThread(
                THREAD_QUERY_LIMITED_INFORMATION, FALSE, ti.Id);
            if (hThread) {
                FILETIME user, kernel, exit;
                ::GetThreadTimes(hThread, &ti.CreateTime, &exit,
                                 &kernel, &user);
                ti.CPUTime = DWORD(
                    (*(ULONGLONG*)&kernel + *(ULONGLONG*)&user) / 10000000);
                ::CloseHandle(hThread);
            }
            else {
                ti.CPUTime = 0;
                ti.CreateTime.dwHighDateTime =
                    ti.CreateTime.dwLowDateTime = 0;
            }
            threads.push_back(std::move(ti));
        }
    } while (::Thread32Next(hSnapshot, &te));

    ::CloseHandle(hSnapshot);
    return threads;
}

EnumThreads 工作流程:

  1. 创建快照(同时包含进程和线程,使用 TH32CS_SNAPPROCESS | TH32CS_SNAPTHREAD
  2. 枚举进程,建立进程 ID 到 PROCESSENTRY32unordered_map 映射(跳过 PID 为 0 的空闲进程)
  3. 枚举线程,在映射中查找对应进程获取映像名称
  4. OpenThread(THREAD_QUERY_LIMITED_INFORMATION, ...) 打开线程
  5. 调用 GetThreadTimes 获取创建时间、内核时间和用户时间
  6. 将 CPU 时间(100 纳秒单位)除以 10000000 转换为秒
  7. 关闭线程句柄,将 ThreadInfo 加入向量

GetThreadTimesGetProcessTimes 类似但基于线程操作。时间从 1601 年 1 月 1 日 UTC 开始以 100 纳秒为单位。

第 3 章描述的原生 API 也提供了枚举每个进程中线程的功能。

缓存和缓存行(Caches and Cache Lines)

背景

在早期微处理器中,CPU 速度与内存速度相当。后来 CPU 速度大幅提升而内存速度滞后,导致 CPU 频繁等待内存数据。为弥补这一差距,在 CPU 和内存之间引入了缓存(Cache)。缓存是一种比主内存速度更快的高速存储器,可以显著减少 CPU 等待时间。

现代大多数 CPU 实现三级缓存:

  • 1 级缓存(L1):分为数据缓存(D-cache)和指令缓存(I-cache),每个逻辑处理器各有一套
  • 2 级缓存(L2):由同一物理核心的逻辑处理器共享
  • 3 级缓存(L3):系统范围共享

缓存容量约比主内存小 3 个数量级。此外还有 TLB(Translation Lookaside Buffer,转换后备缓冲器),是一种用于快速将虚拟地址转换为物理地址的 CPU 缓存,将在第 12 章讨论。

缓存行(Cache Line)

CPU 从内存读取数据时不是按字节读取,而是一次读取一整块,这个块称为缓存行(Cache Line),通常大小为 64 字节。理解这一机制对于编写高性能代码至关重要。

SumMatrix:行优先 vs 列优先遍历

以下两个函数对矩阵求和,算法复杂度相同但性能差异巨大:

cpp
// 行优先遍历(Row-major)
long long SumMatrix1(const Matrix<int>& m) {
    long long sum = 0;
    for (int r = 0; r < m.Rows(); ++r)
        for (int c = 0; c < m.Columns(); ++c)
            sum += m[r][c];

    return sum;
}

// 列优先遍历(Column-major)
long long SumMatrix2(const Matrix<int>& m) {
    long long sum = 0;
    for (int c = 0; c < m.Columns(); ++c)
        for (int r = 0; r < m.Rows(); ++r)
            sum += m[r][c];

    return sum;
}

测试结果(256x256 矩阵):行优先约 34 微秒,列优先约 81 微秒。随着矩阵增大差距急剧扩大,16384x16384 时行优先约 143 毫秒,列优先约 4562 毫秒(约 32 倍差异)。

原因分析:SumMatrix1 按行内连续的方式遍历,读取某个整数时其后续整数已在同一个缓存行中,可直接从缓存获取;SumMatrix2 按列跳跃式访问,每次读取的整数在内存中位置较远,需要加载不同的缓存行,且可能丢弃即将使用的数据(缓存污染)。

伪共享(False Sharing)

FalseSharing 项目演示了多线程统计数组中偶数数量的场景。每个线程处理数组的一个连续部分,统计结果存入另一个数组的对应单元格。

问题版本(CountEvenNumbers1——存在伪共享:

cpp
[](auto param) -> DWORD {
    auto d = (ThreadData*)param;
    auto start = d->start, end = d->end;
    auto counters = d->counters;
    auto data = d->data;
    for (; start < end; ++start)
        if (data[start] % 2 == 0)
            ++(*counters);  // 直接递增共享数组元素
    return 0;
}

优化版本(CountEvenNumbers2——使用局部变量消除伪共享:

cpp
[](auto param) -> DWORD {
    auto d = (ThreadData*)param;
    auto start = d->start, end = d->end;
    auto data = d->data;
    size_t count = 0;        // 局部变量在栈上
    for (; start < end; ++start)
        if (data[start] % 2 == 0)
            count++;

    *(d->counters) = count;  // 循环结束后一次性写入
    return 0;
}

伪共享的原理:当线程 A 写入某个计数器值时,它实际上写入了一整个缓存行。如果线程 B 的计数器恰好位于同一缓存行中(因为它们存储在连续数组中,而缓存行大小为 64 字节),那么线程 B 所在处理器上的对应缓存行会失效,迫使线程 B 下次访问时从主内存重新读取。这种"看似共享实则无关"的现象称为伪共享(False Sharing)

性能对比(8 线程):

  • 问题版本:约 2532 毫秒
  • 优化版本:约 855 毫秒

更值得注意的是,问题版本中增加线程数有时反而降低性能,而优化版性能随线程数持续提升。

优化原理:局部变量 count 分配在栈上(每个线程栈至少 4KB),不可能与其他线程的 count 位于同一缓存行,从而完全消除了伪共享。

等待链遍历(Wait Chain Traversal, WCT)

WCT API 可以识别多种死锁情况,从感兴趣的线程开始遍历等待链。链由线程和对象交替组成——每个线程等待其后的对象,该对象由链中下一个线程拥有。

可跟踪的同步对象包括:

  • 临界区(Critical Section)
  • 互斥锁(Mutex),含跨进程
  • ALPC(Advanced Local Procedure Call,Windows 组件使用的进程间通信机制)
  • SendMessage API
  • 对线程和进程的等待操作
  • COM 跨单元调用
  • 套接字和 SMB 操作

核心 API

cpp
// 打开 WCT 会话
HWCT OpenThreadWaitChainSession(DWORD Flags, PWAITCHAINCALLBACK callback);

// 对特定线程执行等待链分析
BOOL GetThreadWaitChain(HWCT WctHandle, DWORD_PTR Context, DWORD Flags,
    DWORD ThreadId, LPDWORD NodeCount,
    PWAITCHAIN_NODE_INFO NodeInfoArray, LPBOOL IsCycle);

// 关闭 WCT 会话
VOID CloseThreadWaitChainSession(HWCT WctHandle);
  • OpenThreadWaitChainSession 的 Flags 为 0 表示同步模式(分析线程阻塞直到完成),WCT_ASYNC_OPEN_FLAG(值为 1)表示异步模式,需要提供回调函数
  • GetThreadWaitChain 最大分析深度由 WCT_MAX_NODE_COUNT 定义(当前为 16)

WAITCHAIN_NODE_INFO 结构包含:

  • ObjectType:对象类型(WCT_OBJECT_TYPE 枚举)
  • ObjectStatus:对象状态(WCT_OBJECT_STATUS 枚举)
  • 联合体:LockObject(对象名称和超时信息)或 ThreadObject(进程 ID、线程 ID、等待时间、上下文切换次数)

WCT_OBJECT_TYPE 枚举值:CriticalSection、SendMessage、Mutex、ALPC、COM、ThreadWait、ProcessWait、Thread、ComActivation、Unknown、Socket、SMB。

WCT_OBJECT_STATUS 枚举值:NoAccess、Running、Blocked、PidOnly、PidOnlyRpcss、Owned、NotOwned、Abandoned、Unknown、Error。

死锁检测器(Deadlock Detector)应用程序

DeadlockDetector 是一个图形化死锁检测工具,主要功能:

  1. "进程"组合框选择要分析的进程
  2. 点击"检测死锁"后枚举所选进程中所有线程
  3. 对每个线程执行等待链分析
  4. 在树形视图中显示结果,根节点代表线程

核心操作流程:

cpp
LRESULT CMainDlg::OnDetect(WORD, WORD wID, HWND, BOOL&) {
    auto hWct = ::OpenThreadWaitChainSession(0, nullptr);
    if (hWct == nullptr) {
        AtlMessageBox(*this, L"Failed to open WCT session",
                      IDR_MAINFRAME, MB_ICONERROR);
        return 0;
    }

    auto pid = (DWORD)m_ProcCombo.GetItemData(m_ProcCombo.GetCurSel());
    auto threads = EnumThreads(pid);

    m_Tree.DeleteAllItems();
    int failures = 0;
    for (auto& tid : threads) {
        if (!DoWaitChain(hWct, tid))
            failures++;
    }

    if (failures == threads.size()) {
        AtlMessageBox(*this,
            L"Failed to analyze wait chain. (try running elevated)",
            IDR_MAINFRAME, MB_ICONEXCLAMATION);
    }

    ::CloseThreadWaitChainSession(hWct);
    return 0;
}

DoWaitChain 调用 GetThreadWaitChain 并传递结果给解析函数:

cpp
bool CMainDlg::DoWaitChain(HWCT hWct, DWORD tid) {
    WAITCHAIN_NODE_INFO nodes[WCT_MAX_NODE_COUNT];
    DWORD nodeCount = WCT_MAX_NODE_COUNT;
    BOOL cycle;
    auto success = ::GetThreadWaitChain(hWct, 0, WCTP_GETINFO_ALL_FLAGS,
                                        tid, &nodeCount, nodes, &cycle);
    if (success) {
        ParseThreadNodes(nodes, nodeCount, cycle);
    }
    return success;
}

ParseThreadNodes 解析节点并构建树形视图:

cpp
void CMainDlg::ParseThreadNodes(const WAITCHAIN_NODE_INFO* nodes,
    DWORD count, bool cycle) {
    static PCWSTR objectTypes[] = {
        L"Critical Section", L"Send Message", L"Mutex", L"ALPC",
        L"COM", L"Thread Wait", L"Process Wait", L"Thread",
        L"COM Activation", L"Unknown", L"Socket", L"SMB",
    };

    static PCWSTR statusTypes[] = {
        L"No Access", L"Running", L"Blocked", L"PID only",
        L"PID only RPCSS", L"Owned", L"Not Owned", L"Abandoned",
        L"Unknown", L"Error"
    };

    HTREEITEM hCurrentNode = TVI_ROOT;
    CString text;
    for (DWORD i = 0; i < count; i++) {
        auto& node = nodes[i];
        auto type = node.ObjectType;
        auto status = node.ObjectStatus;
        switch (type) {
            case WctThreadType:
                text.Format(L"Thread %u (PID: %u) Wait: %u (%s)",
                    node.ThreadObject.ThreadId,
                    node.ThreadObject.ProcessId,
                    node.ThreadObject.WaitTime,
                    statusTypes[status - 1]);
                break;

            case WctCriticalSectionType:
            case WctMutexType:
            case WctThreadWaitType:
            case WctProcessWaitType:
                text.Format(L"%s (%s) Name: %s",
                    objectTypes[type - 1],
                    statusTypes[status - 1],
                    node.LockObject.ObjectName);
                break;

            default:
                text.Format(L"%s (%s)", objectTypes[type - 1],
                    statusTypes[node.ObjectStatus - 1]);
                break;
        }

        auto hOld = hCurrentNode;
        hCurrentNode = m_Tree.InsertItem(text, hCurrentNode, TVI_LAST);
        m_Tree.Expand(hOld, TVE_EXPAND);
    }

    if (cycle) {
        m_Tree.InsertItem(L"Deadlock!", hCurrentNode, TVI_LAST);
        m_Tree.Expand(hCurrentNode, TVE_EXPAND);
    }
}

COM 分析需要特殊注册,在 OnInitDialog 中完成:

cpp
auto comLib = ::GetModuleHandle(L"ole32");
if (comLib) {
    ::RegisterWaitChainCOMCallback(
        (PCOGETCALLSTATE)::GetProcAddress(comLib, "CoGetCallState"),
        (PCOGETACTIVATIONSTATE)::GetProcAddress(comLib,
            "CoGetActivationState"));
}

异步 WCT 会话

异步会话需要提供符合特定原型的回调函数:

  • 参数:WctHandle、Context、CallbackStatus、NodeCount、NodeInfoArray、IsCycle
  • OpenThreadWaitChainSession 在异步模式下总是返回 FALSE
  • 如果 GetLastError 返回 ERROR_IO_PENDING 表示分析已成功启动
  • 分析完成后回调被调用
  • CallbackStatusERROR_SUCCESS 表示成功,常见失败原因是 ERROR_ACCESS_DENIED

用户模式调度(User Mode Scheduling, UMS)

第 6 章详细讨论了内核调度器。在某些极端场景下,用户模式控制调度更有优势,可以避免频繁的用户模式/内核模式切换带来的高成本。

Windows 过去提供**纤程(Fiber)**机制,但内核不识别纤程,导致 TLS 无法正确传播、TEB 结构与当前执行纤程不一致等问题。如今不应再使用纤程

从 Windows 7 和 Windows Server 2008 R2 开始,Windows 支持用户模式调度(User Mode Scheduling, UMS)——用户模式线程成为调度器,可在用户模式调度线程而无需模式转换。内核识别 UMS 机制,因此没有纤程的缺点。

文章未深入描述 UMS API,而是介绍了微软的并发运行时(Concurrency Runtime, ConcRT),它在幕后使用 UMS。

ConcRT 示例:质数计数

使用 concurrency::parallel_for 代替手动线程分区:

cpp
#include <ppl.h>

count = 0;
concurrency::parallel_for(from, to + 1, [&count](int n) {
    if (IsPrime(n))
        ::InterlockedIncrement((unsigned*)&count);
});

无需手动指定线程数量或分区方式,唯一需注意共享变量 count 使用 InterlockedIncrement 进行原子同步。

测试结果:手动分区 16 线程约 1985 毫秒,ConcRT 版本约 1640 毫秒。无论分配多少个线程,ConcRT 版本总是比手动分区表现更好,且创建的线程数量永远不会超过系统中的逻辑处理器数量

SQL Server 使用 UMS 来提高性能,因为它是一个高度多线程的服务器应用程序。

Init Once 一次性初始化

多线程环境中的单例模式需要保证只初始化一次。常见场景是多个线程可能同时通过 GetInstance() 访问单例,需确保初始化代码只执行一次。

C++11 及更高版本中,函数内的静态变量保证只初始化一次。C++11 还提供了 std::call_once 函数。

从 Windows 8 和 Server 2012 开始可用的 Windows 一次性初始化 API:

cpp
// 静态初始化 INIT_ONCE 变量
INIT_ONCE init = INIT_ONCE_STATIC_INIT;

// 或动态初始化
VOID InitOnceInitialize(PINIT_ONCE InitOnce);

// 执行一次性初始化
BOOL InitOnceExecuteOnce(
    PINIT_ONCE InitOnce,
    PINIT_ONCE_FN InitFn,
    PVOID Parameter,
    LPVOID* Context);

使用要点:

  • INIT_ONCE 变量可以通过 INIT_ONCE_STATIC_INIT 静态初始化
  • InitOnceExecuteOnce 调用初始化函数,保证只执行一次
  • 初始化函数原型返回 BOOL,成功返回 TRUE,失败返回 FALSE
  • 上下文地址必须按 4 字节对齐(最右边 2 位清零)

调试多线程应用程序

编写正确且高效的多线程应用程序并非易事。Visual Studio 提供了多种调试多线程程序的工具。

断点(Breakpoints)

在多线程程序中设置断点时,任何线程都可能触发。隔离特定线程的方法:

  • 使用"线程"窗口冻结其他线程
  • "在源代码中显示线程"选项在断点命中时添加线程图标标记
  • 断点可设置条件,包括按线程 ID 或线程名称过滤

并行堆栈(Parallel Stacks)

通过"调试/窗口/并行堆栈"访问。以图形化方式展示线程派生关系,可视化进程中所有正在运行的线程的调用堆栈,便于理解线程之间的层次关系。

并行监视(Parallel Watch)

通过类似路径访问。可同时显示多个运行相同代码的线程所使用的选定变量或表达式,每个线程有各自对应的值副本,方便对比不同线程中同一变量的状态。

线程名称

第 5 章介绍了 SetThreadDescription API 在代码中为线程命名。调试时也可在"线程"窗口中临时更改名称便于跟踪,但调试会话结束后会重置,因此最好在代码中调用 SetThreadDescription 设置持久化的线程名称。

练习

使用 concurrency 命名空间中的 parallel_for 创建计算曼德勃罗集(Mandelbrot Set)的控制台应用程序,并与第 5 章中手动线程分区的练习版本进行性能对比。

总结

本章涵盖了与高级线程处理相关的广泛主题:

  • 线程本地存储(TLS):动态 TLS 通过 TlsAlloc/TlsSetValue/TlsGetValue/TlsFree 四函数管理;静态 TLS 通过 __declspec(thread)thread_local 声明,编译器自动处理
  • 远程线程:通过 CreateRemoteThread 在其他进程中创建线程,利用系统 DLL 地址空间共享的特性
  • 线程枚举:通过 CreateToolhelp32Snapshot 配合 Thread32First/Thread32Next 枚举系统线程
  • 缓存与缓存行:理解缓存行机制对性能至关重要,行优先遍历远优于列优先;伪共享会导致多线程性能下降,可通过局部变量归并消除
  • 等待链遍历(WCT):强大的死锁分析 API,同步和异步两种模式,可跟踪临界区、互斥锁、ALPC、COM 等
  • 用户模式调度(UMS):比纤程更先进的用户模式调度机制,ConcRT 运行时在幕后使用
  • Init Once:Windows 8+ 提供的一次性初始化 API,保证线程安全
  • 多线程调试:断点条件过滤、并行堆栈、并行监视、线程命名等调试技术

下一章将探讨文件和设备的输入/输出操作。