# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import multiprocessing
import os
import signal
import sys
import warnings

from paddle.distributed.utils.launch_utils import _print_arguments, _prepare_trainer_env, get_host_name_ip
from paddle.distributed.cloud_utils import get_cluster_and_pod, _get_trainers_num
from paddle.distributed.fleet.launch import get_cluster_from_args
from paddle.distributed.fleet.cloud_utils import use_paddlecloud
from paddle.distributed.fleet.launch_utils import DeviceMode, check_backend, block_windows_and_macos
from paddle.device import get_device

# deprecated module import
from paddle.fluid import core
from paddle.fluid.framework import set_flags

__all__ = []


class ParallelEnvArgs(object):

    def __init__(self):
        # Paddle cluster node IPs, such as 192.168.0.16,192.168.0.17.
        self.cluster_node_ips = None

        # The current node ip.
        self.node_ip = None

        # Whether to use the paddlecloud platform to run your multi-process job.
        # If false, there is no need to set this argument.
        self.use_paddlecloud = None

        # The trainer's started port on a single node
        self.started_port = None

        # Print the config or not
        self.print_config = True

        # It's for gpu training and the training processes will run
        # on the selected_devices, with each process bound to a single GPU.
        # If it's not set, this module will use all the gpu cards
        # for training.
        self.selected_devices = None


def _py_supported_check():
    if not sys.version_info >= (3, 4):
        raise RuntimeError(
            "Using `paddle.distributed.spawn` to start parallel training "
            "requires python version 3.4 or higher. If your python version "
            "is lower than this, please use "
            "`paddle.distributed.launch` instead.")


def _options_valid_check(options):
    # `print_config` is kept as a debug option, not shown to users
    supported_options = [
        'start_method', 'ips', 'gpus', 'xpus', 'mlus', 'print_config', 'backend'
    ]
    deprecated_options = [
        'selected_devices', 'started_port', 'cluster_node_ips', 'node_ip',
        'use_paddlecloud'
    ]
    for key in options:
        if key not in supported_options:
            if key in deprecated_options:
                warnings.warn(
                    "The config option (%s) of `paddle.distributed.spawn` is deprecated. "
                    "Please use the latest config options stated in the `spawn` API documentation."
                    % key, DeprecationWarning)
            else:
                raise ValueError(
                    "The config option (%s) of `paddle.distributed.spawn` is not supported."
                    % key)
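
# Illustrative sketch (added commentary, not part of the original module): how
# `_options_valid_check` treats different option keys, based on the lists above.
#
#   _options_valid_check({'gpus': '0,1', 'backend': 'nccl'})  # passes silently
#   _options_valid_check({'selected_devices': '0,1'})         # DeprecationWarning
#   _options_valid_check({'unknown_key': 1})                  # raises ValueError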


def _get_default_nprocs():
    device = get_device()
    if 'gpu' in device:
        return core.get_cuda_device_count()
    elif 'xpu' in device:
        return core.get_xpu_device_count()
    elif 'mlu' in device:
        return core.get_mlu_device_count()
    elif 'cpu' in device:
        return multiprocessing.cpu_count()
    else:
        raise RuntimeError(
            "`paddle.distributed.spawn` does not support parallel training on device `{}` yet."
            .format(device))


def _get_default_backend():
    device = get_device()
    if 'gpu' in device:
        return 'nccl'
    elif 'xpu' in device:
        return 'bkcl'
    elif 'mlu' in device:
        return 'cncl'
    elif 'cpu' in device:
        return 'gloo'
    else:
        raise RuntimeError(
            "`paddle.distributed.spawn` does not support parallel training on device `{}` yet."
            .format(device))

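# Illustrative mapping (added commentary, derived from the branches above),
# assuming the matching PaddlePaddle build is installed:
#   get_device() == 'gpu:0'  ->  'nccl'
#   get_device() == 'xpu:0'  ->  'bkcl'
#   get_device() == 'mlu:0'  ->  'cncl'
#   get_device() == 'cpu'    ->  'gloo'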

def _get_node_ip(ips):
    node_ip = None
    node_ips = [x.strip() for x in ips.split(',')]
    if len(node_ips) == 1:
        node_ip = node_ips[0]
    else:
        _, node_ip = get_host_name_ip()
    return node_ip


def _get_subprocess_env_list(nprocs, options):
    # NOTE(xiongkun03): Why put backend deduction here?
    # Because _get_subprocess_env_list is used by many testcases.
    # So for compatibility, we put backend deduction here.

    # logic for handling the backend option
    if 'backend' not in options or options['backend'] == 'auto':
        options['backend'] = _get_default_backend()
    check_backend(options['backend'])
    block_windows_and_macos(options['backend'])

    # construct processes env list
    processes_env_list = []

    # get args from kwargs
    args = ParallelEnvArgs()

    # deal with `ips`
    args.cluster_node_ips = options.get('ips', None)
    if args.cluster_node_ips is None:
        args.cluster_node_ips = options.get('cluster_node_ips', None)
        if args.cluster_node_ips is None:
            args.cluster_node_ips = "127.0.0.1"

    # deal with `gpus` or `xpus`
    # set default selected devices (gpus or xpus)
    # e.g. if nprocs is 4, the selected gpus are "0,1,2,3"
    # NOTE(chenweihang): [ why not use FLAGS_selected_gpus or FLAGS_selected_xpus directly? ]
    # because FLAGS_selected_gpus or FLAGS_selected_xpus may be used in other places;
    # if we set FLAGS_selected_gpus or FLAGS_selected_xpus to `0,1,2,3`, it may cause an error
    # when using `ParallelEnv`
    # NOTE(chenweihang): use absolute gpu or xpu card ids
    if options['backend'] == 'nccl':
        args.selected_devices = options.get('gpus', None)
        if args.selected_devices is None:
            args.selected_devices = options.get('selected_devices', None)
        env_devices = os.getenv("CUDA_VISIBLE_DEVICES", None)
        if env_devices is None or env_devices == "":
            env_devices_list = [
                str(x) for x in range(core.get_cuda_device_count())
            ]
        else:
            env_devices_list = env_devices.split(',')
        if args.selected_devices is None:
            if len(env_devices_list) < nprocs:
                raise RuntimeError(
                    "the number of visible devices(%d) is less than the number "
                    "of spawn processes(%d), please ensure that the correct "
                    "`nprocs` argument is passed or the environment variable "
                    "`CUDA_VISIBLE_DEVICES` is correctly configured." %
                    (len(env_devices_list), nprocs))
            args.selected_devices = ",".join(
                [str(env_devices_list[x]) for x in range(0, nprocs)])
        else:
            selected_device_list = args.selected_devices.split(',')
            if len(selected_device_list) != nprocs:
                raise ValueError(
                    "The number of selected devices(%s) is not equal to "
                    "the number of spawn processes(%d), please ensure that the "
                    "correct `nprocs` and `gpus` arguments are passed." %
                    (len(selected_device_list), nprocs))
            for card_id in selected_device_list:
                if card_id not in env_devices_list:
                    raise ValueError("The selected gpu card %s cannot found in "
                                     "CUDA_VISIBLE_DEVICES (%s)." %
                                     (card_id, ",".join(env_devices_list)))

    elif options['backend'] == 'bkcl':
        args.selected_devices = options.get('xpus', None)
        if args.selected_devices is None:
            args.selected_devices = options.get('selected_devices', None)
        env_devices = os.getenv("XPU_VISIBLE_DEVICES", None)
        if env_devices is None or env_devices == "":
            env_devices_list = [
                str(x) for x in range(core.get_xpu_device_count())
            ]
        else:
            env_devices_list = env_devices.split(',')
        if args.selected_devices is None:
            if len(env_devices_list) < nprocs:
                raise RuntimeError(
                    "the number of visible devices(%d) is less than the number "
                    "of spawn processes(%d), please ensure that the correct "
                    "`nprocs` argument is passed or the environment variable "
                    "`XPU_VISIBLE_DEVICES` is correctly configured." %
                    (len(env_devices_list), nprocs))
            args.selected_devices = ",".join(
                [str(env_devices_list[x]) for x in range(0, nprocs)])
        else:
            selected_device_list = args.selected_devices.split(',')
            if len(selected_device_list) != nprocs:
                raise ValueError(
                    "The number of selected devices(%s) is not equal to "
                    "the number of spawn processes(%d), please ensure that the "
                    "correct `nprocs` and `xpus` arguments are passed." %
                    (len(selected_device_list), nprocs))
            for card_id in selected_device_list:
                if card_id not in env_devices_list:
                    raise ValueError("The selected xpu card %s cannot found in "
                                     "XPU_VISIBLE_DEVICES (%s)." %
                                     (card_id, ",".join(env_devices_list)))
    elif options['backend'] == 'cncl':
        args.selected_devices = options.get('mlus', None)
        if args.selected_devices is None:
            args.selected_devices = options.get('selected_devices', None)
        env_devices = os.getenv("MLU_VISIBLE_DEVICES", None)
        if env_devices is None or env_devices == "":
            env_devices_list = [
                str(x) for x in range(core.get_mlu_device_count())
            ]
        else:
            env_devices_list = env_devices.split(',')
        if args.selected_devices is None:
            if len(env_devices_list) < nprocs:
                raise RuntimeError(
                    "the number of visible devices(%d) is less than the number "
                    "of spawn processes(%d), please ensure that the correct "
                    "`nprocs` argument is passed or the environment variable "
                    "`MLU_VISIBLE_DEVICES` is correctly configured." %
                    (len(env_devices_list), nprocs))
            args.selected_devices = ",".join(
                [str(env_devices_list[x]) for x in range(0, nprocs)])
        else:
            selected_device_list = args.selected_devices.split(',')
            if len(selected_device_list) != nprocs:
                raise ValueError(
                    "The number of selected devices(%s) is not equal to "
                    "the number of spawn processes(%d), please ensure that the "
                    "correct `nprocs` and `mlus` arguments are passed." %
                    (len(selected_device_list), nprocs))
            for card_id in selected_device_list:
                if card_id not in env_devices_list:
                    raise ValueError("The selected mlu card %s cannot found in "
                                     "MLU_VISIBLE_DEVICES (%s)." %
                                     (card_id, ",".join(env_devices_list)))
    elif options['backend'] == 'gloo':
        # TODO: check that the gpu / xpu options are not set
        warnings.warn(
            "Your model will be trained under CPUONLY mode by using GLOO, "
            "because CPUPlace is specified manually or your installed PaddlePaddle only supports CPU Device."
        )
        args.paddle_cpuonly = True
        args.selected_devices = None
        args.ips = args.cluster_node_ips
        assert options.get(
            'use_paddlecloud',
            None) is None, "CPUONLY spawn doesn't support using paddle cloud."
        assert len(
            args.cluster_node_ips.split(',')
        ) <= 1, "CPUONLY spawn only supports a single trainer, that is len(ips)=1, but got %s." % len(
            args.cluster_node_ips.split(','))
        assert _get_trainers_num(
        ) == 1, "CPUONLY spawn doesn't support multi-trainer."

    # set other inner args
    args.node_ip = options.get('node_ip', None)
    if args.node_ip is None:
        args.node_ip = _get_node_ip(args.cluster_node_ips)

    args.started_port = options.get('started_port', None)

    args.use_paddlecloud = options.get('use_paddlecloud', None)
    if args.use_paddlecloud is None:
        args.use_paddlecloud = use_paddlecloud()

    # get cluster and pod config
    if options['backend'] == 'gloo':
        devices_per_proc = [x for x in range(0, nprocs)]
        cluster, pod = get_cluster_from_args(args, DeviceMode.CPU,
                                             devices_per_proc)
    else:
        cluster, pod = get_cluster_and_pod(args)

    # prepare subprocess env list
    for trainer in pod.trainers:
        processes_env_list.append(
            _prepare_trainer_env(cluster, trainer, options['backend']))

    # [Debug] print config
    args.print_config = options.get('print_config', False)
    if args.print_config:
        _print_arguments(args)

    return processes_env_list
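
# Illustrative note (added commentary, hypothetical values): for two GPU trainers
# on one node, the list returned above holds one env dict per trainer, roughly
#   [{'FLAGS_selected_gpus': '0', ...}, {'FLAGS_selected_gpus': '1', ...}]
# and each dict is later exported in a child process by `_set_trainer_env`.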


def _remove_risky_env():
    # remove useless env vars
    # no copy; each process holds its own env vars
    os.environ.pop("http_proxy", None)
    os.environ.pop("https_proxy", None)


def _set_trainer_env(env_dict, backend):
    # NOTE(chenweihang): [ Why do we need to set FLAGS_selected_gpus or FLAGS_selected_xpus here? ]
    # When the child process starts, it will inherit the configuration of the
    # main process and set the FLAGS once, but the environment variable has
    # not been set at this time, which leads to the FLAGS_selected_gpus or FLAGS_selected_xpus
    # being kept the same as the main process (usually empty), so manually update the flags here

    # NOTE(xiongkun): why pass backend here? because if the backend is gloo,
    # we shouldn't set FLAGS_selected_xxx

    if backend == 'nccl':
        set_flags({'FLAGS_selected_gpus': env_dict['FLAGS_selected_gpus']})
    elif backend == 'bkcl':
        set_flags({'FLAGS_selected_xpus': env_dict['FLAGS_selected_xpus']})
    elif backend == 'cncl':
        set_flags({'FLAGS_selected_mlus': env_dict['FLAGS_selected_mlus']})
    else:
        # NOTE(xiongkun): why not raise an Error?
        # So far, we added support for CPU parallel, and it will be applied when paddle is not
        # compiled with cuda or xpu. just do nothing.
        pass

    for var_name in env_dict:
        os.environ[var_name] = env_dict[var_name]
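
# Illustrative sketch (added commentary, hypothetical value): for a 'nccl'
# trainer whose env_dict carries 'FLAGS_selected_gpus', the function above
# behaves roughly like
#   set_flags({'FLAGS_selected_gpus': '0'})
#   os.environ['FLAGS_selected_gpus'] = '0'
# so the child process sees the device selection made for it by the parent.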


def _func_wrapper(func, args, error_queue, return_queue, env_dict, backend):
    try:
        # config subprocess environment variables
        _remove_risky_env()
        _set_trainer_env(env_dict, backend)
        # execute function
        result = func(*args)
        # record function return value
        return_queue.put(result)
    except KeyboardInterrupt:
        pass
    except Exception:
        import traceback
        error_queue.put(traceback.format_exc())
        sys.exit(1)
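
# Note (added commentary): any exception raised by `func` is captured into
# `error_queue` above and re-raised in the parent process by
# `MultiprocessContext._throw_exception` during `join()`.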


class MultiprocessContext(object):

    def __init__(self, processes, error_queues, return_queues):
        _py_supported_check()
        self.error_queues = error_queues
        # NOTE(chenweihang): The `spawn` method is mainly used
        # to wrap the outermost execution function of the program for
        # parallel execution. Generally, the return value is not needed,
        # but if the user wants to obtain the return value, they can get
        # the return result of each process from context.return_queues
        self.return_queues = return_queues
        self.processes = processes
        self.sentinels = {
            process.sentinel: index
            for index, process in enumerate(processes)
        }

    def join(self, timeout=None):
        if len(self.sentinels) == 0:
            return True

        ready = multiprocessing.connection.wait(self.sentinels.keys(),
                                                timeout=timeout)

        error_index = None
        for sentinel in ready:
            index = self.sentinels.pop(sentinel)
            process = self.processes[index]
            process.join()
            if process.exitcode != 0:
                error_index = index
                break

        if error_index is None:
            return len(self.sentinels) == 0

        for process in self.processes:
            if process.is_alive():
                process.terminate()
            process.join()

        self._throw_exception(error_index)

    def _throw_exception(self, error_index):
        if self.error_queues[error_index].empty():
            exitcode = self.processes[error_index].exitcode
            if exitcode < 0:
                name = signal.Signals(-exitcode).name
                raise Exception("Process %d terminated with signal %s." %
                                (error_index, name))
            else:
                raise Exception("Process %d terminated with exit code %d." %
                                (error_index, exitcode))

        original_trace = self.error_queues[error_index].get()
        msg = "\n\n----------------------------------------------\n" \
              "Process %d terminated with the following error:\n" \
              "----------------------------------------------\n\n" % error_index
        msg += original_trace
        raise Exception(msg)


def spawn(func, args=(), nprocs=-1, join=True, daemon=False, **options):
    """
    Start multiple processes with the ``spawn`` method for parallel training.

    .. note::
        ``spawn`` now only supports the GPU, XPU, or MLU collective mode. The collective
        modes of GPU, XPU, and MLU cannot be started at the same time, so the options
        ``gpus``, ``xpus``, and ``mlus`` cannot be configured at the same time.

    Args:
        func (function): The target function called by the spawned process.
            This function needs to be picklable, so it must be defined
            at the top level of a module.
        args (list|tuple, optional): Arguments passed to ``func``.
        nprocs (int, optional): Number of processes to start. Default: -1.
            When nprocs is -1, the available devices will be obtained from
            the environment variables when the model is executed: if GPUs are used,
            the currently available device IDs are obtained from the environment
            variable CUDA_VISIBLE_DEVICES; if XPUs are used, the currently available
            device IDs are obtained from the environment variable XPU_VISIBLE_DEVICES.
        join (bool, optional): Perform a blocking join on all spawned processes.
            Default: True.
        daemon (bool, optional): The spawned processes' daemon flag. Default: False.
        **options(dict, optional): Other initial parallel execution environment
            configuration options. The following options are currently supported:
            (1) start_method (string): the way to start a process.
            The start method can be ``spawn`` , ``fork`` , ``forkserver`` .
            Because the CUDA runtime does not support the ``fork`` start method,
            when using CUDA in subprocesses, processes should be started with the
            ``spawn`` or ``forkserver`` method. Default: "spawn" ;
            (2) gpus (string): The training process will run on the
            selected gpus, such as "0,1,2,3". Default: None;
            (3) xpus (string): The training process will run on the
            selected xpus, such as "0,1,2,3". Default: None;
            (4) mlus (string): The training process will run on the
            selected mlus, such as "0,1,2,3". Default: None;
            (5) ips (string): Paddle cluster node IPs, such as
            "192.168.0.16,192.168.0.17". Default: "127.0.0.1" .

    Returns:
        ``MultiprocessContext`` object, which holds the spawned processes.

    Examples:
        .. code-block:: python

            import paddle
            import paddle.nn as nn
            import paddle.optimizer as opt
            import paddle.distributed as dist

            class LinearNet(nn.Layer):
                def __init__(self):
                    super(LinearNet, self).__init__()
                    self._linear1 = nn.Linear(10, 10)
                    self._linear2 = nn.Linear(10, 1)

                def forward(self, x):
                    return self._linear2(self._linear1(x))

            def train(print_result=False):
                # 1. initialize parallel environment
                group = dist.init_parallel_env()
                process_group = group.process_group if group else None

                # 2. create data parallel layer & optimizer
                layer = LinearNet()
                dp_layer = paddle.DataParallel(layer, group=process_group)

                loss_fn = nn.MSELoss()
                adam = opt.Adam(
                    learning_rate=0.001, parameters=dp_layer.parameters())

                # 3. run layer
                inputs = paddle.randn([10, 10], 'float32')
                outputs = dp_layer(inputs)
                labels = paddle.randn([10, 1], 'float32')
                loss = loss_fn(outputs, labels)

                if print_result is True:
                    print("loss:", loss.numpy())

                loss.backward()

                adam.step()
                adam.clear_grad()

            # Usage 1: only pass the function.
            # If your training method doesn't need any arguments, and you
            # use all visible devices for parallel training.
            if __name__ == '__main__':
                dist.spawn(train)

            # Usage 2: pass function and arguments.
            # If your training method needs some arguments, and you
            # use all visible devices for parallel training.
            if __name__ == '__main__':
                dist.spawn(train, args=(True,))

            # Usage 3: pass function, arguments and nprocs.
            # If your training method needs some arguments, and you
            # only use part of the visible devices for parallel training.
            # If your machine holds 8 cards {0,1,2,3,4,5,6,7},
            # this case will use cards {0,1}; If you set
            # CUDA_VISIBLE_DEVICES=4,5,6,7, this case will use
            # cards {4,5}
            if __name__ == '__main__':
                dist.spawn(train, args=(True,), nprocs=2)

            # Usage 4: pass function, arguments, nprocs and gpus.
            # If your training method needs some arguments, and you
            # only use part of the visible devices for parallel training,
            # but you can't set your machine's environment variable
            # CUDA_VISIBLE_DEVICES, such as it is None or all cards
            # {0,1,2,3,4,5,6,7}, you can pass `gpus` to
            # select the GPU cards you want to use. For example,
            # this case will use cards {4,5} if your machine holds 8 cards.
            if __name__ == '__main__':
                dist.spawn(train, args=(True,), nprocs=2, gpus='4,5')
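
            # Usage 5 (illustrative sketch, not from the original docstring):
            # pass join=False to get the context back immediately and wait
            # for the spawned processes yourself.
            if __name__ == '__main__':
                context = dist.spawn(train, args=(True,), nprocs=2, join=False)
                # ... the main process can do other work here ...
                while not context.join():
                    pass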
    """
    # NOTE(chenweihang): [ why only support python3.4+ ? ]
    # Python has supported setting the child process startup method
    # since 3.4. Previous versions can only use the default startup
    # method, while the default startup method on Unix is fork, which
    # cannot support the CUDA runtime in multi-process mode.
    _py_supported_check()

    # Give an error hint when the user enters a configuration option
    # that does not exist
    _options_valid_check(options)

    # get default nprocs
    if nprocs == -1:
        nprocs = _get_default_nprocs()

    # NOTE(chenweihang): [ why do we need to get cluster info before running? ]
    # when using `paddle.distributed.spawn` to start parallel training,
    # we should get cluster info before starting the subprocesses, and pass
    # the correct info to each subprocess
    procs_env_list = _get_subprocess_env_list(nprocs, options)

    # start processes
    # NOTE(chenweihang): [ why is the default start method spawn? ]
    # The CUDA runtime does not support the fork start method;
    # either the spawn or forkserver start method is required
    # to use CUDA in subprocesses.
    start_method = options.get('start_method', None)
    if start_method is None:
        start_method = 'spawn'
    mp = multiprocessing.get_context(start_method)

    error_queues = []
    return_queues = []
    processes = []
    for i in range(nprocs):
        error_queue = mp.SimpleQueue()
        return_queue = mp.SimpleQueue()
        process = mp.Process(target=_func_wrapper,
                             args=(func, args, error_queue, return_queue,
                                   procs_env_list[i], options['backend']))
        process.daemon = daemon
        process.start()
        error_queues.append(error_queue)
        return_queues.append(return_queue)
        processes.append(process)

    context = MultiprocessContext(processes, error_queues, return_queues)
    if not join:
        return context

    # loop until all processes end
    while not context.join():
        pass

    # finally return context
    return context