import multiprocessing as mp import platform import numpy as np import pytest import megengine as mge import megengine.distributed as dist import megengine.quantization.observer as ob from megengine.distributed.helper import get_device_count_by_fork def test_min_max_observer(): x = np.random.rand(3, 3, 3, 3).astype("float32") np_min, np_max = x.min(), x.max() x = mge.tensor(x) m = ob.MinMaxObserver() m(x) assert m.min_val == np_min and m.max_val == np_max @pytest.mark.skipif( platform.system() == "Darwin", reason="do not imp GPU mode at macos now" ) @pytest.mark.skipif( platform.system() == "Windows", reason="windows disable MGB_ENABLE_OPR_MM" ) @pytest.mark.skipif(get_device_count_by_fork("gpu") < 2, reason="need more gpu device") @pytest.mark.isolated_distributed def test_sync_min_max_observer(): x = np.random.rand(6, 3, 3, 3).astype("float32") np_min, np_max = x.min(), x.max() world_size = 2 port = dist.get_free_ports(1)[0] server = dist.Server(port) def worker(rank, slc): dist.init_process_group("localhost", port, world_size, rank, rank) m = ob.SyncMinMaxObserver() y = mge.tensor(x[slc]) m(y) assert m.min_val == np_min and m.max_val == np_max procs = [] for rank in range(world_size): slc = slice(rank * 3, (rank + 1) * 3) p = mp.Process(target=worker, args=(rank, slc,), daemon=True) p.start() procs.append(p) for p in procs: p.join(20) assert p.exitcode == 0