Unverified commit c0b27fb0, authored by Hari Prasad and committed by GitHub

Added drop_last to DeepSpeedDataLoader (#1321)

* Added drop_last to DeepSpeedDataLoader

This solves issue #326

* Updated drop_last in engine.py

Added drop_last as a ds_config option, as suggested by @tjruwase

* Update engine.py

* Update engine.py

* updated config.py and constants.py

* Update constants.py

* added dataloader_ prefix

* Update dataloader.py

* corrected yapf test errors

* Update test_data.py

Added dataloader_drop_last unit test

* Corrected yapf and formatting issues

* updated simple_model.py and test_data.py

* Update simple_model.py

* pre-commit fix

* corrected issues

* Update test_data.py

* Update test_data.py

* Update test_data.py

* Update test_data.py

* removed batch_size from test_data.py

* Update simple_model.py

* Update test_data.py

* Update test_data.py

* Fix unit test issues

* Use fp32 to make things work
Co-authored-by: Olatunji Ruwase <olruwase@microsoft.com>
Co-authored-by: Jeff Rasley <jerasley@microsoft.com>
Parent ddffbae0
config.py: @@ -658,6 +658,12 @@ def get_checkpoint_tag_validation_mode(checkpoint_params):
            f"value of {tag_validation_mode}, expecting one of {CHECKPOINT_TAG_VALIDATION_MODES}")


def get_dataloader_drop_last(param_dict):
    return get_scalar_param(param_dict,
                            DATALOADER_DROP_LAST,
                            DATALOADER_DROP_LAST_DEFAULT)


'''Write deepspeed config files by modifying basic templates.
Can be used for quickly changing parameters via command line parameters.'''
config.py: @@ -851,6 +857,8 @@ class DeepSpeedConfig(object):
        self.aio_config = get_aio_config(param_dict)

        self.dataloader_drop_last = get_dataloader_drop_last(param_dict)

    def _batch_assertion(self):

        train_batch = self.train_batch_size
constants.py: @@ -416,3 +416,15 @@ QUANTIZE_ROUNDING_DEFAULT = 0 #nearest
FP16_MIXED_QUANTIZE_ENABLED_DEFAULT = False
QUANTIZE_CHANGE_RATIO_DEFAULT = 0.001
QUANTIZE_VERBOSE_DEFAULT = False

#########################################
# Drop the last incomplete batch
#########################################
# dataloader_drop_last. By default, this feature is not enabled.
# Users can configure it in ds_config.json as in the example below:
DATALOADER_DROP_LAST_FORMAT = '''
The last incomplete batch can be dropped by setting:
"dataloader_drop_last": True
'''
DATALOADER_DROP_LAST = "dataloader_drop_last"
DATALOADER_DROP_LAST_DEFAULT = False
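
For illustration, a minimal sketch of turning the new option on from user code. The toy model, dataset, and optimizer are placeholders rather than part of this commit, and depending on the DeepSpeed version the dict may need to be passed as config_params or written to a ds_config.json referenced via args, as the unit test further down does.

import torch
import deepspeed

# Placeholder model and dataset, only to show where the new key plugs in.
model = torch.nn.Linear(10, 10)
data = torch.randn(50, 10)
labels = torch.randint(0, 10, (50,))
dataset = torch.utils.data.TensorDataset(data, labels)

ds_config = {
    "train_batch_size": 4,
    "dataloader_drop_last": True,  # new option; defaults to False
}

# When training_data is supplied, DeepSpeed builds the dataloader itself and,
# after this commit, forwards drop_last to torch.utils.data.DataLoader.
engine, _, train_loader, _ = deepspeed.initialize(
    model=model,
    optimizer=torch.optim.AdamW(model.parameters()),
    training_data=dataset,
    config=ds_config)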
dataloader.py: @@ -41,7 +41,8 @@ class DeepSpeedDataLoader(object):
                 num_local_io_workers=None,
                 data_sampler=None,
                 data_parallel_world_size=None,
                 data_parallel_rank=None):
                 data_parallel_rank=None,
                 dataloader_drop_last=False):
        self.tput_timer = tput_timer
        self.batch_size = batch_size
dataloader.py: @@ -69,6 +70,7 @@ class DeepSpeedDataLoader(object):
        self.pin_memory = pin_memory
        self.len = len(self.data_sampler)
        self.data = None
        self.dataloader_drop_last = dataloader_drop_last

    def __iter__(self):
        self._create_dataloader()
dataloader.py: @@ -88,14 +90,16 @@ class DeepSpeedDataLoader(object):
                                         batch_size=self.batch_size,
                                         pin_memory=self.pin_memory,
                                         sampler=self.data_sampler,
                                         num_workers=self.num_local_io_workers)
                                         num_workers=self.num_local_io_workers,
                                         drop_last=self.dataloader_drop_last)
        else:
            self.dataloader = DataLoader(self.dataset,
                                         batch_size=self.batch_size,
                                         pin_memory=self.pin_memory,
                                         sampler=self.data_sampler,
                                         collate_fn=self.collate_fn,
                                         num_workers=self.num_local_io_workers,
                                         drop_last=self.dataloader_drop_last)
        self.data = (x for x in self.dataloader)
        return self.dataloader
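
The forwarded flag has the usual torch.utils.data.DataLoader meaning; as a quick self-contained illustration (sample and batch counts chosen to match the new unit test below):

import torch
from torch.utils.data import DataLoader, TensorDataset

dataset = TensorDataset(torch.randn(50, 10))  # 50 samples, batch size 4

# Without drop_last the trailing partial batch of 2 samples is kept: 13 batches.
assert len(DataLoader(dataset, batch_size=4, drop_last=False)) == 13
# With drop_last it is discarded, leaving only the 12 full batches.
assert len(DataLoader(dataset, batch_size=4, drop_last=True)) == 12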
engine.py: @@ -1143,6 +1143,9 @@ class DeepSpeedEngine(Module):
            torch.utils.data.IterableDataset
        )  # hasattr(obj, "__iter__") should work as well

    def dataloader_drop_last(self):
        return self._config.dataloader_drop_last

    def deepspeed_io(self,
                     dataset,
                     batch_size=None,
engine.py: @@ -1185,7 +1188,8 @@ class DeepSpeedEngine(Module):
                                   num_local_io_workers=num_local_io_workers,
                                   data_sampler=data_sampler,
                                   data_parallel_world_size=data_parallel_world_size,
                                   data_parallel_rank=data_parallel_rank,
                                   dataloader_drop_last=self.dataloader_drop_last())
    def train(self, mode=True):
        r"""
simple_model.py: @@ -178,13 +178,18 @@ class PLD_SimpleModel(SimpleModel):
        return hidden_dim


def random_dataloader(model, total_samples, hidden_dim, device, dtype=torch.half):
    batch_size = model.train_micro_batch_size_per_gpu()
def random_dataset(total_samples, hidden_dim, device, dtype=torch.half):
    train_data = torch.randn(total_samples, hidden_dim, device=device, dtype=dtype)
    train_label = torch.empty(total_samples,
                              dtype=torch.long,
                              device=device).random_(hidden_dim)
    train_dataset = torch.utils.data.TensorDataset(train_data, train_label)
    return train_dataset


def random_dataloader(model, total_samples, hidden_dim, device, dtype=torch.half):
    batch_size = model.train_micro_batch_size_per_gpu()
    train_dataset = random_dataset(total_samples, hidden_dim, device, dtype=dtype)
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size)
    return train_loader

test_data.py:
from deepspeed.utils import RepeatingLoader
import torch
import pytest
import deepspeed
from common import distributed_test
from simple_model import SimpleModel, args_from_dict, random_dataset
def test_repeating_loader():
test_data.py: @@ -9,3 +14,45 @@ def test_repeating_loader():
    assert next(loader) == 1
    assert next(loader) == 2
    assert next(loader) == 3


@pytest.mark.parametrize('train_batch_size, drop_last',
                         [(1, True),
                          (4, True),
                          (1, False),
                          (4, False)])
def test_dataloader_drop_last(tmpdir, train_batch_size, drop_last):
    config_dict = {
        "train_batch_size": train_batch_size,
        "dataloader_drop_last": drop_last,
        "steps_per_print": 1
    }
    args = args_from_dict(tmpdir, config_dict)
    hidden_dim = 10
    model = SimpleModel(hidden_dim)

    @distributed_test(world_size=[1])
    def _test_dataloader_drop_last(args, model, hidden_dim):
        optimizer = torch.optim.AdamW(params=model.parameters())
        # TODO: Figure out why this breaks with cuda device
        train_dataset = random_dataset(total_samples=50,
                                       hidden_dim=hidden_dim,
                                       device=torch.device('cpu'),
                                       dtype=torch.float32)
        model, _, training_dataloader, _ = deepspeed.initialize(args=args,
                                                                model=model,
                                                                training_data=train_dataset,
                                                                optimizer=optimizer)
        for n, batch in enumerate(training_dataloader):
            x = batch[0].to(torch.cuda.current_device())
            y = batch[1].to(torch.cuda.current_device())
            loss = model(x, y)
            model.backward(loss)
            model.step()

    _test_dataloader_drop_last(args=args, model=model, hidden_dim=hidden_dim)
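
The loop above exercises the new code path end to end; a natural follow-up assertion, shown here only as a hypothetical addition after the loop inside _test_dataloader_drop_last, would check the batch count that drop_last implies:

import math

# Hypothetical extra check: with 50 samples, drop_last=True should yield
# 50 // batch_size full batches, while drop_last=False rounds up instead.
expected_batches = (50 // train_batch_size
                    if drop_last else math.ceil(50 / train_batch_size))
assert n + 1 == expected_batches  # n is the last index from enumerate()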