NVIDIA DALI踩坑教程

NVIDIA DALI踩坑教程

June 8, 2022·Jensen
Jensen

image

初见

事情的起因还要追溯到很久之前看了一篇论文,论文的核心就是讨论预训练策略在低层视觉任务中的作用。既然是预训练策略,那就不可避免的要用规模大一点的数据集,之所以预训练这些年在低层视觉任务中鲜被应用的,其主要的原因就是缺乏大规模数据集。这篇论文主要针对低层视觉任务中的SR(超分辨率)、DeRain(去雨)和DeNoise(去噪)三个任务,作者利用ImageNet中的图像作为基准图像,并在此基础上利用双三次插值得到低分辨率图像用于SR任务,将雨纹和高斯噪声直接加入到干净的基准图像中用于DeRain和DeNoise任务。

001

但是论文中却忽略了低层视觉中一个非常重要的问题,即低光增强任务(Low-light Image Enhancement)。我觉得主要问题就是低光增强任务中所用到的配对图像数据集是很难获得的,特别是用ImageNet来生成,更是难上加难。其一,低光环境是个很复杂的环境,不是简单调低图像的亮度就能实现的,在低光环境下拍摄的图像往往是有的地方暗有的地方亮;其二,低光环境下摄得的图像往往伴随着各种复杂的噪声,简单的往图像上叠加噪声可能并不真实。前不久我克服了这两个问题,成功从ImageNet/VOC/COCO/IAPR/StreetScenes这五个数据集中挑选一些合适的图像构造了一个大规模配对的暗光图像数据集,其中共包含了153,856对暗光/正常光图像(具体数据集是如何构造的今天就不赘述了,等论文发表后我会详细跟大家解释)。我们知道,PyTorch中提供了torch.utils.data.Dataset(*args, **kwds)torch.utils.data.DataLoader(dataset, ...)这两个类来实现数据集的构建和数据的加载,但是这两个类都是作用在CPU上的。然而我们的预训练数据集的规模达到了153,856*2张,用CPU来加载速度实在太慢了,在后续训练的过程中可能会导致模型等待数据传入的情况,即模型已经训练完一个batch的数据了,但是下一个batch的数据还在加载,没能及时传到模型中,这样会导致GPU的利用率显著下降。也就是说不仅会降低模型训练的速度,同时也没能完全压榨出显卡等硬件的性能,是一件性价比极低的事情。下面举个🌰(例子):

import os
import torch
import torch.nn as nn
from PIL import Image
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader, random_split

torch.__version__


DATA_DIR = "/home/jensen/workspace/DATASETS/SYNTHETIC_DATA"
BATCH_SIZE = 128
IMAGE_SIZE = 192


syn_trans = transforms.Compose([
    transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)), 
    transforms.ToTensor(),
])

class Syn_Dataset(nn.Module):
    
    def __init__(self, low_path, high_path, transforms=None):
        self.low_path = low_path
        self.high_path = high_path
        self.transforms = transforms
        
    def __getitem__(self, idx):
        low_files = os.listdir(self.low_path)
        high_files = os.listdir(self.high_path)
        low_image = Image.open(os.path.join(self.low_path, low_files[idx]))
        high_image = Image.open(os.path.join(self.low_path, high_files[idx]))
        if self.transforms:
            low_image = self.transforms(low_image)
            high_image = self.transforms(high_image)
            
        return low_image, high_image
    
    def __len__(self):
        
        return len(os.listdir(self.low_path))
    
dataset = Syn_Dataset(low_path=os.path.join(DATA_DIR, "low"), high_path=os.path.join(DATA_DIR, "low"), transforms=syn_trans)
train_data, val_data = random_split(dataset, (152000, 1856))

val_loader = DataLoader(val_data, batch_size=BATCH_SIZE, shuffle=True, num_workers=4)

%%time
for idx, data in enumerate(val_loader):
    X, Y = data
    print(X.shape)

上面这个例子使用PyTorch内置的DataLoader类以batch_size = 128遍历了一遍val_dataset,并打印每个batch的尺寸。使用jupyter%%time魔法语言来计算遍历一遍所花费的时间,从下图可以直观看到,一共花费近两分钟完成一次遍历。

002

接下来再举个🌰:

from torchvision import models

model = models.alexnet(pretrained=False).cuda()
criterion = nn.CrossEntropyLoss()

for idx, data in enumerate(val_loader):
  	X, Y = data
    output = model(X.cuda())
    loss = criterion(out.cpu(), torch.empty(out.shape[0], dtype=torch.long).random_(1000))
    loss.backward()

上面这个例子是模拟将数据输入AlexNet中处理,从下图中可以看到GPU的利用率很低(大多数时间都是0%)。主要原因是模型处理数据的时间比数据加载的时间更快,也就意味着模型通常要等待DataLoader将新的batch的数据传过来,导致GPU的利用率大多数时间都处于较低的状态甚至是空闲,严重拖慢模型训练的效率。

003

要是数据规模小的话,这点时间也不算什么,但是我的数据规模是数十万级别的,每一分一秒都可以说是十分珍贵的,于是我找到了一个非常nice的加速工具:NVIDIA DALI库。

单卡环境下的NVIDIA DALI使用

(NVIDIA DALI的具体使用方式请参阅官方文档

安装

首先需要确定自己的cuda版本,可以在命令行中输入nvcc -V来查询,如下图,cuda版本为10.2。

004

紧接着在命令行中输入pip install nvidia-pyindex && pip install nvidia-dali-cuda102,请注意,这里的cuda102正是对应了上面查询的cuda版本10.2。

单卡环境下使用

所谓单卡环境下使用即训练的过程中只使用一张卡,这是最简单的形式。可以自己定义数据的迭代方式,数据的Pipeline以及加载方式。详细细节请看代码:

import os
import torch
import numpy as np
from random import shuffle
import nvidia.dali.fn as fn
import nvidia.dali.types as types
from torch.utils.data import random_split
from nvidia.dali.pipeline import Pipeline
from nvidia.dali.plugin.pytorch import DALIGenericIterator, LastBatchPolicy

torch.__version__


class ExternalInputIterator(object):
  
    def __init__(self, batch_size, files, data_dir):
        self.low_dir = os.path.join(data_dir, 'low')
        self.high_dir = os.path.join(data_dir, 'high')
        self.batch_size = batch_size
        self.files = list(files)
        shuffle(self.files)
        
    def __len__(self):
        return len(self.files)
        
    def __iter__(self):
        self.i = 0
        self.n = len(self.files)
        return self
    
    def __next__(self):
        if self.i >= self.n:
            self.__iter__()
            raise StopIteration
            
        low = []
        high = []
        
        leave_num = self.n - self.i
        current_batch_size = min(self.batch_size, leave_num)
        for _ in range(current_batch_size):
            filename = self.files[self.i]
            l = open(os.path.join(self.low_dir, filename), 'rb')
            h = open(os.path.join(self.high_dir, filename), 'rb')
            low.append(np.frombuffer(l.read(), dtype=np.uint8))
            high.append(np.frombuffer(h.read(), dtype=np.uint8))
            self.i += 1
            
        return (low, high)
    
    next = __next__
    len = __len__


class ExternalSourcePipeline(Pipeline):
    
    def __init__(self, data_iterator, batch_size, num_threads, device_id, img_size):
        super(ExternalSourcePipeline, self).__init__(batch_size, num_threads, device_id, exec_async=False, exec_pipelined=False)
        
        self.img_size = img_size
        self.batch = batch_size
        self.data_iterator = data_iterator
        self.lows, self.highs = fn.external_source(source=self.data_iterator, num_outputs=2, dtype=types.UINT8)
    
    def __len__(self):
        length = len(self.data_iterator) 
        return (length // self.batch + 1) if (length % self.batch != 0) else (length // self.batch) 
    
    def define_graph(self):
        low_decode = fn.decoders.image(self.lows, device="mixed")
        high_decode = fn.decoders.image(self.highs, device="mixed")
        low_resize = fn.resize(low_decode, device="gpu", resize_x=self.img_size, resize_y=self.img_size, interp_type=types.INTERP_TRIANGULAR)
        high_resize = fn.resize(high_decode, device="gpu", resize_x=self.img_size, resize_y=self.img_size, interp_type=types.INTERP_TRIANGULAR)
        self.low = fn.transpose(low_resize, perm=[2, 0, 1]) / 255.0
        self.high = fn.transpose(high_resize, perm=[2, 0, 1]) / 255.0

        return (self.low, self.high)
    
    def iter_setup(self):
        self.set_outputs(self.low, self.high)
        
        
class CustomDALIGenericIterator(DALIGenericIterator):
    
    def __init__(self, pipelines, **kwargs):
        output_maps = ['lows', 'highs']
        if not isinstance(pipelines, list):
            pipelines = [pipelines]
        super(CustomDALIGenericIterator, self).__init__(pipelines, output_maps, **kwargs)
        self.pipelines = pipelines  # devices > 1 ==> pipelines > 1
    
    def __next__(self):
        batch = super(CustomDALIGenericIterator, self).__next__()
        return self.parse_batch(batch)
    
    def __len__(self):
        lengths = [len(i) for i in self.pipelines]
        return sum(lengths)
    
    def parse_batch(self, batch):
        lows, highs = batch[0]['lows'], batch[0]['highs'] 
        return lows, highs
      
      
DATA_DIR = "/home/jensen/workspace/SYNTHESIS_DATA"
BATCH_SIZE = 128
IMAGE_SIZE = 192

files = os.listdir(os.path.join(DATA_DIR, 'low'))
train_files, val_files = random_split(files, (152000, 1856))

val_iter = ExternalInputIterator(batch_size=BATCH_SIZE, files=val_files, data_dir=DATA_DIR)
val_pipe = ExternalSourcePipeline(val_iter, batch_size=BATCH_SIZE, num_threads=4, device_id=0, img_size=IMAGE_SIZE)
val_loader = CustomDALIGenericIterator(val_pipe)

%%time
for idx, data in enumerate(val_loader):
    X, Y = data
    print(X.shape)

上面的例子使用NVIDIA DALI自定义了数据的迭代方式以及加载方式,同样使用jupyter%%time魔法函数来计算遍历一遍所花费的时间,如下图所示,整个过程仅花费了不到3秒钟。

005

此外,可以模拟将数据输入AlexNet中处理,GPU的利用率也会一直稳定在85%以上,表明GPU的性能被充分利用。

多卡环境下的NVIDIA DALI使用

本文中所指的多卡环境是单机多卡环境,即在一台机器的多个GPU上进行分布式训练,然而多机多卡的情况并不在本文的讨论范畴之内,我也确实没有用过这种训练方式。有关多卡环境下的使用方式也可以参考官方文档。下面直接放出我的例子吧:

import os
import nvidia.dali.fn as fn
import nvidia.dali.types as types
from torch.utils.data import random_split
from nvidia.dali.pipeline import Pipeline
from nvidia.dali.plugin.pytorch import DALIGenericIterator, LastBatchPolicy


class SyntheicDataPipeline(Pipeline):
    """
    An extended Pipeline class based on the Nvidia DALI library for low-light image enhancement.
    The effect of the Pipeline class is somewhat similar to the Dataset class in Pytorch and 
    the transforms function in torchvision. Mainly is to carry on some simple preprocessing to the input data.
    
    Args:
        batch_size (int): batch_size.
        data_dir (str): the folder path of the paired image. (excluding the 'low' and' high' folders)
        files (list): A list of paired image filenames.
        image_size (int | tuple): image size after resize operation.  Default: 192
        num_threads (int): number of CPU threads used by the pipeline.  Default: 2
        device_id (int): id of GPU used by the pipeline.  Default: 0
        seed (int): seed used for random number generation.  Default: -1
        shard_id (int): index of the shard to read.  Default: 0
        num_shards (int): partitions the data into the specified number of parts (shards).  Default: 1
        random_shuffle (bool): determines whether to randomly shuffle data.  Default: True
        
    Examples:
        When you have a GPU, device_id and shard_id should be set to 0 and num_shards should be set to 1.
        When you have four GPU, the value range for device_id and shard_id is [0-3] (device_id and shard_id 
        values are usually the same), and num_shards should be set to 4.
        
    For details, please refer to the official DALI documentation: https://docs.nvidia.com/deeplearning/dali/user-guide/docs/
    or my blog (which will be updated in the near future): https://jensen.dlab.ac.cn/ .
    """
    
    def __init__(self, batch_size, data_dir, files, 
                 image_size=192, num_threads=-1, device_id=0, seed=-1, 
                 shard_id=0, num_shards=1, random_shuffle=True, **kwargs):
        super(SyntheicDataPipeline, self).__init__(batch_size=batch_size, num_threads=num_threads, 
                                                   device_id=device_id, seed=seed, **kwargs)
        self.types = ['low', 'high']
        self.data_dir = [os.path.join(data_dir, name) for name in self.types]
        self.files = list(files)
        self.image_size = image_size
        self.shard_id = shard_id
        self.num_shards = num_shards
        self.random_shuffle = random_shuffle
        
    def define_graph(self):
        low_inputs, _ = fn.readers.file(file_root=self.data_dir[0], files=self.files, seed=1234, 
                                        shard_id=self.shard_id, num_shards=self.num_shards, 
                                        random_shuffle=self.random_shuffle, pad_last_batch=True, 
                                        name="main_reader")
        high_inputs, _ = fn.readers.file(file_root=self.data_dir[1], files=self.files, seed=1234,
                                         shard_id=self.shard_id, num_shards=self.num_shards, 
                                         random_shuffle=self.random_shuffle, pad_last_batch=True)
        inputs = {'low': low_inputs, 'high': high_inputs}
        images = {x: fn.decoders.image(inputs[x], device="mixed") for x in self.types}
        resizes = {x: fn.resize(images[x], device="gpu", resize_x=self.image_size, 
                                resize_y=self.image_size, interp_type=types.INTERP_TRIANGULAR) 
                                for x in self.types}
        self.tensors = {x: fn.transpose(resizes[x], perm=[2, 0, 1]) / 255.0 for x in self.types}
        
        return (self.tensors['low'], self.tensors['high'])
    
    def iter_setup(self):
        self.set_outputs(self.tensors['low'], self.tensors['high'])
        

class SyntheicDataIterator(DALIGenericIterator):
    """
    An extended Iterator class based on the Nvidia DALI library for low-light image enhancement.
    The effect of the Iterator class is somewhat similar to the Dataloader class in Pytorch.
    
    Args:
        pipelines (nvidia.dali.Pipeline): pipelines.
        reader_name (str): name of the reader which will be queried to the shard size, 
                           number of shards and all other properties necessary to count 
                           properly the number of relevant and padded samples that iterator 
                           needs to deal with.
        last_batch_policy (int): strategy for processing the last batch data. (especially if the
                              size of the last batch data is smaller than batch_size)
        output_map (list): list of strings which maps consecutive outputs of DALI pipelines to user specified name.
    
    Example:
        loader = SyntheicDataIterator(...)
        for idx, data in enumerate(loader):
            low, high = data
            ...
    
    For details, please refer to the official DALI documentation: https://docs.nvidia.com/deeplearning/dali/user-guide/docs/
    or my blog (which will be updated in the near future): https://jensen.dlab.ac.cn/ .
    """
    
    def __init__(self, pipelines, reader_name, last_batch_policy, 
                 output_map=['low', 'high'], **kwargs):
        super(SyntheicDataIterator, self).__init__(pipelines=pipelines, output_map=output_map, 
                                                   reader_name=reader_name, last_batch_policy=last_batch_policy, 
                                                   **kwargs)
        
    def _parse_data(self, data):
        low_data, high_data = data[0]['low'], data[0]['high']
        
        return low_data, high_data
    
    def __next__(self):
        data = super(SyntheicDataIterator, self).__next__()
        return self._parse_data(data)
    
    def __len__(self):
        return super(SyntheicDataIterator, self).__len__()
      
      
DATA_DIR = "/home/jensen/workspace/SYNTHESIS_DATA"
BATCH_SIZE = 128
IMAGE_SIZE = 192

files = os.listdir(os.path.join(DATA_DIR, 'low'))
train_files, val_files = random_split(files, (152000, 1856))

pipe = SyntheicDataPipeline(batch_size=BATCH_SIZE, num_threads=4, device_id=0, 
                            seed=1234, data_dir=DATA_DIR, files=val_files, 
                            image_size=IMAGE_SIZE, shard_id=0, num_shards=1)

val_loader = SyntheicDataIterator(pipe, reader_name="main_reader", auto_reset=True, 
                                  last_batch_policy=LastBatchPolicy.PARTIAL)

for idx, data in enumerate(val_loader):
    X, Y = data
    print(X.shape)

上面的例子其实还是单机单卡的环境,但是只要稍微修改一下就可以实现单机多卡。即将SyntheicDataPipeline类中的device_idshard_id以及num_shards这三个参数稍作修改。device_id很好理解了,例如一台机器有四张GPU,则device_id分别为0123。假如在第二张GPU上进行运算,则device_id就是2,此外shard_id一般与device_id保持一致,它是指第几个分片(分布式训练的实质就是将一个大batch的数据均分到每个GPU上来并行运算,因此第一张GPU上输入的数据应当就是均分数据得到的第一个分片)。num_shards这个参数也很好理解,它的意思即一共有多少张GPU。虽然看起来很简单,但是还是有三个参数需要修改,似乎还是有点麻烦,但是不用担心,实际上这些参数都是不需要人为去设定。因为通常分布式训练都需要从命令行传入一个参数--nproc_per_node,通过这一个参数都能自适应的完成上述参数的修改,在实际训练的时候,只需做如下修改即可:device_id=args.local_rankshard_id=args.local_ranknum_shards=args.world_size

好了,本篇博客到这里就应该要结束了,本文中提到的方法通常都能适用于各种深度学习任务,只是不同的任务在数据读取上可能会有一些异同,但我相信这些问题都可以通过参阅官方文档中的nvidia.dali.fn使用说明来解决。接下来如果有时间的话我会再介绍一下如何使用NVIDIA APEX库来实现分布式训练以及如何将NVIDIA DALI和APEX库结合起来使用。

最后更新于