diff --git a/imperative/python/megengine/data/_queue.py b/imperative/python/megengine/data/_queue.py
index a9e328c65c56e4f4ba736b510176677b6c735c32..8e359ae06c0bb2c7306c067c63a100d5528e9bad 100644
--- a/imperative/python/megengine/data/_queue.py
+++ b/imperative/python/megengine/data/_queue.py
@@ -26,7 +26,7 @@ def _clear_plasma_store():
     # `_PlasmaStoreManager.__del__` will not be called automaticly in subprocess,
     # so this function should be called explicitly
     global MGE_PLASMA_STORE_MANAGER
-    if MGE_PLASMA_STORE_MANAGER is not None:
+    if MGE_PLASMA_STORE_MANAGER is not None and MGE_PLASMA_STORE_MANAGER.refcount == 0:
         del MGE_PLASMA_STORE_MANAGER
         MGE_PLASMA_STORE_MANAGER = None

@@ -50,6 +50,7 @@ class _PlasmaStoreManager:
             stderr=None if debug_flag else subprocess.DEVNULL,
         )
         self.__initialized = True
+        self.refcount = 1

     def __del__(self):
         if self.__initialized and self.plasma_store.returncode is None:
@@ -83,6 +84,8 @@ class PlasmaShmQueue:
                 "Exception happened in starting plasma_store: {}\n"
                 "Tips: {}".format(str(e), err_info)
             )
+        else:
+            MGE_PLASMA_STORE_MANAGER.refcount += 1

         self.socket_name = MGE_PLASMA_STORE_MANAGER.socket_name

@@ -133,6 +136,8 @@ class PlasmaShmQueue:
     def close(self):
         self.queue.close()
         self.disconnect_client()
+        global MGE_PLASMA_STORE_MANAGER
+        MGE_PLASMA_STORE_MANAGER.refcount -= 1
         _clear_plasma_store()

     def cancel_join_thread(self):
diff --git a/imperative/python/megengine/data/dataset/vision/coco.py b/imperative/python/megengine/data/dataset/vision/coco.py
index d247e52b4f6567d03dd390864ef5b9c1ee4f600c..11366de032940bb3753811d67657465438eb49d7 100644
--- a/imperative/python/megengine/data/dataset/vision/coco.py
+++ b/imperative/python/megengine/data/dataset/vision/coco.py
@@ -118,7 +118,7 @@ class COCO(VisionDataset):
         self.ids = ids

         self.json_category_id_to_contiguous_id = {
-            v: i + 1 for i, v in enumerate(self.cats.keys())
+            v: i + 1 for i, v in enumerate(sorted(self.cats.keys()))
         }

         self.contiguous_category_id_to_json_id = {
diff --git a/imperative/python/megengine/data/dataset/vision/objects365.py b/imperative/python/megengine/data/dataset/vision/objects365.py
index 7c1481bac99fa2af82fb8d93856b7815024373c9..e56e646231479da56c841623fcfc51ed251067e2 100644
--- a/imperative/python/megengine/data/dataset/vision/objects365.py
+++ b/imperative/python/megengine/data/dataset/vision/objects365.py
@@ -81,7 +81,7 @@ class Objects365(VisionDataset):
         self.ids = ids

         self.json_category_id_to_contiguous_id = {
-            v: i + 1 for i, v in enumerate(self.cats.keys())
+            v: i + 1 for i, v in enumerate(sorted(self.cats.keys()))
         }

         self.contiguous_category_id_to_json_id = {
diff --git a/imperative/python/megengine/data/dataset/vision/voc.py b/imperative/python/megengine/data/dataset/vision/voc.py
index 42bf712dc172176ee040881a84ee8b2ed79a383b..b22fd2faca5a1f0a060a21fdf39846cc14c56f41 100644
--- a/imperative/python/megengine/data/dataset/vision/voc.py
+++ b/imperative/python/megengine/data/dataset/vision/voc.py
@@ -75,6 +75,8 @@ class PascalVOC(VisionDataset):
         else:
             raise NotImplementedError

+        self.img_infos = dict()
+
     def __getitem__(self, index):
         target = []
         for k in self.order:
@@ -107,9 +109,8 @@ class PascalVOC(VisionDataset):
                 mask = mask[:, :, np.newaxis]
                 target.append(mask)
             elif k == "info":
-                if image is None:
-                    image = cv2.imread(self.images[index], cv2.IMREAD_COLOR)
-                info = [image.shape[0], image.shape[1], self.file_names[index]]
+                info = self.get_img_info(index, image)
+                info = [info["height"], info["width"], info["file_name"]]
                 target.append(info)
             else:
                 raise NotImplementedError
@@ -119,6 +120,17 @@ class PascalVOC(VisionDataset):
     def __len__(self):
         return len(self.images)

+    def get_img_info(self, index, image=None):
+        if index not in self.img_infos:
+            if image is None:
+                image = cv2.imread(self.images[index], cv2.IMREAD_COLOR)
+            self.img_infos[index] = dict(
+                height=image.shape[0],
+                width=image.shape[1],
+                file_name=self.file_names[index],
+            )
+        return self.img_infos[index]
+
     def _trans_mask(self, mask):
         label = np.ones(mask.shape[:2]) * 255
         for i in range(len(self.class_colors)):
@@ -171,25 +183,3 @@ class PascalVOC(VisionDataset):
         "train",
         "tvmonitor",
     )
-    class_colors = [
-        [0, 0, 128],
-        [0, 128, 0],
-        [0, 128, 128],
-        [128, 0, 0],
-        [128, 0, 128],
-        [128, 128, 0],
-        [128, 128, 128],
-        [0, 0, 64],
-        [0, 0, 192],
-        [0, 128, 64],
-        [0, 128, 192],
-        [128, 0, 64],
-        [128, 0, 192],
-        [128, 128, 64],
-        [128, 128, 192],
-        [0, 64, 0],
-        [0, 64, 128],
-        [0, 192, 0],
-        [0, 192, 128],
-        [128, 64, 0],
-    ]
diff --git a/imperative/python/megengine/module/qat/module.py b/imperative/python/megengine/module/qat/module.py
index 544e04aff63fb6f73dedf832a6d186d09749bd15..04da600076f317333e23605cd5665aaa401a12a4 100644
--- a/imperative/python/megengine/module/qat/module.py
+++ b/imperative/python/megengine/module/qat/module.py
@@ -52,7 +52,7 @@ class QATModule(Module):
         self.weight_fake_quant = safe_call(qconfig.weight_fake_quant)

     def _enable_exec(self, with_module, func, enable):
-        if not with_module:
+        if not with_module or not func:
             return
         if enable:
             func.enable()
diff --git a/imperative/python/megengine/module/sequential.py b/imperative/python/megengine/module/sequential.py
index 210dd196368bf6380064310eb1e002dfee73a018..ce021eff7bbae6066e678019cb2e995e046838e6 100644
--- a/imperative/python/megengine/module/sequential.py
+++ b/imperative/python/megengine/module/sequential.py
@@ -26,40 +26,40 @@ class Sequential(Module):
         import megengine as mge
         import megengine.module as M
         import megengine.functional as F
+        from collections import OrderedDict

         batch_size = 64
         data = mge.tensor(np.zeros((batch_size, 1, 28, 28)), dtype=np.float32)
         label = mge.tensor(np.zeros(batch_size,), dtype=np.int32)

         data = data.reshape(batch_size, -1)
-        net = M.Sequential(
+        net0 = M.Sequential(
             M.Linear(28 * 28, 320),
-            M.Linear(320, 500),
-            M.Linear(500, 320),
             M.Linear(320, 10)
         )
-        pred = net(data)
+        pred0 = net0(data)

-        loss = F.cross_entropy_with_softmax(pred, label)
+        modules = OrderedDict()
+        modules["fc0"] = M.Linear(28 * 28, 320)
+        modules["fc1"] = M.Linear(320, 10)
+        net1 = M.Sequential(modules)
+        pred1 = net1(data)

     """

     def __init__(self, *args):
         super().__init__()
         self.layer_keys = []
-        self.layer_values = []
         if len(args) == 1 and isinstance(args[0], OrderedDict):
             for key, module in args[0].items():
                 # self.add_module(key, module)
                 setattr(self, key, module)
                 self.layer_keys.append(key)
-                self.layer_values.append(module)
         else:
             for idx, module in enumerate(args):
                 # self.add_module(str(idx), module)
                 setattr(self, str(idx), module)
                 self.layer_keys.append(str(idx))
-                self.layer_values.append(module)

     def __getitem__(self, idx):
         if isinstance(idx, slice):
@@ -67,11 +67,10 @@
             return Sequential(
                 OrderedDict(zip(self.layer_keys[idx], self.layer_values[idx]))
             )
         else:
-            return self.layer_values[idx]
+            return getattr(self, self.layer_keys[idx])

     def __setitem__(self, idx, module):
         key = self.layer_keys[idx]
-        self.layer_values[idx] = module
         return setattr(self, key, module)

     def __delitem__(self, idx):
@@ -79,11 +78,9 @@
         if isinstance(idx, slice):
             for key in self.layer_keys[idx]:
                 delattr(self, key)
             del self.layer_keys[idx]
-            del self.layer_values[idx]
         else:
             delattr(self, self.layer_keys[idx])
             del self.layer_keys[idx]
-            del self.layer_values[idx]

     def __len__(self):
         return len(self.layer_keys)
@@ -91,6 +88,10 @@
     def __iter__(self):
         return iter(self.layer_values)

+    @property
+    def layer_values(self):
+        return [getattr(self, key) for key in self.layer_keys]
+
     def forward(self, inp):
         for layer in self.layer_values:
             inp = layer(inp)
diff --git a/imperative/python/test/unit/data/test_dataloader.py b/imperative/python/test/unit/data/test_dataloader.py
new file mode 100644
index 0000000000000000000000000000000000000000..6bb0f3e32f592ea50b4514b6a3c616a4b0f8c117
--- /dev/null
+++ b/imperative/python/test/unit/data/test_dataloader.py
@@ -0,0 +1,183 @@
+# -*- coding: utf-8 -*-
+# MegEngine is Licensed under the Apache License, Version 2.0 (the "License")
+#
+# Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+import os
+import time
+
+import numpy as np
+import pytest
+
+from megengine.data.collator import Collator
+from megengine.data.dataloader import DataLoader
+from megengine.data.dataset import ArrayDataset
+from megengine.data.sampler import RandomSampler, SequentialSampler
+from megengine.data.transform import PseudoTransform, Transform
+
+
+def init_dataset():
+    sample_num = 100
+    rand_data = np.random.randint(0, 255, size=(sample_num, 1, 32, 32), dtype=np.uint8)
+    label = np.random.randint(0, 10, size=(sample_num,), dtype=int)
+    dataset = ArrayDataset(rand_data, label)
+    return dataset
+
+
+def test_dataloader_init():
+    dataset = init_dataset()
+    with pytest.raises(ValueError):
+        dataloader = DataLoader(dataset, num_workers=2, divide=True)
+    with pytest.raises(ValueError):
+        dataloader = DataLoader(dataset, num_workers=-1)
+    with pytest.raises(ValueError):
+        dataloader = DataLoader(dataset, timeout=-1)
+    with pytest.raises(ValueError):
+        dataloader = DataLoader(dataset, num_workers=0, divide=True)
+
+    dataloader = DataLoader(dataset)
+    assert isinstance(dataloader.sampler, SequentialSampler)
+    assert isinstance(dataloader.transform, PseudoTransform)
+    assert isinstance(dataloader.collator, Collator)
+
+    dataloader = DataLoader(
+        dataset, sampler=RandomSampler(dataset, batch_size=6, drop_last=False)
+    )
+    assert len(dataloader) == 17
+    dataloader = DataLoader(
+        dataset, sampler=RandomSampler(dataset, batch_size=6, drop_last=True)
+    )
+    assert len(dataloader) == 16
+
+
+def test_dataloader_serial():
+    dataset = init_dataset()
+    dataloader = DataLoader(
+        dataset, sampler=RandomSampler(dataset, batch_size=4, drop_last=False)
+    )
+    for (data, label) in dataloader:
+        assert data.shape == (4, 1, 32, 32)
+        assert label.shape == (4,)
+
+
+def test_dataloader_parallel():
+    # set max shared memory to 100M
+    os.environ["MGE_PLASMA_MEMORY"] = "100000000"
+
+    dataset = init_dataset()
+    dataloader = DataLoader(
+        dataset,
+        sampler=RandomSampler(dataset, batch_size=4, drop_last=False),
+        num_workers=2,
+        divide=False,
+    )
+    for (data, label) in dataloader:
+        assert data.shape == (4, 1, 32, 32)
+        assert label.shape == (4,)
+
+    dataloader = DataLoader(
+        dataset,
+        sampler=RandomSampler(dataset, batch_size=4, drop_last=False),
+        num_workers=2,
+        divide=True,
+    )
+    for (data, label) in dataloader:
+        assert data.shape == (4, 1, 32, 32)
+        assert label.shape == (4,)
+
+
+def test_dataloader_parallel_timeout():
+    dataset = init_dataset()
+
+    class TimeoutTransform(Transform):
+        def __init__(self):
+            pass
+
+        def apply(self, input):
+            time.sleep(10)
+            return input
+
+    dataloader = DataLoader(
+        dataset,
+        sampler=RandomSampler(dataset, batch_size=4, drop_last=False),
+        transform=TimeoutTransform(),
+        num_workers=2,
+        timeout=2,
+    )
+    with pytest.raises(RuntimeError, match=r".*timeout.*"):
+        data_iter = iter(dataloader)
+        batch_data = next(data_iter)
+
+
+def test_dataloader_parallel_worker_exception():
+    dataset = init_dataset()
+
+    class FakeErrorTransform(Transform):
+        def __init__(self):
+            pass
+
+        def apply(self, input):
+            y = x + 1  # `x` is intentionally undefined to make the worker die
+            return input
+
+    dataloader = DataLoader(
+        dataset,
+        sampler=RandomSampler(dataset, batch_size=4, drop_last=False),
+        transform=FakeErrorTransform(),
+        num_workers=2,
+    )
+    with pytest.raises(RuntimeError, match=r"worker.*died"):
+        data_iter = iter(dataloader)
+        batch_data = next(data_iter)
+
+
+def _multi_instances_parallel_dataloader_worker():
+    dataset = init_dataset()
+
+    for divide_flag in [True, False]:
+        train_dataloader = DataLoader(
+            dataset,
+            sampler=RandomSampler(dataset, batch_size=4, drop_last=False),
+            num_workers=2,
+            divide=divide_flag,
+        )
+        val_dataloader = DataLoader(
+            dataset,
+            sampler=RandomSampler(dataset, batch_size=10, drop_last=False),
+            num_workers=2,
+            divide=divide_flag,
+        )
+        for idx, (data, label) in enumerate(train_dataloader):
+            assert data.shape == (4, 1, 32, 32)
+            assert label.shape == (4,)
+            if idx % 5 == 0:
+                for val_data, val_label in val_dataloader:
+                    assert val_data.shape == (10, 1, 32, 32)
+                    assert val_label.shape == (10,)
+
+
+def test_dataloader_parallel_multi_instances():
+    # set max shared memory to 100M
+    os.environ["MGE_PLASMA_MEMORY"] = "100000000"
+
+    _multi_instances_parallel_dataloader_worker()
+
+
+def test_dataloader_parallel_multi_instances_multiprocessing():
+    # set max shared memory to 100M
+    os.environ["MGE_PLASMA_MEMORY"] = "100000000"
+
+    import multiprocessing as mp
+
+    # mp.set_start_method("spawn")
+    processes = []
+    for i in range(4):
+        p = mp.Process(target=_multi_instances_parallel_dataloader_worker)
+        p.start()
+        processes.append(p)
+
+    for p in processes:
+        p.join()
diff --git a/imperative/python/test/unit/module/test_module.py b/imperative/python/test/unit/module/test_module.py
index d3a492414f8a21bbc9373b397b58e081fb585b2b..48c815b135facebd226fcaf097ea863808bcbde7 100644
--- a/imperative/python/test/unit/module/test_module.py
+++ b/imperative/python/test/unit/module/test_module.py
@@ -460,9 +460,9 @@ def test_sequential_named_children():
     modules["name2"] = Linear(5, 1)
     m = Sequential(modules)
     l = list(m.named_children())
-    assert l[0][0] == "layer_values.0"
-    assert l[1][0] == "layer_values.1"
-    assert l[2][0] == "layer_values.2"
+    assert l[0][0] == "name0"
+    assert l[1][0] == "name1"
+    assert l[2][0] == "name2"


 def test_state_dict():
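
Notes on the patch (illustrative sketches, not part of the diff):

The _queue.py hunks keep one plasma store per process and tear it down only
when the last client is gone: the creating queue starts the manager with
refcount = 1, each later PlasmaShmQueue increments it, close() decrements it,
and _clear_plasma_store() frees the manager only once refcount reaches 0.
A minimal standalone sketch of that refcounting pattern; the names _MANAGER,
_Manager, acquire and release are made up for illustration, not MegEngine API:

    _MANAGER = None

    class _Manager:
        def __init__(self):
            self.refcount = 1        # the creating client holds one reference

    def acquire():
        global _MANAGER
        if _MANAGER is None:
            _MANAGER = _Manager()    # first client starts the shared resource
        else:
            _MANAGER.refcount += 1   # later clients just take a reference

    def release():
        global _MANAGER
        _MANAGER.refcount -= 1
        if _MANAGER.refcount == 0:   # last client frees the resource
            _MANAGER = None

With this invariant, refcount always equals the number of live queues, so a
queue closing in one DataLoader can no longer destroy the store out from under
another still-running DataLoader in the same process.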
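The sorted(self.cats.keys()) change in coco.py and objects365.py makes the
json-id to contiguous-id mapping independent of the key order of the
annotation file. A small self-contained illustration with made-up category
ids (dicts preserve insertion order in Python 3.7+, so unsorted iteration
follows the file):

    cats_a = {3: "car", 1: "person", 2: "bike"}   # file order A
    cats_b = {1: "person", 2: "bike", 3: "car"}   # file order B, same cats

    # Unsorted: the mapping follows file order, so A and B disagree.
    map_a = {v: i + 1 for i, v in enumerate(cats_a.keys())}  # {3: 1, 1: 2, 2: 3}
    map_b = {v: i + 1 for i, v in enumerate(cats_b.keys())}  # {1: 1, 2: 2, 3: 3}
    assert map_a != map_b

    # Sorted: both files yield the same deterministic mapping.
    map_a = {v: i + 1 for i, v in enumerate(sorted(cats_a.keys()))}
    map_b = {v: i + 1 for i, v in enumerate(sorted(cats_b.keys()))}
    assert map_a == map_b == {1: 1, 2: 2, 3: 3}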
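get_img_info() in voc.py memoizes per-index image metadata, so a file is
decoded at most once even if "info" is requested repeatedly, and callers
outside __getitem__ can query height/width without re-reading pixels. A
reduced sketch of the same caching idea; InfoCache is a made-up name, not
the PascalVOC class itself:

    import cv2

    class InfoCache:
        def __init__(self, paths):
            self.paths = paths
            self.infos = {}                 # index -> dict(height=..., width=...)

        def get(self, index, image=None):
            if index not in self.infos:     # decode each file at most once
                if image is None:
                    image = cv2.imread(self.paths[index], cv2.IMREAD_COLOR)
                self.infos[index] = dict(
                    height=image.shape[0], width=image.shape[1]
                )
            return self.infos[index]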
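The sequential.py change removes the parallel self.layer_values list and
derives it from attributes on demand. Because Module discovers children by
scanning instance attributes (including containers), the old list registered
every child a second time under "layer_values.<n>", which is exactly the
naming the old test_module.py assertions encoded; with the derived property,
each child is owned once, under its own key. A stripped-down sketch of the
single-source-of-truth pattern in plain Python, not the real Module machinery:

    from collections import OrderedDict

    class Seq:
        def __init__(self, modules: OrderedDict):
            self.layer_keys = []
            for key, module in modules.items():
                setattr(self, key, module)   # the attribute is the only owner
                self.layer_keys.append(key)

        @property
        def layer_values(self):              # derived view, never stored
            return [getattr(self, key) for key in self.layer_keys]

This also makes __setitem__ and __delitem__ simpler: mutating the attribute
(or deleting it) is sufficient, with no second list to keep in sync.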