ps_instance.py 4.3 KB
Newer Older
D
dongdaxiang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and

H
heqiaozhi 已提交
14 15 16
import helper as dist_helper
import sys

H
heqiaozhi 已提交
17

H
heqiaozhi 已提交
18
class PaddlePSInstance(object):
H
heqiaozhi 已提交
19 20 21 22 23 24 25 26 27
    """
        PaddlePSInstance class is used to generate A instance of server or worker 
        Args:
            server_worker_mode: is a value 0 or 1, default is 1
            proc_per_node: process per node, default is 2 
        Examples:
            instance = PaddlePSInstance(1, 2)
    """

H
heqiaozhi 已提交
28
    def __init__(self, server_worker_mode, proc_per_node):
H
heqiaozhi 已提交
29 30 31 32 33
        self.dh = dist_helper.MPIHelper()
        self._rankid = self.dh.get_rank()
        self._server_worker_mode = server_worker_mode
        self._proc_per_node = proc_per_node
        self._nodes = self.dh.get_size()
H
heqiaozhi 已提交
34

H
heqiaozhi 已提交
35 36 37 38
        self._ip = 0
        self._worker_num = self._nodes * self._proc_per_node / 2
        self._server_num = self._nodes * self._proc_per_node / 2
        self._total_server_worker = self._worker_num + self._server_num
H
heqiaozhi 已提交
39
        self._node_type = None  #IDLE=-1, WORKER=1, SERVER=0
H
heqiaozhi 已提交
40 41 42 43 44 45
        self._set_nodetype()
        self._comm = None
        self._split_comm()

    def _set_nodetype(self):
        if self._server_worker_mode == 0:
H
heqiaozhi 已提交
46
            if self._rankid < self._server_num:
H
heqiaozhi 已提交
47 48 49 50 51 52 53 54 55 56 57
                self._node_type = 1
            elif self._rankid < self._total_server_worker:
                self._node_type = 0
            else:
                self._node_type = -1
        elif self._server_worker_mode == 1:
            if self._rankid < self._total_server_worker:
                if 0 == self._rankid % self._proc_per_node % 2:
                    self._node_type = 0
                else:
                    self._node_type = 1
H
heqiaozhi 已提交
58 59
            else:
                self._node_type = -1
H
heqiaozhi 已提交
60 61
        else:
            self._node_type = -1
H
heqiaozhi 已提交
62

H
heqiaozhi 已提交
63
        #if self._rankid == 0:
H
heqiaozhi 已提交
64
        #print "node type: ", self._node_type
H
heqiaozhi 已提交
65 66 67 68 69 70 71 72 73

    def _split_comm(self):
        if self.is_server():
            self._comm = self.dh.comm.Split(self._node_type)
        elif self.is_worker():
            self._comm = self.dh.comm.Split(self._node_type)
        pass

    def get_worker_index(self):
H
heqiaozhi 已提交
74 75 76
        """
        Return worker index 
        """
H
heqiaozhi 已提交
77 78 79 80 81 82
        if self._server_worker_mode == 0:
            return self._rankid == self.server_num
        else:
            return self._rankid / self._proc_per_node

    def get_server_index(self):
H
heqiaozhi 已提交
83 84 85
        """
        Return server index 
        """
H
heqiaozhi 已提交
86 87 88 89 90 91
        if self._server_worker_mode == 0:
            return self.rank_id
        else:
            return self.rank_id / self._proc_per_node

    def is_worker(self):
H
heqiaozhi 已提交
92 93 94
        """
        Return instance is worker or not
        """
H
heqiaozhi 已提交
95 96 97
        return self._node_type == 1

    def is_server(self):
H
heqiaozhi 已提交
98 99 100
        """
        Return instance is server or not
        """
H
heqiaozhi 已提交
101 102 103
        return self._node_type == 0

    def is_first_worker(self):
H
heqiaozhi 已提交
104 105 106
        """
        Return instance is first worker or not
        """
H
heqiaozhi 已提交
107 108 109
        return self.is_worker() and 0 == self.get_worker_index()

    def set_ip(self, ip):
H
heqiaozhi 已提交
110 111 112
        """
            set server ip
        """
H
heqiaozhi 已提交
113 114 115
        self._ip = ip

    def gather_ips(self):
H
heqiaozhi 已提交
116 117 118
        """
        Return all servers and workers ip throught mpi allgather 
        """
H
heqiaozhi 已提交
119 120 121 122
        self._ips = self.dh.comm.allgather(self._ip)
        return self._ips

    def get_node_cnt(self):
H
heqiaozhi 已提交
123 124 125
        """
        Return node cnt
        """
H
heqiaozhi 已提交
126 127 128
        return self._nodes

    def barrier_all(self):
H
heqiaozhi 已提交
129 130 131
        """
        barrier workers and servers
        """
H
heqiaozhi 已提交
132 133 134
        self.dh.comm.barrier()

    def barrier_worker(self):
H
heqiaozhi 已提交
135 136 137
        """
        barrier workers
        """
H
heqiaozhi 已提交
138 139 140 141 142
        if self.is_worker():
            self._comm.barrier()
        pass

    def finalize(self):
H
heqiaozhi 已提交
143 144 145
        """
        MPI finalize
        """
H
heqiaozhi 已提交
146
        self.dh.finalize()
H
heqiaozhi 已提交
147 148 149 150 151 152
        pass


if __name__ == "__main__":
    instance = PaddlePSInstance(1, 1, 2, 50)
    instance.barrier_all()