#!/usr/bin/env python
# -*- coding: utf-8 -*-
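"""mpi_helper.py: thin wrapper around mpi4py that falls back to a
single-process mode when mpi4py is not installed, so the same script runs
both locally and under MPI."""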


class MPIHelper(object):
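    """Exposes rank, size, processor name and a few collective operations
    (bcast, gather, allgather), degrading to no-ops when only one process
    is running."""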
    def __init__(self):
        try:
            from mpi4py import MPI
        except ImportError:
            # local run
            self._size = 1
            self._rank = 0
            self._multi_machine = False

            import socket
            self._name = socket.gethostname()
        else:
            # in mpi environment
            self._comm = MPI.COMM_WORLD
            self._size = self._comm.Get_size()
            self._rank = self._comm.Get_rank()
            self._name = MPI.Get_processor_name()
            self._multi_machine = self._size > 1

    @property
    def multi_machine(self):
        return self._multi_machine

    @property
    def rank(self):
        return self._rank

    @property
    def size(self):
        return self._size

    @property
    def name(self):
        return self._name

    def bcast(self, data):
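        """Broadcast *data* from rank 0 to all ranks; a no-op in local runs."""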
        if self._multi_machine:
            # call real bcast
            return self._comm.bcast(data, root=0)
        else:
            # do nothing
            return data

    def gather(self, data):
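        """Gather *data* from every rank onto rank 0; locally returns [data]."""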
        if self._multi_machine:
            # call real gather
            return self._comm.gather(data, root=0)
        else:
            # do nothing
            return [data]

    def allgather(self, data):
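        """Gather *data* from every rank onto all ranks; locally returns [data]."""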
        if self._multi_machine:
            # call real allgather
            return self._comm.allgather(data)
        else:
            # do nothing
            return [data]

    # calculate split range on mpi environment
    def split_range(self, array_length):
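        """Return the half-open [start, end) index range assigned to this
        rank for an array of length array_length; the first
        array_length % size ranks receive one extra element."""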
        if self._size == 1:
            return 0, array_length
        average_count = array_length // self._size
        if array_length % self._size == 0:
            return average_count * self._rank, average_count * (self._rank + 1)
        else:
            if self._rank < array_length % self._size:
                return (average_count + 1) * self._rank, (average_count + 1) * (
                    self._rank + 1)
            else:
                start = (average_count + 1) * (array_length % self._size) \
                      + average_count * (self._rank - array_length % self._size)
                return start, start + average_count


if __name__ == "__main__":
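    # Smoke test: run locally with `python mpi_helper.py`, or under MPI with
    # e.g. `mpirun -n 4 python mpi_helper.py` (requires mpi4py).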

    mpi = MPIHelper()
    print("Hello world from process {} of {} at {}.".format(
        mpi.rank, mpi.size, mpi.name))

    all_node_names = mpi.gather(mpi.name)
    print("all node names using gather: {}".format(all_node_names))

    all_node_names = mpi.allgather(mpi.name)
    print("all node names using allgather: {}".format(all_node_names))

    if mpi.rank == 0:
        data = list(range(10))
    else:
        data = None
    data = mpi.bcast(data)
    print("after bcast, process {} have data {}".format(mpi.rank, data))

    data = [i + mpi.rank for i in data]
    print("after modify, process {} have data {}".format(mpi.rank, data))

    new_data = mpi.gather(data)
    print("after gather, process {} have data {}".format(mpi.rank, new_data))

    # test for split
    for i in range(12):
        length = i + mpi.size  # length should be >= mpi.size
        [start, end] = mpi.split_range(length)
        split_result = mpi.gather([start, end])
        print("length {}, split_result {}".format(length, split_result))