小白将的博客

为人民日益增长的美好生活需要而读书


  • 首页

  • 关于

  • 分类

  • 归档

CUDA编程入门极简教程

发表于 2018-03-27

前言

2006年,NVIDIA公司发布了CUDA,CUDA是建立在NVIDIA的CPUs上的一个通用并行计算平台和编程模型,基于CUDA编程可以利用GPUs的并行计算引擎来更加高效地解决比较复杂的计算难题。近年来,GPU最成功的一个应用就是深度学习领域,基于GPU的并行计算已经成为训练深度学习模型的标配。目前,最新的CUDA版本为CUDA 9。

GPU并不是一个独立运行的计算平台,而需要与CPU协同工作,可以看成是CPU的协处理器,因此当我们在说GPU并行计算时,其实是指的基于CPU+GPU的异构计算架构。在异构计算架构中,GPU与CPU通过PCIe总线连接在一起来协同工作,CPU所在位置称为为主机端(host),而GPU所在位置称为设备端(device),如下图所示。

image.png-21.2kB

基于CPU+GPU的异构计算. 来源:Preofessional CUDA® C Programming

可以看到GPU包括更多的运算核心,其特别适合数据并行的计算密集型任务,如大型矩阵运算,而CPU的运算核心较少,但是其可以实现复杂的逻辑运算,因此其适合控制密集型任务。另外,CPU上的线程是重量级的,上下文切换开销大,但是GPU由于存在很多核心,其线程是轻量级的。因此,基于CPU+GPU的异构计算平台可以优势互补,CPU负责处理逻辑复杂的串行程序,而GPU重点处理数据密集型的并行计算程序,从而发挥最大功效。

image.png-63.3kB

基于CPU+GPU的异构计算应用执行逻辑. 来源:Preofessional CUDA® C Programming

CUDA是NVIDIA公司所开发的GPU编程模型,它提供了GPU编程的简易接口,基于CUDA编程可以构建基于GPU计算的应用程序。CUDA提供了对其它编程语言的支持,如C/C++,Python,Fortran等语言,这里我们选择CUDA C/C++接口对CUDA编程进行讲解。开发平台为Windows 10 + VS 2013,Windows系统下的CUDA安装教程可以参考这里。

image.png-116.1kB

CUDA编程模型支持的编程语言

CUDA编程模型基础

在给出CUDA的编程实例之前,这里先对CUDA编程模型中的一些概念及基础知识做个简单介绍。CUDA编程模型是一个异构模型,需要CPU和GPU协同工作。在CUDA中,host和device是两个重要的概念,我们用host指代CPU及其内存,而用device指代GPU及其内存。CUDA程序中既包含host程序,又包含device程序,它们分别在CPU和GPU上运行。同时,host与device之间可以进行通信,这样它们之间可以进行数据拷贝。典型的CUDA程序的执行流程如下:

  1. 分配host内存,并进行数据初始化;
  2. 分配device内存,并从host将数据拷贝到device上;
  3. 调用CUDA的核函数在device上完成指定的运算;
  4. 将device上的运算结果拷贝到host上;
  5. 释放device和host上分配的内存。

上面流程中最重要的一个过程是调用CUDA的核函数来执行并行计算,kernel是CUDA中一个重要的概念,kernel是在device上线程中并行执行的函数,核函数用__global__符号声明,在调用时需要用<<<grid, block>>>来指定kernel要执行的线程数量,在CUDA中,每一个线程都要执行核函数,并且每个线程会分配一个唯一的线程号thread ID,这个ID值可以通过核函数的内置变量threadIdx来获得。

由于GPU实际上是异构模型,所以需要区分host和device上的代码,在CUDA中是通过函数类型限定词开区别host和device上的函数,主要的三个函数类型限定词如下:

  • __global__:在device上执行,从host中调用(一些特定的GPU也可以从device上调用),返回类型必须是void,不支持可变参数参数,不能成为类成员函数。注意用__global__定义的kernel是异步的,这意味着host不会等待kernel执行完就执行下一步。
  • __device__:在device上执行,单仅可以从device中调用,不可以和__global__同时用。
  • __host__:在host上执行,仅可以从host上调用,一般省略不写,不可以和__global__同时用,但可和__device__,此时函数会在device和host都编译。

要深刻理解kernel,必须要对kernel的线程层次结构有一个清晰的认识。首先GPU上很多并行化的轻量级线程。kernel在device上执行时实际上是启动很多线程,一个kernel所启动的所有线程称为一个网格(grid),同一个网格上的线程共享相同的全局内存空间,grid是线程结构的第一层次,而网格又可以分为很多线程块(block),一个线程块里面包含很多线程,这是第二个层次。线程两层组织结构如下图所示,这是一个gird和block均为2-dim的线程组织。grid和block都是定义为dim3类型的变量,dim3可以看成是包含三个无符号整数(x,y,z)成员的结构体变量,在定义时,缺省值初始化为1。因此grid和block可以灵活地定义为1-dim,2-dim以及3-dim结构,对于图中结构(主要水平方向为x轴),定义的grid和block如下所示,kernel在调用时也必须通过执行配置<<<grid, block>>>来指定kernel所使用的线程数及结构。

dim3 grid(3, 2);
dim3 block(4, 3);
kernel_fun<<< grid, block >>>(prams...);

image.png-43.2kB

Kernel上的两层线程组织结构(2-dim)

所以,一个线程需要两个内置的坐标变量(blockIdx,threadIdx)来唯一标识,它们都是dim3类型变量,其中blockIdx指明线程所在grid中的位置,而threaIdx指明线程所在block中的位置,如图中的Thread (1,1)满足:

threadIdx.x = 1
threadIdx.y = 1
blockIdx.x = 1
blockIdx.y = 1

一个线程块上的线程是放在同一个流式多处理器(SM)上的,但是单个SM的资源有限,这导致线程块中的线程数是有限制的,现代GPUs的线程块可支持的线程数可达1024个。有时候,我们要知道一个线程在blcok中的全局ID,此时就必须还要知道block的组织结构,这是通过线程的内置变量blockDim来获得。它获取线程块各个维度的大小。对于一个2-dim的block$(D_x, D_y)$,线程$(x, y)$的ID值为$(x + y D_x)$,如果是3-dim的block$(D_x, D_y, D_z)$,线程$(x, y, z)$的ID值为$(x + y D_x + z D_x D_y)$。另外线程还有内置变量gridDim,用于获得网格块各个维度的大小。

kernel的这种线程组织结构天然适合vector,matrix等运算,如我们将利用上图2-dim结构实现两个矩阵的加法,每个线程负责处理每个位置的两个元素相加,代码如下所示。线程块大小为(16, 16),然后将N*N大小的矩阵均分为不同的线程块来执行加法运算。

// Kernel定义
__global__ void MatAdd(float A[N][N], float B[N][N], float C[N][N]) 
{ 
    int i = blockIdx.x * blockDim.x + threadIdx.x; 
    int j = blockIdx.y * blockDim.y + threadIdx.y; 
    if (i < N && j < N) 
        C[i][j] = A[i][j] + B[i][j]; 
}
int main() 
{ 
    ...
    // Kernel 线程配置
    dim3 threadsPerBlock(16, 16); 
    dim3 numBlocks(N / threadsPerBlock.x, N / threadsPerBlock.y);
    // kernel调用
    MatAdd<<<numBlocks, threadsPerBlock>>>(A, B, C); 
    ...
}

此外这里简单介绍一下CUDA的内存模型,如下图所示。可以看到,每个线程有自己的私有本地内存(Local Memory),而每个线程块有包含共享内存(Shared Memory),可以被线程块中所有线程共享,其生命周期与线程块一致。此外,所有的线程都可以访问全局内存(Global Memory)。还可以访问一些只读内存块:常量内存(Constant Memory)和纹理内存(Texture Memory)。内存结构涉及到程序优化,这里不深入探讨它们。

image.png-53kB

CUDA内存模型

还有重要一点,你需要对GPU的硬件实现有一个基本的认识。上面说到了kernel的线程组织层次,那么一个kernel实际上会启动很多线程,这些线程是逻辑上并行的,但是在物理层却并不一定。这其实和CPU的多线程有类似之处,多线程如果没有多核支持,在物理层也是无法实现并行的。但是好在GPU存在很多CUDA核心,充分利用CUDA核心可以充分发挥GPU的并行计算能力。GPU硬件的一个核心组件是SM,前面已经说过,SM是英文名是 Streaming Multiprocessor,翻译过来就是流式多处理器。SM的核心组件包括CUDA核心,共享内存,寄存器等,SM可以并发地执行数百个线程,并发能力就取决于SM所拥有的资源数。当一个kernel被执行时,它的gird中的线程块被分配到SM上,一个线程块只能在一个SM上被调度。SM一般可以调度多个线程块,这要看SM本身的能力。那么有可能一个kernel的各个线程块被分配多个SM,所以grid只是逻辑层,而SM才是执行的物理层。SM采用的是SIMT (Single-Instruction, Multiple-Thread,单指令多线程)架构,基本的执行单元是线程束(wraps),线程束包含32个线程,这些线程同时执行相同的指令,但是每个线程都包含自己的指令地址计数器和寄存器状态,也有自己独立的执行路径。所以尽管线程束中的线程同时从同一程序地址执行,但是可能具有不同的行为,比如遇到了分支结构,一些线程可能进入这个分支,但是另外一些有可能不执行,它们只能死等,因为GPU规定线程束中所有线程在同一周期执行相同的指令,线程束分化会导致性能下降。当线程块被划分到某个SM上时,它将进一步划分为多个线程束,因为这才是SM的基本执行单元,但是一个SM同时并发的线程束数是有限的。这是因为资源限制,SM要为每个线程块分配共享内存,而也要为每个线程束中的线程分配独立的寄存器。所以SM的配置会影响其所支持的线程块和线程束并发数量。总之,就是网格和线程块只是逻辑划分,一个kernel的所有线程其实在物理层是不一定同时并发的。所以kernel的grid和block的配置不同,性能会出现差异,这点是要特别注意的。还有,由于SM的基本执行单元是包含32个线程的线程束,所以block大小一般要设置为32的倍数。

image.png-45.6kB

CUDA编程的逻辑层和物理层

在进行CUDA编程前,可以先检查一下自己的GPU的硬件配置,这样才可以有的放矢,可以通过下面的程序获得GPU的配置属性:

int dev = 0;
cudaDeviceProp devProp;
CHECK(cudaGetDeviceProperties(&devProp, dev));
std::cout << "使用GPU device " << dev << ": " << devProp.name << std::endl;
std::cout << "SM的数量:" << devProp.multiProcessorCount << std::endl;
std::cout << "每个线程块的共享内存大小:" << devProp.sharedMemPerBlock / 1024.0 << " KB" << std::endl;
std::cout << "每个线程块的最大线程数:" << devProp.maxThreadsPerBlock << std::endl;
std::cout << "每个EM的最大线程数:" << devProp.maxThreadsPerMultiProcessor << std::endl;
std::cout << "每个EM的最大线程束数:" << devProp.maxThreadsPerMultiProcessor / 32 << std::endl;

// 输出如下
使用GPU device 0: GeForce GT 730
SM的数量:2
每个线程块的共享内存大小:48 KB
每个线程块的最大线程数:1024
每个EM的最大线程数:2048
每个EM的最大线程束数:64

好吧,GT 730显卡确实有点渣,只有2个SM,呜呜……

向量加法实例

知道了CUDA编程基础,我们就来个简单的实战,利用CUDA编程实现两个向量的加法,在实现之前,先简单介绍一下CUDA编程中内存管理API。首先是在device上分配内存的cudaMalloc函数:

cudaError_t cudaMalloc(void** devPtr, size_t size);

这个函数和C语言中的malloc类似,但是在device上申请一定字节大小的显存,其中devPtr是指向所分配内存的指针。同时要释放分配的内存使用cudaFree函数,这和C语言中的free函数对应。另外一个重要的函数是负责host和device之间数据通信的cudaMemcpy函数:

cudaError_t cudaMemcpy(void* dst, const void* src, size_t count, cudaMemcpyKind kind)

其中src指向数据源,而dst是目标区域,count是复制的字节数,其中kind控制复制的方向:cudaMemcpyHostToHost, cudaMemcpyHostToDevice, cudaMemcpyDeviceToHost及cudaMemcpyDeviceToDevice,如cudaMemcpyHostToDevice将host上数据拷贝到device上。

现在我们来实现一个向量加法的实例,这里grid和block都设计为1-dim,首先定义kernel如下:

// 两个向量加法kernel,grid和block均为一维
__global__ void add(float* x, float * y, float* z, int n)
{
    // 获取全局索引
    int index = threadIdx.x + blockIdx.x * blockDim.x;
    // 步长
    int stride = blockDim.x * gridDim.x;
    for (int i = index; i < n; i += stride)
    {
        z[i] = x[i] + y[i];
    }
}

其中stride是整个grid的线程数,有时候向量的元素数很多,这时候可以将在每个线程实现多个元素(元素总数/线程总数)的加法,相当于使用了多个grid来处理,这是一种grid-stride loop方式,不过下面的例子一个线程只处理一个元素,所以kernel里面的循环是不执行的。下面我们具体实现向量加法:

int main()
{
    int N = 1 << 20;
    int nBytes = N * sizeof(float);
    // 申请host内存
    float *x, *y, *z;
    x = (float*)malloc(nBytes);
    y = (float*)malloc(nBytes);
    z = (float*)malloc(nBytes);

    // 初始化数据
    for (int i = 0; i < N; ++i)
    {
        x[i] = 10.0;
        y[i] = 20.0;
    }

    // 申请device内存
    float *d_x, *d_y, *d_z;
    cudaMalloc((void**)&d_x, nBytes);
    cudaMalloc((void**)&d_y, nBytes);
    cudaMalloc((void**)&d_z, nBytes);

    // 将host数据拷贝到device
    cudaMemcpy((void*)d_x, (void*)x, nBytes, cudaMemcpyHostToDevice);
    cudaMemcpy((void*)d_y, (void*)y, nBytes, cudaMemcpyHostToDevice);
    // 定义kernel的执行配置
    dim3 blockSize(256);
    dim3 gridSize((N + blockSize.x - 1) / blockSize.x);
    // 执行kernel
    add << < gridSize, blockSize >> >(d_x, d_y, d_z, N);

    // 将device得到的结果拷贝到host
    cudaMemcpy((void*)z, (void*)d_z, nBytes, cudaMemcpyHostToDevice);

    // 检查执行结果
    float maxError = 0.0;
    for (int i = 0; i < N; i++)
        maxError = fmax(maxError, fabs(z[i] - 30.0));
    std::cout << "最大误差: " << maxError << std::endl;

    // 释放device内存
    cudaFree(d_x);
    cudaFree(d_y);
    cudaFree(d_z);
    // 释放host内存
    free(x);
    free(y);
    free(z);

    return 0;
}

这里我们的向量大小为1<<20,而block大小为256,那么grid大小是4096,kernel的线程层级结构如下图所示:

image.png-102.1kB

kernel的线程层次结构. 来源:https://devblogs.nvidia.com/even-easier-introduction-cuda/

使用nvprof工具可以分析kernel运行情况,结果如下所示,可以看到kernel函数费时约1.5ms。

nvprof cuda9.exe
==7244== NVPROF is profiling process 7244, command: cuda9.exe
最大误差: 4.31602e+008
==7244== Profiling application: cuda9.exe
==7244== Profiling result:
            Type  Time(%)      Time     Calls       Avg       Min       Max  Name
 GPU activities:   67.57%  3.2256ms         2  1.6128ms  1.6017ms  1.6239ms  [CUDA memcpy HtoD]
                   32.43%  1.5478ms         1  1.5478ms  1.5478ms  1.5478ms  add(float*, float*, float*, int)

你调整block的大小,对比不同配置下的kernel运行情况,我这里测试的是当block为128时,kernel费时约1.6ms,而block为512时kernel费时约1.7ms,当block为64时,kernel费时约2.3ms。看来不是block越大越好,而要适当选择。

在上面的实现中,我们需要单独在host和device上进行内存分配,并且要进行数据拷贝,这是很容易出错的。好在CUDA 6.0引入统一内存(Unified Memory)来避免这种麻烦,简单来说就是统一内存使用一个托管内存来共同管理host和device中的内存,并且自动在host和device中进行数据传输。CUDA中使用cudaMallocManaged函数分配托管内存:

cudaError_t cudaMallocManaged(void **devPtr, size_t size, unsigned int flag=0);

利用统一内存,可以将上面的程序简化如下:

int main()
{
    int N = 1 << 20;
    int nBytes = N * sizeof(float);

    // 申请托管内存
    float *x, *y, *z;
    cudaMallocManaged((void**)&x, nBytes);
    cudaMallocManaged((void**)&y, nBytes);
    cudaMallocManaged((void**)&z, nBytes);

    // 初始化数据
    for (int i = 0; i < N; ++i)
    {
        x[i] = 10.0;
        y[i] = 20.0;
    }

    // 定义kernel的执行配置
    dim3 blockSize(256);
    dim3 gridSize((N + blockSize.x - 1) / blockSize.x);
    // 执行kernel
    add << < gridSize, blockSize >> >(x, y, z, N);

    // 同步device 保证结果能正确访问
    cudaDeviceSynchronize();
    // 检查执行结果
    float maxError = 0.0;
    for (int i = 0; i < N; i++)
        maxError = fmax(maxError, fabs(z[i] - 30.0));
    std::cout << "最大误差: " << maxError << std::endl;

    // 释放内存
    cudaFree(x);
    cudaFree(y);
    cudaFree(z);

    return 0;
}

相比之前的代码,使用统一内存更简洁了,值得注意的是kernel执行是与host异步的,由于托管内存自动进行数据传输,这里要用cudaDeviceSynchronize()函数保证device和host同步,这样后面才可以正确访问kernel计算的结果。

矩阵乘法实例

最后我们再实现一个稍微复杂一些的例子,就是两个矩阵的乘法,设输入矩阵为$A$和$B$,要得到$C=A\times B$。实现思路是每个线程计算$C$的一个元素值$C_{i,j}$,对于矩阵运算,应该选用grid和block为2-D的。首先定义矩阵的结构体:

// 矩阵类型,行优先,M(row, col) = *(M.elements + row * M.width + col)
struct Matrix
{
    int width;
    int height;
    float *elements;
    Matrix(int w, int h, float* e = NULL)
    {
        width = w;
        height = h;
        elements = e;
    }

};

image.png-21.4kB

矩阵乘法实现模式

然后实现矩阵乘法的核函数,这里我们定义了两个辅助的__device__函数分别用于获取矩阵的元素值和为矩阵元素赋值,具体代码如下:

// 获取矩阵A的(row, col)元素
__device__ float getElement(const Matrix A, int row, int col)
{
    return A.elements[row * A.width + col];
}

// 为矩阵A的(row, col)元素赋值
__device__ void setElement(Matrix A, int row, int col, float value)
{
    A.elements[row * A.width + col] = value;
}

// 矩阵相乘kernel,2-D,每个线程计算一个元素
__global__ void matMulKernel(const Matrix A, const Matrix B, Matrix C)
{
    float Cvalue = 0.0;
    int row = threadIdx.y + blockIdx.y * blockDim.y;
    int col = threadIdx.x + blockIdx.x * blockDim.x;
    for (int i = 0; i < A.width; ++i)
    {
        Cvalue += getElement(A, row, i) * getElement(B, i, col);
    }
    setElement(C, row, col, Cvalue);
}

最后我们采用统一内存编写矩阵相乘的测试实例:

int main()
{
    int width = 1 << 10;
    int height = 1 << 10;
    Matrix A(width, height, NULL);
    Matrix B(width, height, NULL);
    Matrix C(width, height, NULL);

    int nBytes = width * height * sizeof(float);
    // 申请托管内存
    cudaMallocManaged((void**)&A.elements, nBytes);
    cudaMallocManaged((void**)&B.elements, nBytes);
    cudaMallocManaged((void**)&C.elements, nBytes);

    // 初始化数据
    for (int i = 0; i < width * height; ++i)
    {
        A.elements[i] = 1.0;
        B.elements[i] = 2.0;
    }

    // 定义kernel的执行配置
    dim3 blockSize(32, 32);
    dim3 gridSize((width + blockSize.x - 1) / blockSize.x, 
        (height + blockSize.y - 1) / blockSize.y);
    // 执行kernel
    matMulKernel << < gridSize, blockSize >> >(A, B, C);


    // 同步device 保证结果能正确访问
    cudaDeviceSynchronize();
    // 检查执行结果
    float maxError = 0.0;
    for (int i = 0; i < width * height; i++)

        maxError = fmax(maxError, fabs(C.elements[i] - 2 * width));
    std::cout << C.elements[0] << std::endl;
    std::cout << "最大误差: " << maxError << std::endl;

    return 0;
}

这里矩阵大小为$1024\times 1024$,设计的线程的block大小为(32, 32),那么grid大小为(32, 32),最终测试结果如下:

nvprof cuda9.exe
==2456== NVPROF is profiling process 2456, command: cuda9.exe
最大误差: 0
==2456== Profiling application: cuda9.exe
==2456== Profiling result:
            Type  Time(%)      Time     Calls       Avg       Min       Max  Name
 GPU activities:  100.00%  2.67533s         1  2.67533s  2.67533s  2.67533s  matMulKernel(Matrix, Matrix, Matrix)
      API calls:   92.22%  2.67547s         1  2.67547s  2.67547s  2.67547s  cudaDeviceSynchronize
                    6.06%  175.92ms         3  58.640ms  2.3933ms  170.97ms  cudaMallocManaged
                    1.65%  47.845ms         1  47.845ms  47.845ms  47.845ms  cudaLaunch
                    0.05%  1.4405ms        94  15.324us       0ns  938.54us  cuDeviceGetAttribute
                    0.01%  371.49us         1  371.49us  371.49us  371.49us  cuDeviceGetName
                    0.00%  13.474us         1  13.474us  13.474us  13.474us  cuDeviceTotalMem
                    0.00%  6.9300us         1  6.9300us  6.9300us  6.9300us  cudaConfigureCall
                    0.00%  3.8500us         3  1.2830us     385ns  1.9250us  cuDeviceGetCount
                    0.00%  3.4650us         3  1.1550us       0ns  2.3100us  cudaSetupArgument
                    0.00%  2.3100us         2  1.1550us     385ns  1.9250us  cuDeviceGet

==2456== Unified Memory profiling result:
Device "GeForce GT 730 (0)"
   Count  Avg Size  Min Size  Max Size  Total Size  Total Time  Name
    2048  4.0000KB  4.0000KB  4.0000KB  8.000000MB  22.70431ms  Host To Device
     266  46.195KB  32.000KB  1.0000MB  12.00000MB  7.213048ms  Device To Host

当然,这不是最高效的实现,后面可以继续优化…

小结

最后只有一句话:CUDA入门容易,但是深入难!希望不是从入门到放弃…

参考资料

  1. John Cheng, Max Grossman, Ty McKercher. Professional CUDA C Programming, 2014.
  2. CUDA docs.
  3. An Even Easier Introduction to CUDA

实例介绍TensorFlow的输入流水线

发表于 2018-03-27

前言

在训练模型时,我们首先要处理的就是训练数据的加载与预处理的问题,这里称这个过程为输入流水线(input pipelines,或输入管道,参考)。在TensorFlow中,典型的输入流水线包含三个流程(ETL流程):

  1. 提取(Extract):从存储介质(如硬盘)中读取数据,可能是本地读取,也可能是远程读取(比如在分布式存储系统HDFS)
  2. 预处理(Transform):利用CPU处理器解析和预处理提取的数据,如图像解压缩,数据扩增或者变换,然后会做random shuffle,并形成batch。
  3. 加载(load):将预处理后的数据加载到加速设备中(如GPUs)来执行模型的训练。

输入流水线对于加速模型训练还是很重要的,如果你的CPU处理数据能力跟不上GPU的处理速度,此时CPU预处理数据就成为了训练模型的瓶颈环节。除此之外,上述输入流水线本身也有很多优化的地方。比如,一个典型的模型训练过程中,CPU预处理数据时,GPU是闲置的,当GPU训练模型时,CPU是闲置的,这个过程如下所示:

image

这样一个训练step中所花费的时间是CPU预处理数据和GPU训练模型时间的总和。显然这个过程中有资源浪费,一个改进的方法就是交叉CPU数据处理和GPU模型训练这两个过程,当GPU处于第$N$个训练阶段,CPU正在准备第$N+1$步所需的数据,如下图所示:

image_1c8udsv2lq3ha6b9fm17nd8i716.png-21.2kB

明显上述设计可以充分最大化利用CPU和GPU,从而减少资源的闲置。另外当存在多个CPU核心时,这又会涉及到CPU的并行化技术(多线程)来加速数据预处理过程,因为每个训练样本的预处理过程往往是互相独立的。关于输入流程线的优化可以参考TensorFlow官网上的Input Pipeline Performance Guide,相信你会受益匪浅。

幸运的是,最新的TensorFlow版本提供了tf.data这一套APIs来帮助我们快速实现高效又灵活的输入流水线。在TensorFlow中最常见的加载训练数据的方式是通过Feeding方式,其主要是定义placeholder,然后将通过Session.run()的feed_dict参数送入数据,但是这其实是最低效的加载数据方式。后来,TensorFlow增加了QueueRunner机制,其主要是基于文件队列以及多线程技术,实现了更高效的输入流水线,但是其APIs很是让人难懂,所以就有了现在的tf.data来替代它。

这里我们通过mnist实例来讲解如何使用tf.data建立简洁而高效的输入流水线,在介绍之前,我们先介绍如何制作TFRecords文件,这是TensorFlow支持的一种标准文件格式

制作TFRecords文件

TFRecords文件是TensorFlow中的标准数据格式,它是基于protobuf的二进制文件,每个TFRecord文件的基本元素是tf.train.Example,其对应的是数据集中的一个样本数据,每个Example包含Features,存储该样本的各个feature,每个feature包含一个键值对,分别对应feature的特征名与实际值。下面是一个Example实例(参考):

// An Example for a movie recommendation application:
   features {
     feature {
       key: "age"
       value { float_list {
         value: 29.0
       }}
     }
     feature {
       key: "movie"
       value { bytes_list {
         value: "The Shawshank Redemption"
         value: "Fight Club"
       }}
     }
     feature {
       key: "movie_ratings"
       value { float_list {
         value: 9.0
         value: 9.7
       }}
     }
     feature {
       key: "suggestion"
       value { bytes_list {
         value: "Inception"
       }}
     }
     feature {
       key: "suggestion_purchased"
       value { float_list {
         value: 1.0
       }}
    }
     feature {
       key: "purchase_price"
       value { float_list {
         value: 9.99
       }}
     }
  }

上面是一个电影推荐系统中的一个样本,可以看到它共含有6个特征,每个特征都是key-value类型,key是特征名,而value是特征值,值得注意的是value其实存储的是一个list,根据数据类型共分为三种:bytes_list, float_list和int64_list,分别存储字节、浮点及整数类型(见这里)。

作为标准数据格式,TensorFlow当然提供了创建TFRecords文件的python接口,下面我们创建mnist数据集对应的TFRecords文件。对于mnist数据集,每个Example需要存储两个feature,一个是图像的像素值,这里可以用bytes类型,因为一个像素点正好可以用一个字节存储,另外是图像的标签值,只能用int64类型存储了。因此,我们先定义这两个类型的接口函数:

# int64
def _int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

# bytes
def _bytes_feature(value):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

创建TFRecord文件,主要通过TF中的tf.python_io.TFRecordWriter函数来实现,具体代码如下:

def convert_to_TFRecords(dataset, name):
    """Convert mnist dataset to TFRecords"""
    images, labels = dataset.images, dataset.labels
    n_examples = dataset.num_examples

    filename = os.path.join(DIR, name + ".tfrecords")
    print("Writing", filename)

    with tf.python_io.TFRecordWriter(filename) as writer:
        for index in range(n_examples):
            image_bytes = images[index].tostring()
            label = labels[index]
            example = tf.train.Example(features=tf.train.Features(
                feature={"image": _bytes_feature(image_bytes),
                         "label": _int64_feature(label)}))
            writer.write(example.SerializeToString())

对于mnist数据集,主要分为train、validation和test,利用上面的函数分别创建三个不同的TFRecords文件:

mnist_datasets = mnist.read_data_sets("mnist_data", dtype=tf.uint8, reshape=False)
convert_to_TFRecords(mnist_datasets.train, "train")
convert_to_TFRecords(mnist_datasets.validation, "validation")
convert_to_TFRecords(mnist_datasets.test, "test")

好了,这样我们就创建3个TFRecords文件了。

读取TFRecords文件

上面我们创建了TFRecords文件,但是怎么去读取它们呢,当然TF提供了读取TFRecords文件的接口函数,这里首先介绍如何利用TF中操作TFRecord的python接口来读取TFRecord文件,主要是tf.python_io.tf_record_iterator函数,它输入TFRecord文件,但是得到一个迭代器,每个元素是一个Example,但是却是一个字符串,这里可以用tf.train.Example来解析它,具体代码如下:

def read_TFRecords_test(name):
    filename = os.path.join(DIR, name + ".tfrecords")
    record_itr = tf.python_io.tf_record_iterator(path=filename)
    for r in record_itr:
        example = tf.train.Example()
        example.ParseFromString(r)

        label = example.features.feature["label"].int64_list.value[0]
        print("Label", label)
        image_bytes = example.features.feature["image"].bytes_list.value[0]
        img = np.fromstring(image_bytes, dtype=np.uint8).reshape(28, 28)
        print(img)
        plt.imshow(img, cmap="gray")
        plt.show()
        break  # 只读取一个Example

上面仅是纯python的读取方式,这不是TFRecords文件的正确使用方式。既然是官方标准数据格式,TF也提供了使用TFRecords文件建立输入流水线的方式。在tf.data出现之前,使用的是QueueRunner方式,即文件队列机制,其原理如下图所示:

AnimatedFileQueues.gif-503.9kB

文件队列机制主要分为两个阶段:第一个阶段将输入文件打乱,并在文件队列入列,然后Reader从文件队列中读取一个文件,同时文件队列出列这个文件,Reader同时对文件进行解码,然后生产数据样本,并将样本在样本队列中入列,可以定义多个Reader并发地从多个文件同时读取数据。从样本队列中的出列一定量的样本数据即可以用于一个训练过程。TF提供了配套的API来完成这个过程,注意的是这个输入流水线是直接嵌入训练的Graph中,即是整个图模型的一部分。根据文件的不同,可以使用不同类型的Reader,对于TFRecord文件,可以使用tf.TFRecordReader,下面是具体的实现代码:

def read_example(filename_queue):
    """Read one example from filename_queue"""
    reader = tf.TFRecordReader()
    key, value = reader.read(filename_queue)
    features = tf.parse_single_example(value, features={"image": tf.FixedLenFeature([], tf.string),
                                                        "label": tf.FixedLenFeature([], tf.int64)})
    image = tf.decode_raw(features["image"], tf.uint8)
    image = tf.reshape(image, [28, 28])
    label = tf.cast(features["label"], tf.int32)
    return image, label

if __name__ == "__main__":
    queue = tf.train.string_input_producer(["TFRecords/train.tfrecords"], num_epochs=10)
    image, label = read_example(queue)

    img_batch, label_batch = tf.train.shuffle_batch([image, label], batch_size=32, capacity=5000,
                                                    min_after_dequeue=2000, num_threads=4)
    with tf.Session() as sess:
        sess.run(tf.local_variables_initializer())
        sess.run(tf.global_variables_initializer())

        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)

        try:
            while not coord.should_stop():
                # Run training steps or whatever
                images, labels = sess.run([img_batch, label_batch])
                print(images.shape, labels.shape)

        except tf.errors.OutOfRangeError:
            print('Done training -- epoch limit reached')

        coord.request_stop()
        coord.join(threads)

对于队列机制,估计大家看的云里雾里的,代码确实让人难懂,但是其实只要按照官方提供的标准代码,还是很容易在自己的数据集上进行修改的。不过现在有了tf.data,可以更加优雅地实现上面的过程。

tf.data简介

使用tf.data可以更方便地创建高效的输入流水线,但是其相比队列机制API更友好,这主要是因为tf.data提供了高级抽象。第一个抽象是使用tf.data.Dataset来表示一个数据集合,集合里面的每个元素包含一个或者多个Tensor,一般就是对应一个训练样本。第二个抽象是使用tf.data.Iterator来从数据集中提取数据,这是一个迭代器对象,可以通过Iterator.get_next()从Dataset中产生一个样本。利用这两个抽象,Dataset的使用简化为三个步骤:

  1. 创建Dataset实例对象;
  2. 创建遍历Dataset的Iterator实例对象;
  3. 从Iterator中不断地产生样本,并送入模型中进行训练。

创建Dataset

TF提供了很多方式创建Dataset,下面是几种方式:

# 从Numpy的array
dataset1 = tf.data.Dataset.from_tensor_slices(np.random.randn((5, 10))
print(dataset1.output_types)  # ==> "tf.float32"
print(dataset1.output_shapes)  # ==> "(10,)"

# 从Tensor
dataset2 = tf.data.Dataset.from_tensor_slices((tf.random_uniform([4]),
                                  tf.random_uniform([4, 100], maxval=100, dtype=tf.int32)))
print(dataset2.output_types)  # ==> "(tf.float32, tf.int32)"
print(dataset2.output_shapes)  # ==> "((), (100,))"

# 从文件
filenames = ["/var/data/file1.tfrecord", "/var/data/file2.tfrecord"]
dataset3 = tf.data.TFRecordDataset(filenames)

更重要的是Dataset可以进行一系列的变换操作,并且支持链式调用,这对于数据预处理很重要:

dataset = tf.data.TFRecordDataset(filenames)
dataset = dataset.map(...)  # 解析数据或者对数据预处理,如normalize.
dataset = dataset.repeat()  # 重复数据集,一般设置num_epochs
dataset = dataset.batch(32) # 形成batch

创建Iterator

创建了Dataset之后,我们需要创建Iterator来遍历数据集,返回的是迭代器对象,并从中可以产生数据,以用于模型训练。TF共支持4中迭代器类型,分别是one-shot, initializable, reinitializable和feedable。下面逐个介绍它们。

One-shot Iterator

这是最简单的Iterator,它仅仅遍历整个数据集一次,而且不需要显示初始化,下面是个实例:

dataset = tf.data.Dataset.from_tensor_slices(np.arange(10))
iterator = dataset.make_one_shot_iterator()
next_element = iterator.get_next()

with tf.Session() as sess:
    for i in range(10):
        sess.run(next_element) # 0, 1, ..., 9

Initializable Iterator

相比one-shot Iterator,它需要在使用前显示初始化,这样就可以支持参数化,每次初始化时送入不同的参数,就可以支持数据集的简单参数化,下面是一个实例:

max_value = tf.placeholder(tf.int64, [])
dataset = tf.data.Dataset.range(max_value)
iterator = dataset.make_initializable_iterator()
next_element = iterator.get_next()

with tf.Session() as sess:
    # 需要显示初始化
    sess.run(iterator.initializer, feed_dict={max_value: 10})
    for i in range(10):
        print(sess.run(next_element)) # 0, 1, ..., 9

Reinitializable Iterator

相比initializable Iterator,它可以支持从不同的Dataset进行初始化,有时候你需要训练集和测试集,但是两者并不同,此时就可以定义两个不同的Dataset,并配合reinitializable Iterator来定义一个通用的迭代器,在使用前只需要送入不同的Dataset进行初始化就可以,下面是一个实例:

train_data = np.random.randn(100, 5)
test_data = np.random.randn(20, 5)
train_dataset = tf.data.Dataset.from_tensor_slices(train_data)
test_dataset = tf.data.Dataset.from_tensor_slices(test_data)

# 创建一个reinitializable iterator
re_iterator = tf.data.Iterator.from_structure(train_dataset.output_types,
                                              train_dataset.output_shapes)
next_element = re_iterator.get_next()
train_init_op = re_iterator.make_initializer(train_dataset)
test_init_op = re_iterator.make_initializer(test_dataset)

with tf.Session() as sess:
    # 训练
    n_epochs = 2
    for i in range(n_epochs):
        sess.run(train_init_op)
        for j in range(100):
            print(sess.run(next_element))
    # 测试
    sess.run(test_init_op)
    for i in range(20):
        print(sess.run(next_element))

Feedable Iterator

对于reinitializable iterator,它可以支持送入不同Dataset,从而完成数据集的切换,但是每次切换时必须要重新初始化。对于Feedable Iterator,其可以认为支持送入不同的Iterator,通过切换迭代器的string handle来完成不同数据集的切换,并且在切换时迭代器的状态还会被保留,这相比reinitializable iterator更加灵活,下面是一个实例:

train_data = np.random.randn(100, 5)
val_data = np.random.randn(20, 5)
n_epochs = 20
train_dataset = tf.data.Dataset.from_tensor_slices(train_data).repeat(n_epochs)
val_dataset = tf.data.Dataset.from_tensor_slices(val_data)

# 创建一个feedable iterator
handle = tf.placeholder(tf.string, [])
feed_iterator = tf.data.Iterator.from_string_handle(handle, train_dataset.output_types,
                                                  train_dataset.output_shapes)
next_element = feed_iterator.get_next()

# 创建不同的iterator
train_iterator = train_dataset.make_one_shot_iterator()
val_iterator = val_dataset.make_initializable_iterator()

with tf.Session() as sess:
    # 生成对应的handle
    train_handle = sess.run(train_iterator.string_handle())
    val_handle = sess.run(val_iterator.string_handle())

    # 训练
    for n in range(n_epochs):
        for i in range(100):
            print(i, sess.run(next_element, feed_dict={handle: train_handle}))
        # 验证
        if n % 10 == 0:
            sess.run(val_iterator.initializer)
            for i in range(20):
                print(sess.run(next_element, feed_dict={handle: val_handle}))

关于tf.data的基础知识就这么多了,更多内容可以参考官方文档,另外这里要说一点就是,对于迭代器对象,当其元素取尽之后,会抛出tf.errors.OutOfRangeError错误,当然一般情况下你是知道自己的迭代器对象的元素数,那么也就可以不用通过捕获错误来实现终止条件。下面,我们将使用tf.data实现mnist的完整训练过程。

MNIST完整实例

我们采用feedable Iterator来实现mnist数据集的训练过程,分别创建两个Dataset,一个为训练集,一个为验证集,对于验证集不需要shuffle操作。首先我们创建Dataset对象的辅助函数,主要是解析TFRecords文件,并对image做归一化处理:

def decode(serialized_example):
    """decode the serialized example"""
    features = tf.parse_single_example(serialized_example,
                            features={"image": tf.FixedLenFeature([], tf.string),
                                      "label": tf.FixedLenFeature([], tf.int64)})
    image = tf.decode_raw(features["image"], tf.uint8)
    image = tf.cast(image, tf.float32)
    image = tf.reshape(image, [784])
    label = tf.cast(features["label"], tf.int64)
    return image, label

def normalize(image, label):
    """normalize the image to [-0.5, 0.5]"""
    image = image / 255.0 - 0.5
    return image, label

然后定义创建Dataset的函数,对于训练集和验证集,两者的参数会不同:

def create_dataset(filename, batch_size=64, is_shuffle=False, n_repeats=0):
    """create dataset for train and validation dataset"""
    dataset = tf.data.TFRecordDataset(filename)
    if n_repeats > 0:
        dataset = dataset.repeat(n_repeats) # for train

    dataset = dataset.map(decode).map(normalize) # decode and normalize

    if is_shuffle:
        dataset = dataset.shuffle(1000 + 3 * batch_size) # shuffle
    dataset = dataset.batch(batch_size)
    return dataset

我们使用一个简单的全连接层网络来实现mnist的分类模型:

def model(inputs, hidden_sizes=(500, 500)):
    h1, h2 = hidden_sizes
    net = tf.layers.dense(inputs, h1, activation=tf.nn.relu)
    net = tf.layers.dense(net, h2, activation=tf.nn.relu)
    net = tf.layers.dense(net, 10, activation=None)
    return net

然后是训练的主体代码:

n_train_examples = 55000
n_val_examples = 5000
n_epochs = 50
batch_size = 64
train_dataset = create_dataset("TFRecords/train.tfrecords", batch_size=batch_size, is_shuffle=True,
                               n_repeats=n_epochs)
val_dataset = create_dataset("TFRecords/validation.tfrecords", batch_size=batch_size)

# 创建一个feedable iterator
handle = tf.placeholder(tf.string, [])
feed_iterator = tf.data.Iterator.from_string_handle(handle, train_dataset.output_types,
                                                  train_dataset.output_shapes)
images, labels = feed_iterator.get_next()

# 创建不同的iterator
train_iterator = train_dataset.make_one_shot_iterator()
val_iterator = val_dataset.make_initializable_iterator()

# 创建模型
logits = model(images, [500, 500])
loss = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=labels, logits=logits)
loss = tf.reduce_mean(loss)
train_op = tf.train.AdamOptimizer(learning_rate=1e-04).minimize(loss)
predictions = tf.argmax(logits, axis=1)
accuracy = tf.reduce_mean(tf.cast(tf.equal(predictions, labels), tf.float32))

init_op = tf.group(tf.global_variables_initializer(), tf.local_variables_initializer())

with tf.Session() as sess:
    sess.run(init_op)
    # 生成对应的handle
    train_handle = sess.run(train_iterator.string_handle())
    val_handle = sess.run(val_iterator.string_handle())

    # 训练
    for n in range(n_epochs):
        ls = []
        for i in range(n_train_examples // batch_size):
            _, l = sess.run([train_op, loss], feed_dict={handle: train_handle})
            ls.append(l)
        print("Epoch %d, train loss: %f" % (n, np.mean(ls)))
        if (n + 1) % 10 == 0:
            sess.run(val_iterator.initializer)
            accs = []
            for i in range(n_val_examples // batch_size):
                acc = sess.run(accuracy, feed_dict={handle: val_handle})
                accs.append(acc)
            print("\t validation accuracy: %f" % (np.mean(accs)))

大约可以在验证集上的accuracy达到98%。

小结

看起来最新的tf.data还是比较好用的,如果你是TensorFlow用户,可以尝试着使用它,当然上面的例子并不能包含关于tf.data的所有内容,想继续深入的话可以移步TF的官网。

参考

  1. Programmers guide: import data.
  2. How to use Dataset in TensorFlow.
  3. Reading data.
  4. Performance: datasets performance.
  5. Introduction to Artificial Neural Networks and Deep Learning: A Practical Guide with Applications in Python.

小白将

万事胜意

2 日志
2 标签
RSS
知乎 Github CSDN博客
© 2018 小白将
由 Hexo 强力驱动
|
主题 — NexT.Gemini v5.1.4