NVIDIA DALI踩坑教程
初见
事情的起因还要追溯到很久之前看了一篇论文,论文的核心就是讨论预训练策略在低层视觉任务中的作用。既然是预训练策略,那就不可避免的要用规模大一点的数据集,之所以预训练这些年在低层视觉任务中鲜被应用的,其主要的原因就是缺乏大规模数据集。这篇论文主要针对低层视觉任务中的SR(超分辨率)、DeRain(去雨)和DeNoise(去噪)三个任务,作者利用ImageNet中的图像作为基准图像,并在此基础上利用双三次插值得到低分辨率图像用于SR任务,将雨纹和高斯噪声直接加入到干净的基准图像中用于DeRain和DeNoise任务。
但是论文中却忽略了低层视觉中一个非常重要的问题,即低光增强任务(Low-light Image Enhancement)。我觉得主要问题就是低光增强任务中所用到的配对图像数据集是很难获得的,特别是用ImageNet来生成,更是难上加难。其一,低光环境是个很复杂的环境,不是简单调低图像的亮度就能实现的,在低光环境下拍摄的图像往往是有的地方暗有的地方亮;其二,低光环境下摄得的图像往往伴随着各种复杂的噪声,简单的往图像上叠加噪声可能并不真实。前不久我克服了这两个问题,成功从ImageNet/VOC/COCO/IAPR/StreetScenes这五个数据集中挑选一些合适的图像构造了一个大规模配对的暗光图像数据集,其中共包含了153,856对暗光/正常光图像(具体数据集是如何构造的今天就不赘述了,等论文发表后我会详细跟大家解释)。我们知道,PyTorch中提供了torch.utils.data.Dataset(*args, **kwds)
和torch.utils.data.DataLoader(dataset, ...)
这两个类来实现数据集的构建和数据的加载,但是这两个类都是作用在CPU上的。然而我们的预训练数据集的规模达到了153,856*2张,用CPU来加载速度实在太慢了,在后续训练的过程中可能会导致模型等待数据传入的情况,即模型已经训练完一个batch的数据了,但是下一个batch的数据还在加载,没能及时传到模型中,这样会导致GPU的利用率显著下降。也就是说不仅会降低模型训练的速度,同时也没能完全压榨出显卡等硬件的性能,是一件性价比极低的事情。下面举个🌰(例子):
import os
import torch
import torch.nn as nn
from PIL import Image
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader, random_split
torch.__version__
DATA_DIR = "/home/jensen/workspace/DATASETS/SYNTHETIC_DATA"
BATCH_SIZE = 128
IMAGE_SIZE = 192
syn_trans = transforms.Compose([
transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
transforms.ToTensor(),
])
class Syn_Dataset(nn.Module):
def __init__(self, low_path, high_path, transforms=None):
self.low_path = low_path
self.high_path = high_path
self.transforms = transforms
def __getitem__(self, idx):
low_files = os.listdir(self.low_path)
high_files = os.listdir(self.high_path)
low_image = Image.open(os.path.join(self.low_path, low_files[idx]))
high_image = Image.open(os.path.join(self.low_path, high_files[idx]))
if self.transforms:
low_image = self.transforms(low_image)
high_image = self.transforms(high_image)
return low_image, high_image
def __len__(self):
return len(os.listdir(self.low_path))
dataset = Syn_Dataset(low_path=os.path.join(DATA_DIR, "low"), high_path=os.path.join(DATA_DIR, "low"), transforms=syn_trans)
train_data, val_data = random_split(dataset, (152000, 1856))
val_loader = DataLoader(val_data, batch_size=BATCH_SIZE, shuffle=True, num_workers=4)
%%time
for idx, data in enumerate(val_loader):
X, Y = data
print(X.shape)
上面这个例子使用PyTorch内置的DataLoader类以batch_size = 128
遍历了一遍val_dataset
,并打印每个batch的尺寸。使用jupyter
的%%time
魔法语言来计算遍历一遍所花费的时间,从下图可以直观看到,一共花费近两分钟完成一次遍历。
接下来再举个🌰:
from torchvision import models
model = models.alexnet(pretrained=False).cuda()
criterion = nn.CrossEntropyLoss()
for idx, data in enumerate(val_loader):
X, Y = data
output = model(X.cuda())
loss = criterion(out.cpu(), torch.empty(out.shape[0], dtype=torch.long).random_(1000))
loss.backward()
上面这个例子是模拟将数据输入AlexNet中处理,从下图中可以看到GPU的利用率很低(大多数时间都是0%)。主要原因是模型处理数据的时间比数据加载的时间更快,也就意味着模型通常要等待DataLoader将新的batch的数据传过来,导致GPU的利用率大多数时间都处于较低的状态甚至是空闲,严重拖慢模型训练的效率。
要是数据规模小的话,这点时间也不算什么,但是我的数据规模是数十万级别的,每一分一秒都可以说是十分珍贵的,于是我找到了一个非常nice的加速工具:NVIDIA DALI库。
单卡环境下的NVIDIA DALI使用
(NVIDIA DALI的具体使用方式请参阅官方文档)
安装
首先需要确定自己的cuda版本,可以在命令行中输入nvcc -V
来查询,如下图,cuda版本为10.2。
紧接着在命令行中输入pip install nvidia-pyindex && pip install nvidia-dali-cuda102
,请注意,这里的cuda102
正是对应了上面查询的cuda版本10.2。
单卡环境下使用
所谓单卡环境下使用即训练的过程中只使用一张卡,这是最简单的形式。可以自己定义数据的迭代方式,数据的Pipeline以及加载方式。详细细节请看代码:
import os
import torch
import numpy as np
from random import shuffle
import nvidia.dali.fn as fn
import nvidia.dali.types as types
from torch.utils.data import random_split
from nvidia.dali.pipeline import Pipeline
from nvidia.dali.plugin.pytorch import DALIGenericIterator, LastBatchPolicy
torch.__version__
class ExternalInputIterator(object):
def __init__(self, batch_size, files, data_dir):
self.low_dir = os.path.join(data_dir, 'low')
self.high_dir = os.path.join(data_dir, 'high')
self.batch_size = batch_size
self.files = list(files)
shuffle(self.files)
def __len__(self):
return len(self.files)
def __iter__(self):
self.i = 0
self.n = len(self.files)
return self
def __next__(self):
if self.i >= self.n:
self.__iter__()
raise StopIteration
low = []
high = []
leave_num = self.n - self.i
current_batch_size = min(self.batch_size, leave_num)
for _ in range(current_batch_size):
filename = self.files[self.i]
l = open(os.path.join(self.low_dir, filename), 'rb')
h = open(os.path.join(self.high_dir, filename), 'rb')
low.append(np.frombuffer(l.read(), dtype=np.uint8))
high.append(np.frombuffer(h.read(), dtype=np.uint8))
self.i += 1
return (low, high)
next = __next__
len = __len__
class ExternalSourcePipeline(Pipeline):
def __init__(self, data_iterator, batch_size, num_threads, device_id, img_size):
super(ExternalSourcePipeline, self).__init__(batch_size, num_threads, device_id, exec_async=False, exec_pipelined=False)
self.img_size = img_size
self.batch = batch_size
self.data_iterator = data_iterator
self.lows, self.highs = fn.external_source(source=self.data_iterator, num_outputs=2, dtype=types.UINT8)
def __len__(self):
length = len(self.data_iterator)
return (length // self.batch + 1) if (length % self.batch != 0) else (length // self.batch)
def define_graph(self):
low_decode = fn.decoders.image(self.lows, device="mixed")
high_decode = fn.decoders.image(self.highs, device="mixed")
low_resize = fn.resize(low_decode, device="gpu", resize_x=self.img_size, resize_y=self.img_size, interp_type=types.INTERP_TRIANGULAR)
high_resize = fn.resize(high_decode, device="gpu", resize_x=self.img_size, resize_y=self.img_size, interp_type=types.INTERP_TRIANGULAR)
self.low = fn.transpose(low_resize, perm=[2, 0, 1]) / 255.0
self.high = fn.transpose(high_resize, perm=[2, 0, 1]) / 255.0
return (self.low, self.high)
def iter_setup(self):
self.set_outputs(self.low, self.high)
class CustomDALIGenericIterator(DALIGenericIterator):
def __init__(self, pipelines, **kwargs):
output_maps = ['lows', 'highs']
if not isinstance(pipelines, list):
pipelines = [pipelines]
super(CustomDALIGenericIterator, self).__init__(pipelines, output_maps, **kwargs)
self.pipelines = pipelines # devices > 1 ==> pipelines > 1
def __next__(self):
batch = super(CustomDALIGenericIterator, self).__next__()
return self.parse_batch(batch)
def __len__(self):
lengths = [len(i) for i in self.pipelines]
return sum(lengths)
def parse_batch(self, batch):
lows, highs = batch[0]['lows'], batch[0]['highs']
return lows, highs
DATA_DIR = "/home/jensen/workspace/SYNTHESIS_DATA"
BATCH_SIZE = 128
IMAGE_SIZE = 192
files = os.listdir(os.path.join(DATA_DIR, 'low'))
train_files, val_files = random_split(files, (152000, 1856))
val_iter = ExternalInputIterator(batch_size=BATCH_SIZE, files=val_files, data_dir=DATA_DIR)
val_pipe = ExternalSourcePipeline(val_iter, batch_size=BATCH_SIZE, num_threads=4, device_id=0, img_size=IMAGE_SIZE)
val_loader = CustomDALIGenericIterator(val_pipe)
%%time
for idx, data in enumerate(val_loader):
X, Y = data
print(X.shape)
上面的例子使用NVIDIA DALI自定义了数据的迭代方式以及加载方式,同样使用jupyter
的%%time
魔法函数来计算遍历一遍所花费的时间,如下图所示,整个过程仅花费了不到3秒钟。
此外,可以模拟将数据输入AlexNet中处理,GPU的利用率也会一直稳定在85%以上,表明GPU的性能被充分利用。
多卡环境下的NVIDIA DALI使用
本文中所指的多卡环境是单机多卡环境,即在一台机器的多个GPU上进行分布式训练,然而多机多卡的情况并不在本文的讨论范畴之内,我也确实没有用过这种训练方式。有关多卡环境下的使用方式也可以参考官方文档。下面直接放出我的例子吧:
import os
import nvidia.dali.fn as fn
import nvidia.dali.types as types
from torch.utils.data import random_split
from nvidia.dali.pipeline import Pipeline
from nvidia.dali.plugin.pytorch import DALIGenericIterator, LastBatchPolicy
class SyntheicDataPipeline(Pipeline):
"""
An extended Pipeline class based on the Nvidia DALI library for low-light image enhancement.
The effect of the Pipeline class is somewhat similar to the Dataset class in Pytorch and
the transforms function in torchvision. Mainly is to carry on some simple preprocessing to the input data.
Args:
batch_size (int): batch_size.
data_dir (str): the folder path of the paired image. (excluding the 'low' and' high' folders)
files (list): A list of paired image filenames.
image_size (int | tuple): image size after resize operation. Default: 192
num_threads (int): number of CPU threads used by the pipeline. Default: 2
device_id (int): id of GPU used by the pipeline. Default: 0
seed (int): seed used for random number generation. Default: -1
shard_id (int): index of the shard to read. Default: 0
num_shards (int): partitions the data into the specified number of parts (shards). Default: 1
random_shuffle (bool): determines whether to randomly shuffle data. Default: True
Examples:
When you have a GPU, device_id and shard_id should be set to 0 and num_shards should be set to 1.
When you have four GPU, the value range for device_id and shard_id is [0-3] (device_id and shard_id
values are usually the same), and num_shards should be set to 4.
For details, please refer to the official DALI documentation: https://docs.nvidia.com/deeplearning/dali/user-guide/docs/
or my blog (which will be updated in the near future): https://jensen.dlab.ac.cn/ .
"""
def __init__(self, batch_size, data_dir, files,
image_size=192, num_threads=-1, device_id=0, seed=-1,
shard_id=0, num_shards=1, random_shuffle=True, **kwargs):
super(SyntheicDataPipeline, self).__init__(batch_size=batch_size, num_threads=num_threads,
device_id=device_id, seed=seed, **kwargs)
self.types = ['low', 'high']
self.data_dir = [os.path.join(data_dir, name) for name in self.types]
self.files = list(files)
self.image_size = image_size
self.shard_id = shard_id
self.num_shards = num_shards
self.random_shuffle = random_shuffle
def define_graph(self):
low_inputs, _ = fn.readers.file(file_root=self.data_dir[0], files=self.files, seed=1234,
shard_id=self.shard_id, num_shards=self.num_shards,
random_shuffle=self.random_shuffle, pad_last_batch=True,
name="main_reader")
high_inputs, _ = fn.readers.file(file_root=self.data_dir[1], files=self.files, seed=1234,
shard_id=self.shard_id, num_shards=self.num_shards,
random_shuffle=self.random_shuffle, pad_last_batch=True)
inputs = {'low': low_inputs, 'high': high_inputs}
images = {x: fn.decoders.image(inputs[x], device="mixed") for x in self.types}
resizes = {x: fn.resize(images[x], device="gpu", resize_x=self.image_size,
resize_y=self.image_size, interp_type=types.INTERP_TRIANGULAR)
for x in self.types}
self.tensors = {x: fn.transpose(resizes[x], perm=[2, 0, 1]) / 255.0 for x in self.types}
return (self.tensors['low'], self.tensors['high'])
def iter_setup(self):
self.set_outputs(self.tensors['low'], self.tensors['high'])
class SyntheicDataIterator(DALIGenericIterator):
"""
An extended Iterator class based on the Nvidia DALI library for low-light image enhancement.
The effect of the Iterator class is somewhat similar to the Dataloader class in Pytorch.
Args:
pipelines (nvidia.dali.Pipeline): pipelines.
reader_name (str): name of the reader which will be queried to the shard size,
number of shards and all other properties necessary to count
properly the number of relevant and padded samples that iterator
needs to deal with.
last_batch_policy (int): strategy for processing the last batch data. (especially if the
size of the last batch data is smaller than batch_size)
output_map (list): list of strings which maps consecutive outputs of DALI pipelines to user specified name.
Example:
loader = SyntheicDataIterator(...)
for idx, data in enumerate(loader):
low, high = data
...
For details, please refer to the official DALI documentation: https://docs.nvidia.com/deeplearning/dali/user-guide/docs/
or my blog (which will be updated in the near future): https://jensen.dlab.ac.cn/ .
"""
def __init__(self, pipelines, reader_name, last_batch_policy,
output_map=['low', 'high'], **kwargs):
super(SyntheicDataIterator, self).__init__(pipelines=pipelines, output_map=output_map,
reader_name=reader_name, last_batch_policy=last_batch_policy,
**kwargs)
def _parse_data(self, data):
low_data, high_data = data[0]['low'], data[0]['high']
return low_data, high_data
def __next__(self):
data = super(SyntheicDataIterator, self).__next__()
return self._parse_data(data)
def __len__(self):
return super(SyntheicDataIterator, self).__len__()
DATA_DIR = "/home/jensen/workspace/SYNTHESIS_DATA"
BATCH_SIZE = 128
IMAGE_SIZE = 192
files = os.listdir(os.path.join(DATA_DIR, 'low'))
train_files, val_files = random_split(files, (152000, 1856))
pipe = SyntheicDataPipeline(batch_size=BATCH_SIZE, num_threads=4, device_id=0,
seed=1234, data_dir=DATA_DIR, files=val_files,
image_size=IMAGE_SIZE, shard_id=0, num_shards=1)
val_loader = SyntheicDataIterator(pipe, reader_name="main_reader", auto_reset=True,
last_batch_policy=LastBatchPolicy.PARTIAL)
for idx, data in enumerate(val_loader):
X, Y = data
print(X.shape)
上面的例子其实还是单机单卡的环境,但是只要稍微修改一下就可以实现单机多卡。即将SyntheicDataPipeline
类中的device_id
和shard_id
以及num_shards
这三个参数稍作修改。device_id
很好理解了,例如一台机器有四张GPU,则device_id
分别为0
、1
、2
、3
。假如在第二张GPU上进行运算,则device_id
就是2
,此外shard_id
一般与device_id
保持一致,它是指第几个分片(分布式训练的实质就是将一个大batch的数据均分到每个GPU上来并行运算,因此第一张GPU上输入的数据应当就是均分数据得到的第一个分片)。num_shards
这个参数也很好理解,它的意思即一共有多少张GPU。虽然看起来很简单,但是还是有三个参数需要修改,似乎还是有点麻烦,但是不用担心,实际上这些参数都是不需要人为去设定。因为通常分布式训练都需要从命令行传入一个参数--nproc_per_node
,通过这一个参数都能自适应的完成上述参数的修改,在实际训练的时候,只需做如下修改即可:device_id=args.local_rank
、shard_id=args.local_rank
、num_shards=args.world_size
。
好了,本篇博客到这里就应该要结束了,本文中提到的方法通常都能适用于各种深度学习任务,只是不同的任务在数据读取上可能会有一些异同,但我相信这些问题都可以通过参阅官方文档中的nvidia.dali.fn
使用说明来解决。接下来如果有时间的话我会再介绍一下如何使用NVIDIA APEX库来实现分布式训练以及如何将NVIDIA DALI和APEX库结合起来使用。