# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import abc
import sys
import copy
import yaml
import time
import json
import datetime

import numpy as np

import paddle.fluid as fluid

from ..utils import fs as fs
from ..utils import util as util
from ..metrics.auc_metrics import AUCMetric
from ..models import base as model_basic
from ..reader import dataset
from . import trainer

from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker


def wroker_numric_opt(value, env, opt):
    """
    Numeric all-reduce across workers.
    Args:
        value: local value to reduce
        env: communication backend, mpi/gloo
        opt: reduce operator, SUM/MAX/MIN/AVG
    Return:
        reduced result
    """
    local_value = np.array([value])
    global_value = np.copy(local_value) * 0
    fleet._role_maker.all_reduce_worker(local_value, global_value, opt)
    return global_value[0]


def worker_numric_sum(value, env="mpi"):
    """Sum `value` across all workers."""
    return wroker_numric_opt(value, env, "sum")


def worker_numric_avg(value, env="mpi"):
    """Average `value` across all workers."""
    return worker_numric_sum(value, env) / fleet.worker_num()


def worker_numric_min(value, env="mpi"):
    """Minimum of `value` across all workers."""
    return wroker_numric_opt(value, env, "min")


def worker_numric_max(value, env="mpi"):
    """Maximum of `value` across all workers."""
    return wroker_numric_opt(value, env, "max")
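# Example (sketch, mirroring the usage in run_executor below): aggregate a
# per-worker timing across the fleet; assumes fleet.init(...) has already run.
#   local_cost = (end - begin) / 60.0
#   avg_cost = worker_numric_avg(local_cost)  # mean over all workers
#   max_cost = worker_numric_max(local_cost)  # slowest worker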


class CtrPaddleTrainer(trainer.Trainer):
    """CTR trainer that drives the day/pass training loop on a PaddlePaddle fleet."""

    def __init__(self, config):
        """Build executor contexts, path templates and register the context processors."""
        trainer.Trainer.__init__(self, config)
        config['output_path'] = util.get_absolute_path(
            config['output_path'], config['io']['afs'])
        self.global_config = config
        self._place = fluid.CPUPlace()
        self._exe = fluid.Executor(self._place)
        self._exector_context = {}
        self._metrics = {}
        self._path_generator = util.PathGenerator({
            'templates': [
                {'name': 'xbox_base_done', 'template': config['output_path'] + '/xbox_base_done.txt'},
                {'name': 'xbox_delta_done', 'template': config['output_path'] + '/xbox_patch_done.txt'},
                {'name': 'xbox_base', 'template': config['output_path'] + '/xbox/{day}/base/'},
                {'name': 'xbox_delta', 'template': config['output_path'] + '/xbox/{day}/delta-{pass_id}/'},
                {'name': 'batch_model', 'template': config['output_path'] + '/batch_model/{day}/{pass_id}/'}
            ]
        })
        if 'path_generator' in config:
            self._path_generator.add_path_template(config['path_generator'])

        self.regist_context_processor('uninit', self.init)
        self.regist_context_processor('startup', self.startup)
        self.regist_context_processor('begin_day', self.begin_day)
        self.regist_context_processor('train_pass', self.train_pass)
        self.regist_context_processor('end_day', self.end_day)
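        # Status flow driven by the processors registered above:
        # 'uninit' -> 'startup' -> 'begin_day' -> 'train_pass' (repeated per pass)
        # -> 'end_day' -> 'begin_day' ...; a processor sets context['is_exit'] = True
        # to stop the outer loop (servers park in 'wait' after run_server()).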

    def init(self, context):
        """Init fleet, build per-executor models/scopes, the optimizer and datasets."""
        role_maker = None
        if self.global_config.get('process_mode', 'mpi') == 'brilliant_cpu':
            afs_config = self.global_config['io']['afs']
            role_maker = GeneralRoleMaker(
                hdfs_name=afs_config['fs_name'], hdfs_ugi=afs_config['fs_ugi'],
                path=self.global_config['output_path'] + "/gloo",
                init_timeout_seconds=1200, run_timeout_seconds=1200)
        fleet.init(role_maker)
        data_var_list = []
        data_var_name_dict = {}
        runnable_scope = []
        runnable_cost_op = []
        context['status'] = 'startup'

        # build one scope/model pair per configured executor and collect its
        # metrics, cost op and (deduplicated) data vars
        for executor in self.global_config['executor']:
            scope = fluid.Scope()
            self._exector_context[executor['name']] = {}
            self._exector_context[executor['name']]['scope'] = scope
            self._exector_context[executor['name']]['model'] = model_basic.create(executor)
            model = self._exector_context[executor['name']]['model']
            self._metrics.update(model.get_metrics())
            runnable_scope.append(scope)
            runnable_cost_op.append(model.get_cost_op())
            for var in model._data_var:
                if var.name in data_var_name_dict:
                    continue
                data_var_list.append(var)
                data_var_name_dict[var.name] = var

        optimizer = model_basic.YamlModel.build_optimizer({
            'metrics': self._metrics,
            'optimizer_conf': self.global_config['optimizer']
        })
        optimizer.minimize(runnable_cost_op, runnable_scope)
        for executor in self.global_config['executor']:
            scope = self._exector_context[executor['name']]['scope']
            model = self._exector_context[executor['name']]['model']
            program = model._build_param['model']['train_program']
            if not executor['is_update_sparse']:
                program._fleet_opt["program_configs"][str(id(model.get_cost_op().block.program))]["push_sparse"] = []
            if 'train_thread_num' not in executor:
                executor['train_thread_num'] = self.global_config['train_thread_num']
            with fluid.scope_guard(scope):
                self._exe.run(model._build_param['model']['startup_program'])
            model.dump_model_program('./')

        # server init done
        if fleet.is_server():
            return 0

        self._dataset = {}
        for dataset_item in self.global_config['dataset']['data_list']:
            dataset_item['data_vars'] = data_var_list
            dataset_item.update(self.global_config['io']['afs'])
            dataset_item["batch_size"] = self.global_config['batch_size']
            self._dataset[dataset_item['name']] = dataset.FluidTimeSplitDataset(dataset_item)
        # if config.need_reqi_changeslot and config.reqi_dnn_plugin_day >= last_day and config.reqi_dnn_plugin_pass >= last_pass:
        #    util.reqi_changeslot(config.hdfs_dnn_plugin_path, join_save_params, common_save_params, update_save_params, scope2, scope3)
        fleet.init_worker()
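        # Sketch of an executor config entry (assumed YAML shape, inferred only
        # from the keys read in this class; values are illustrative):
        #   executor:
        #     - name: join
        #       train_thread_num: 12
        #       is_update_sparse: true
        #       dataset_name: sample_1
        #       dump_inference_model: false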

    def print_log(self, log_str, params):
        """Print a log line; when params['master'] is set, only worker 0 prints."""
        params['index'] = fleet.worker_index()
        if params['master']:
            if fleet.worker_index() == 0:
                print(log_str)
                sys.stdout.flush()
        else:
            print(log_str)
        if 'stdout' in params:
            params['stdout'] += str(datetime.datetime.now()) + log_str

    def print_global_metrics(self, scope, model, monitor_data, stdout_str):
        """Compute and print the global (all-worker) metrics for the given model."""
        metrics = model.get_metrics()
        metric_calculator = AUCMetric(None)
        for metric in metrics:
            metric_param = {'label': metric, 'metric_dict': metrics[metric]}
            metric_calculator.calculate(scope, metric_param)
            metric_result = metric_calculator.get_result_to_string()
            self.print_log(metric_result, {'master': True, 'stdout': stdout_str})
            monitor_data += metric_result
            metric_calculator.clear(scope, metric_param)

    def save_model(self, day, pass_index, base_key):
        """Save a full batch-model checkpoint and record the training progress."""
        cost_printer = util.CostPrinter(util.print_cost,
                                        {'master': True, 'log_format': 'save model cost %s sec'})
        model_path = self._path_generator.generate_path('batch_model', {'day': day, 'pass_id': pass_index})
        save_mode = 0  # just save all
        if pass_index < 1:  # batch_model
            save_mode = 3  # unseen_day++, save all
        util.rank0_print("going to save_model %s" % model_path)
        fleet.save_persistables(None, model_path, mode=save_mode)
        if fleet._role_maker.is_first_worker():
            self._train_pass.save_train_progress(day, pass_index, base_key, model_path, is_checkpoint=True)
        cost_printer.done()
        return model_path
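
    # Persistable save modes as used by this trainer (inferred from the calls in
    # save_model / save_xbox_model above and below): 0 = full checkpoint,
    # 3 = full checkpoint with unseen-day bump, 2 = xbox "base" snapshot,
    # 1 = xbox "delta" snapshot.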

X
xiexionghang 已提交
217
    def save_xbox_model(self, day, pass_index, xbox_base_key, monitor_data):
X
xiexionghang 已提交
218 219
        """R
        """
X
xiexionghang 已提交
220 221
        stdout_str = ""
        xbox_patch_id = str(int(time.time()))
T
tangwei 已提交
222
        util.rank0_print("begin save delta model")
T
tangwei 已提交
223

X
xiexionghang 已提交
224 225
        model_path = ""
        xbox_model_donefile = ""
T
tangwei 已提交
226
        cost_printer = util.CostPrinter(util.print_cost, {'master': True, \
T
tangwei 已提交
227 228
                                                                      'log_format': 'save xbox model cost %s sec',
                                                                      'stdout': stdout_str})
X
xiexionghang 已提交
229 230 231
        if pass_index < 1:
            save_mode = 2
            xbox_patch_id = xbox_base_key
X
xiexionghang 已提交
232 233
            model_path = self._path_generator.generate_path('xbox_base', {'day': day})
            xbox_model_donefile = self._path_generator.generate_path('xbox_base_done', {'day': day})
X
xiexionghang 已提交
234 235
        else:
            save_mode = 1
X
xiexionghang 已提交
236 237
            model_path = self._path_generator.generate_path('xbox_delta', {'day': day, 'pass_id': pass_index})
            xbox_model_donefile = self._path_generator.generate_path('xbox_delta_done', {'day': day})
X
xiexionghang 已提交
238 239 240
        total_save_num = fleet.save_persistables(None, model_path, mode=save_mode)
        cost_printer.done()

T
tangwei 已提交
241
        cost_printer = util.CostPrinter(util.print_cost, {'master': True,
T
tangwei 已提交
242 243
                                                                      'log_format': 'save cache model cost %s sec',
                                                                      'stdout': stdout_str})
T
tangwei 已提交
244
        model_file_handler = fs.FileHandler(self.global_config['io']['afs'])
X
xiexionghang 已提交
245 246 247
        if self.global_config['save_cache_model']:
            cache_save_num = fleet.save_cache_model(None, model_path, mode=save_mode)
            model_file_handler.write(
T
tangwei 已提交
248 249
                "file_prefix:part\npart_num:16\nkey_num:%d\n" % cache_save_num,
                model_path + '/000_cache/sparse_cache.meta', 'w')
X
xiexionghang 已提交
250
        cost_printer.done()
T
tangwei 已提交
251
        util.rank0_print("save xbox cache model done, key_num=%s" % cache_save_num)
X
xiexionghang 已提交
252 253 254 255 256

        save_env_param = {
            'executor': self._exe,
            'save_combine': True
        }
T
tangwei 已提交
257
        cost_printer = util.CostPrinter(util.print_cost, {'master': True,
T
tangwei 已提交
258 259
                                                                      'log_format': 'save dense model cost %s sec',
                                                                      'stdout': stdout_str})
260 261 262 263 264 265 266
        if fleet._role_maker.is_first_worker():
            for executor in self.global_config['executor']:
                if 'layer_for_inference' not in executor:
                    continue
                executor_name = executor['name']
                model = self._exector_context[executor_name]['model']
                save_env_param['inference_list'] = executor['layer_for_inference']
T
tangwei 已提交
267
                save_env_param['scope'] = self._exector_context[executor_name]['scope']
268 269
                model.dump_inference_param(save_env_param)
                for dnn_layer in executor['layer_for_inference']:
T
tangwei 已提交
270 271
                    model_file_handler.cp(dnn_layer['save_file_name'],
                                          model_path + '/dnn_plugin/' + dnn_layer['save_file_name'])
272
        fleet._role_maker._barrier_worker()
X
xiexionghang 已提交
273 274 275
        cost_printer.done()

        xbox_done_info = {
X
xiexionghang 已提交
276 277 278 279 280 281 282 283 284
            "id": xbox_patch_id,
            "key": xbox_base_key,
            "ins_path": "",
            "ins_tag": "feasign",
            "partition_type": "2",
            "record_count": "111111",
            "monitor_data": monitor_data,
            "mpi_size": str(fleet.worker_num()),
            "input": model_path.rstrip("/") + "/000",
T
tangwei 已提交
285 286
            "job_id": util.get_env_value("JOB_ID"),
            "job_name": util.get_env_value("JOB_NAME")
X
xiexionghang 已提交
287
        }
288 289 290 291 292
        if fleet._role_maker.is_first_worker():
            model_file_handler.write(json.dumps(xbox_done_info) + "\n", xbox_model_donefile, 'a')
            if pass_index > 0:
                self._train_pass.save_train_progress(day, pass_index, xbox_base_key, model_path, is_checkpoint=False)
        fleet._role_maker._barrier_worker()
T
tangwei 已提交
293 294
        return stdout_str

X
xiexionghang 已提交
295
    def run_executor(self, executor_config, dataset, stdout_str):
        """Train one executor on the given dataset for the current pass and log timing/metrics."""
        day = self._train_pass.date()
        pass_id = self._train_pass._pass_id
        xbox_base_key = self._train_pass._base_key
        executor_name = executor_config['name']
        scope = self._exector_context[executor_name]['scope']
        model = self._exector_context[executor_name]['model']
        with fluid.scope_guard(scope):
            util.rank0_print("Begin " + executor_name + " pass")
            begin = time.time()
            program = model._build_param['model']['train_program']
            self._exe.train_from_dataset(program, dataset, scope,
                                         thread=executor_config['train_thread_num'], debug=self.global_config['debug'])
            end = time.time()
            local_cost = (end - begin) / 60.0
            # aggregate the per-worker training time across the fleet
            avg_cost = worker_numric_avg(local_cost)
            min_cost = worker_numric_min(local_cost)
            max_cost = worker_numric_max(local_cost)
            util.rank0_print("avg train time %s mins, min %s mins, max %s mins" % (avg_cost, min_cost, max_cost))
            self._exector_context[executor_name]['cost'] = max_cost

            monitor_data = ""
            self.print_global_metrics(scope, model, monitor_data, stdout_str)
            util.rank0_print("End " + executor_name + " pass")
            if self._train_pass.need_dump_inference(pass_id) and executor_config['dump_inference_model']:
                stdout_str += self.save_xbox_model(day, pass_id, xbox_base_key, monitor_data)
        fleet._role_maker._barrier_worker()

    def startup(self, context):
        """Start servers, load the checkpoint (unless cold start) and optionally save a first base model."""
        if fleet.is_server():
            fleet.run_server()
            context['status'] = 'wait'
            return
        stdout_str = ""
        self._train_pass = util.TimeTrainPass(self.global_config)
        if not self.global_config['cold_start']:
            cost_printer = util.CostPrinter(util.print_cost,
                                            {'master': True, 'log_format': 'load model cost %s sec',
                                             'stdout': stdout_str})
            self.print_log("going to load model %s" % self._train_pass._checkpoint_model_path, {'master': True})
            # if config.need_reqi_changeslot and config.reqi_dnn_plugin_day >= self._train_pass.date()
            #    and config.reqi_dnn_plugin_pass >= self._pass_id:
            #    fleet.load_one_table(0, self._train_pass._checkpoint_model_path)
            # else:
            fleet.init_server(self._train_pass._checkpoint_model_path, mode=0)
            cost_printer.done()
        if self.global_config['save_first_base']:
            self.print_log("save_first_base=True", {'master': True})
            self.print_log("going to save xbox base model", {'master': True, 'stdout': stdout_str})
            self._train_pass._base_key = int(time.time())
            stdout_str += self.save_xbox_model(self._train_pass.date(), 0, self._train_pass._base_key, "")
        context['status'] = 'begin_day'

    def begin_day(self, context):
        """Advance to the next pass and route to train_pass or end_day."""
        stdout_str = ""
        if not self._train_pass.next():
            context['is_exit'] = True
        day = self._train_pass.date()
        pass_id = self._train_pass._pass_id
        self.print_log("======== BEGIN DAY:%s ========" % day, {'master': True, 'stdout': stdout_str})
        if pass_id == self._train_pass.max_pass_num_day():
            context['status'] = 'end_day'
        else:
            context['status'] = 'train_pass'

    def end_day(self, context):
        """Day rollover: shrink tables, then save the base and batch models for the next day."""
        day = self._train_pass.date()
        pass_id = self._train_pass._pass_id
        xbox_base_key = int(time.time())
        context['status'] = 'begin_day'

        util.rank0_print("shrink table")
        cost_printer = util.CostPrinter(util.print_cost,
                                        {'master': True, 'log_format': 'shrink table done, cost %s sec'})
        fleet.shrink_sparse_table()
        for executor in self._exector_context:
            self._exector_context[executor]['model'].shrink({
                'scope': self._exector_context[executor]['scope'],
                'decay': self.global_config['optimizer']['dense_decay_rate']
            })
        cost_printer.done()

        next_date = self._train_pass.date(delta_day=1)
        util.rank0_print("going to save xbox base model")
        self.save_xbox_model(next_date, 0, xbox_base_key, "")
        util.rank0_print("going to save batch model")
        self.save_model(next_date, 0, xbox_base_key)
        self._train_pass._base_key = xbox_base_key
        fleet._role_maker._barrier_worker()

    def train_pass(self, context):
        """Run one training pass: load data, global shuffle, train each executor, save."""
        stdout_str = ""
        day = self._train_pass.date()
        pass_id = self._train_pass._pass_id
        base_key = self._train_pass._base_key
        pass_time = self._train_pass._current_train_time.strftime("%Y%m%d%H%M")
        self.print_log("    ==== begin delta:%s ========" % pass_id, {'master': True, 'stdout': stdout_str})
        train_begin_time = time.time()

        cost_printer = util.CostPrinter(util.print_cost,
                                        {'master': True, 'log_format': 'load into memory done, cost %s sec',
                                         'stdout': stdout_str})
        current_dataset = {}
        for name in self._dataset:
            current_dataset[name] = self._dataset[name].load_dataset({
                'node_num': fleet.worker_num(), 'node_idx': fleet.worker_index(),
                'begin_time': pass_time, 'time_window_min': self._train_pass._interval_per_pass
            })
        fleet._role_maker._barrier_worker()
        cost_printer.done()

        util.rank0_print("going to global shuffle")
        cost_printer = util.CostPrinter(util.print_cost, {
            'master': True, 'stdout': stdout_str,
            'log_format': 'global shuffle done, cost %s sec'})
        for name in current_dataset:
            current_dataset[name].global_shuffle(fleet, self.global_config['dataset']['shuffle_thread'])
        cost_printer.done()
        # str(dataset.get_shuffle_data_size(fleet))
        fleet._role_maker._barrier_worker()

        if self.global_config['prefetch_data']:
            next_pass_time = (self._train_pass._current_train_time +
                              datetime.timedelta(minutes=self._train_pass._interval_per_pass)).strftime("%Y%m%d%H%M")
            for name in self._dataset:
                self._dataset[name].preload_dataset({
                    'node_num': fleet.worker_num(), 'node_idx': fleet.worker_index(),
                    'begin_time': next_pass_time, 'time_window_min': self._train_pass._interval_per_pass
                })

        fleet._role_maker._barrier_worker()
        pure_train_begin = time.time()
        for executor in self.global_config['executor']:
            self.run_executor(executor, current_dataset[executor['dataset_name']], stdout_str)
        cost_printer = util.CostPrinter(util.print_cost,
                                        {'master': True, 'log_format': 'release_memory cost %s sec'})
        for name in current_dataset:
            current_dataset[name].release_memory()
        pure_train_cost = time.time() - pure_train_begin

        if self._train_pass.is_checkpoint_pass(pass_id):
            self.save_model(day, pass_id, base_key)

        train_end_time = time.time()
        train_cost = train_end_time - train_begin_time
        other_cost = train_cost - pure_train_cost
        log_str = "finished train day %s pass %s time cost:%s sec job time cost:" % (day, pass_id, train_cost)
        for executor in self._exector_context:
            log_str += '[' + executor + ':' + str(self._exector_context[executor]['cost']) + ']'
        log_str += '[other_cost:' + str(other_cost) + ']'
        util.rank0_print(log_str)
        stdout_str += util.now_time_str() + log_str
        sys.stdout.write(stdout_str)
        fleet._role_maker._barrier_worker()
        stdout_str = ""
        if pass_id == self._train_pass.max_pass_num_day():
            context['status'] = 'end_day'
            return
        elif not self._train_pass.next():
            context['is_exit'] = True
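
# Rough driver sketch (assumptions: the YAML config provides the keys read above,
# and the loop/dispatch method name on the base trainer.Trainer is illustrative,
# not verified against that class):
#   config = yaml.load(open("ctr_config.yaml"), Loader=yaml.FullLoader)
#   ctr_trainer = CtrPaddleTrainer(config)
#   context = {'status': 'uninit', 'is_exit': False}
#   while not context['is_exit']:
#       ctr_trainer.context_process(context)  # hypothetical dispatch to the registered processors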