# test_downpoursgd.py
#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test cases for Downpour."""

import os
import subprocess
import sys
import unittest

from google.protobuf import text_format

import paddle
import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.parameter_server.pslib.ps_pb2 as pslib
from paddle.fluid.incubate.fleet.parameter_server.pslib.node import (
    DownpourServer,
    DownpourWorker,
)
from paddle.fluid.trainer_factory import TrainerFactory
30

31 32
# Directory under the user's home where the tests in this file look for (and
# download) the fleet_desc.prototxt fixture; '~' is expanded at import time.
_CACHE_PATH_TEMPLATE = '~/.cache/paddle/dataset'
cache_path = os.path.expanduser(_CACHE_PATH_TEMPLATE)

33

34
class TestListenAndServOp(unittest.TestCase):
    """Smoke tests that generate Downpour trainer descriptors.

    Every test builds the same small embedding + fc regression network,
    loads ``fleet_desc.prototxt`` into a ``PSParameter`` and asks
    ``TrainerFactory`` to create a trainer and generate its descriptor.
    The tests differ only in the ``opt_info`` options they pass. Nothing is
    asserted explicitly; a test passes when no exception is raised.
    """

    _FLEET_DESC_NAME = 'fleet_desc.prototxt'
    _FLEET_DESC_URL = "https://pslib.bj.bcebos.com/fleet_desc.prototxt"

    def setUp(self):
        """Make sure the cache directory exists before each test."""
        os.makedirs(cache_path, exist_ok=True)

    def _ensure_fleet_desc(self):
        """Return the path of the cached fleet descriptor, downloading it if absent.

        Raises:
            subprocess.CalledProcessError: if ``wget`` exits with a non-zero
                status, so a failed download is reported at its source
                instead of as a missing file later on.
        """
        desc_path = '{}/{}'.format(cache_path, self._FLEET_DESC_NAME)
        if not os.path.exists(desc_path):
            # Argument list (no shell) so a cache path containing spaces or
            # shell metacharacters is passed to wget unchanged.
            subprocess.run(
                [
                    "wget",
                    "--no-check-certificate",
                    self._FLEET_DESC_URL,
                    "-P",
                    "{}/".format(cache_path),
                ],
                check=True,
            )
        return desc_path

    def _gen_trainer_desc(
        self,
        device_worker,
        *,
        use_cvm,
        add_sparse_table=False,
        extra_opt_info=None
    ):
        """Build the network and generate a trainer descriptor for it.

        Args:
            device_worker: value stored under ``opt_info["device_worker"]``.
            use_cvm: value stored under ``opt_info["use_cvm"]``.
            add_sparse_table: when true, additionally create a
                ``DownpourServer`` and call ``add_sparse_table(0, {})`` on it.
            extra_opt_info: optional mapping of further ``opt_info`` entries,
                merged in after the common ones.
        """
        # The scenario is not exercised on Windows; the test passes trivially.
        # (The original guard also compared against the literal string
        # 'sys.platform', which can never match and has been dropped.)
        if sys.platform == 'win32':
            return
        print(sys.platform)

        desc_path = self._ensure_fleet_desc()

        # Minimal regression network: distributed embedding -> fc -> MSE.
        x = paddle.static.data(name='x', shape=[-1, 1], dtype='int64')
        x_emb = fluid.layers.embedding(
            input=x, size=[1, 2], is_distributed=True
        )
        y_predict = paddle.static.nn.fc(x=x_emb, size=1)
        y = paddle.static.data(name='y', shape=[-1, 1], dtype='float32')
        cost = paddle.nn.functional.square_error_cost(
            input=y_predict, label=y
        )
        avg_cost = paddle.mean(cost)

        # Parse the text-format fleet descriptor into a PSParameter message.
        ps_param = pslib.PSParameter()
        with open(desc_path) as f:
            text_format.Merge(f.read(), ps_param)

        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(fluid.default_startup_program())

        main_program = fluid.default_main_program()
        program_id = str(id(avg_cost.block.program))

        opt_info = {
            "program_configs": {
                program_id: {
                    "pull_sparse": [0],
                    "push_sparse": [0],
                    "pull_dense": [1],
                    "push_dense": [1],
                }
            },
            "trainer": "DistMultiTrainer",
            "device_worker": device_worker,
            "optimizer": "DownpourSGD",
            "fleet_desc": ps_param,
            "worker_skipped_ops": ["lookup_table", "lookup_table_grad"],
            "use_cvm": use_cvm,
            "scale_datanorm": -1,
            "dump_slot": False,
            "stat_var_names": [],
        }
        if extra_opt_info:
            opt_info.update(extra_opt_info)

        worker = DownpourWorker(None)
        if add_sparse_table:
            server = DownpourServer()
            server.add_sparse_table(0, {})
        worker.get_desc().CopyFrom(ps_param.trainer_param[0])
        opt_info["program_id_to_worker"] = {program_id: worker}

        main_program._fleet_opt = opt_info
        trainer = TrainerFactory()._create_trainer(main_program._fleet_opt)
        trainer._set_program(main_program)
        trainer._gen_trainer_desc()

    def test_device_work_use_cvm(self):
        """Test the DownpourSGD device worker with use_cvm enabled."""
        self._gen_trainer_desc(
            "DownpourSGD", use_cvm=True, add_sparse_table=True
        )

    def test_device_work(self):
        """Test the DownpourSGD device worker with use_cvm disabled."""
        self._gen_trainer_desc("DownpourSGD", use_cvm=False)

    def test_downpour_opt_work(self):
        """Test the DownpourSGDOPT device worker with a user-defined dump file."""
        self._gen_trainer_desc(
            "DownpourSGDOPT",
            use_cvm=False,
            extra_opt_info={
                "user_define_dump_filename": "./dump_filename/dump.txt"
            },
        )

231 232 233

# Run every test case in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()