"""
A paddle trainer Adapt to Abacus
"""
import abc
import sys
import copy
import yaml
import time
import json
import datetime
import paddle.fluid as fluid
import kagle.kagle_fs as kagle_fs
import kagle.kagle_util as kagle_util
import kagle.kagle_model as kagle_model
import kagle.kagle_metric as kagle_metric
import kagle.kagle_dataset as kagle_dataset
import kagle.trainer.kagle_trainer as kagle_trainer
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker

class AbacusPaddleTrainer(kagle_trainer.Trainer):
    """Abacus-style distributed trainer built on PaddlePaddle Fleet (pslib parameter server)."""

    def __init__(self, config):
        """Build the trainer from a parsed config and register the context processors."""
        kagle_trainer.Trainer.__init__(self, config)
        config['output_path'] = kagle_util.get_absolute_path(
            config['output_path'], config['io']['afs'])
        self.global_config = config
        self._place = fluid.CPUPlace()
        self._exe = fluid.Executor(self._place)
        self._exector_context = {}
        self._metrics = {}
        self._path_generator = kagle_util.PathGenerator({
            'templates': [
                {'name': 'xbox_base_done', 'template': config['output_path'] + '/xbox_base_done.txt'},
                {'name': 'xbox_delta_done', 'template': config['output_path'] + '/xbox_patch_done.txt'},
                {'name': 'xbox_base', 'template': config['output_path'] + '/xbox/{day}/base/'},
                {'name': 'xbox_delta', 'template': config['output_path'] + '/xbox/{day}/delta-{pass_id}/'},
                {'name': 'batch_model', 'template': config['output_path'] + '/batch_model/{day}/{pass_id}/'}
            ]
        })
        if 'path_generator' in config:
            self._path_generator.add_path_template(config['path_generator'])
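        # Path templates may reference the {day} and {pass_id} placeholders; generate_path()
        # fills them in when building model output directories (see save_model/save_xbox_model).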
        
        self.regist_context_processor('uninit', self.init)
        self.regist_context_processor('startup', self.startup)
        self.regist_context_processor('begin_day', self.begin_day)
        self.regist_context_processor('train_pass', self.train_pass)
        self.regist_context_processor('end_day', self.end_day)
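        # Context-processor state machine: 'uninit' -> 'startup' -> 'begin_day' ->
        # 'train_pass' (repeated per pass) -> 'end_day' -> 'begin_day' ...; each handler
        # updates context['status'] and sets context['is_exit'] to stop the run loop.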

    def init(self, context):
        """Initialize Fleet, build executor models and scopes, and prepare datasets."""
        role_maker = None
        if self.global_config.get('process_mode', 'mpi') == 'brilliant_cpu':
            afs_config = self.global_config['io']['afs']
            role_maker = GeneralRoleMaker(
                hdfs_name=afs_config['fs_name'], hdfs_ugi=afs_config['fs_ugi'],
                path=self.global_config['output_path'] + "/gloo",
                init_timeout_seconds=1200, run_timeout_seconds=1200)
        fleet.init(role_maker)
        data_var_list = []
        data_var_name_dict = {}
        runnable_scope = []
        runnable_cost_op = []
        context['status'] = 'startup'

        for executor in self.global_config['executor']:
            scope = fluid.Scope()
            self._exector_context[executor['name']] = {}
            self._exector_context[executor['name']]['scope'] = scope
            self._exector_context[executor['name']]['model'] = kagle_model.create(executor)
            model = self._exector_context[executor['name']]['model']
            self._metrics.update(model.get_metrics())
            runnable_scope.append(scope)
            runnable_cost_op.append(model.get_cost_op())
            for var in model._data_var:
                if var.name in data_var_name_dict:
                    continue
                data_var_list.append(var)
                data_var_name_dict[var.name] = var 

        optimizer = kagle_model.FluidModel.build_optimizer({
            'metrics': self._metrics,
            'optimizer_conf': self.global_config['optimizer']
        })
        optimizer.minimize(runnable_cost_op, runnable_scope)
        for executor in self.global_config['executor']:
            scope = self._exector_context[executor['name']]['scope']
            model = self._exector_context[executor['name']]['model']
            program = model._build_param['model']['train_program']
            if not executor['is_update_sparse']:
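                # Assumption about pslib internals: emptying this program's push_sparse list
                # keeps the executor from pushing sparse gradient updates to the server.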
                program._fleet_opt["program_configs"][str(id(model.get_cost_op().block.program))]["push_sparse"] = []
            if 'train_thread_num' not in executor:
                executor['train_thread_num'] = self.global_config['train_thread_num']
            with fluid.scope_guard(scope):
                self._exe.run(model._build_param['model']['startup_program'])
            model.dump_model_program('./')

        #server init done
        if fleet.is_server():
            return 0
        
        self._dataset = {}
        for dataset_item in self.global_config['dataset']['data_list']:
            dataset_item['data_vars'] = data_var_list
            dataset_item.update(self.global_config['io']['afs'])
            dataset_item["batch_size"] = self.global_config['batch_size']
            self._dataset[dataset_item['name']] = kagle_dataset.FluidTimeSplitDataset(dataset_item)
        #if config.need_reqi_changeslot and config.reqi_dnn_plugin_day >= last_day and config.reqi_dnn_plugin_pass >= last_pass:
        #    util.reqi_changeslot(config.hdfs_dnn_plugin_path, join_save_params, common_save_params, update_save_params, scope2, scope3)
        fleet.init_worker()
        pass

    def print_log(self, log_str, params):
        """Print a log line tagged with the current worker index."""
        params['index'] = fleet.worker_index()
        return kagle_util.print_log(log_str, params)

    def print_global_metrics(self, scope, model, monitor_data, stdout_str):
        """Compute, print, and accumulate every metric registered on the model.

        Returns the accumulated monitor string; since str is immutable in Python, the
        results are returned instead of being appended to monitor_data in place.
        """
        metrics = model.get_metrics()
        metric_calculator = kagle_metric.PaddleAUCMetric(None)
        for metric in metrics:
            metric_param = {'label': metric, 'metric_dict': metrics[metric]}
            metric_calculator.calculate(scope, metric_param)
            metric_result = metric_calculator.get_result_to_string()
            self.print_log(metric_result, {'master': True, 'stdout': stdout_str})
            monitor_data += metric_result
            metric_calculator.clear(scope, metric_param)
        return monitor_data

    def save_model(self, day, pass_index, base_key):
        """Save a full checkpoint (batch) model for the given day/pass and record training progress."""
        cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, 
            {'master': True, 'log_format': 'save model cost %s sec'}) 
        model_path = self._path_generator.generate_path('batch_model', {'day': day, 'pass_id': pass_index})
        save_mode = 0     # just save all 
        if pass_index < 1: #batch_model
            save_mode = 3     # unseen_day++, save all
        kagle_util.rank0_print("going to save_model %s" % model_path)
        fleet.save_persistables(None, model_path, mode=save_mode)
        if fleet._role_maker.is_first_worker():
            self._train_pass.save_train_progress(day, pass_index, base_key, model_path, is_checkpoint=True)
        cost_printer.done()
        return model_path
        
    def save_xbox_model(self, day, pass_index, xbox_base_key, monitor_data):
        """Save an xbox base/delta model for serving, write its donefile, and return the stdout string."""
        stdout_str = ""
        xbox_patch_id = str(int(time.time()))
        kagle_util.rank0_print("begin save delta model")
        
        model_path = ""
        xbox_model_donefile = ""
        cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, \
            'log_format': 'save xbox model cost %s sec', 'stdout': stdout_str})
        if pass_index < 1:
            save_mode = 2
            xbox_patch_id = xbox_base_key
            model_path = self._path_generator.generate_path('xbox_base', {'day': day})
            xbox_model_donefile = self._path_generator.generate_path('xbox_base_done', {'day': day})
        else:
            save_mode = 1
            model_path = self._path_generator.generate_path('xbox_delta', {'day': day, 'pass_id': pass_index})
            xbox_model_donefile = self._path_generator.generate_path('xbox_delta_done', {'day': day})
        total_save_num = fleet.save_persistables(None, model_path, mode=save_mode)
        cost_printer.done()

        cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, 
                'log_format': 'save cache model cost %s sec', 'stdout': stdout_str})
        model_file_handler = kagle_fs.FileHandler(self.global_config['io']['afs'])
        if self.global_config['save_cache_model']:
            cache_save_num = fleet.save_cache_model(None, model_path, mode=save_mode)
            model_file_handler.write(
                    "file_prefix:part\npart_num:16\nkey_num:%d\n" % cache_save_num,
                    model_path + '/000_cache/sparse_cache.meta', 'w')
            kagle_util.rank0_print("save xbox cache model done, key_num=%s" % cache_save_num)
        cost_printer.done()

        save_env_param = {
            'executor': self._exe,
            'save_combine': True
        }
        cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, 
                'log_format': 'save dense model cost %s sec', 'stdout': stdout_str})
        if fleet._role_maker.is_first_worker():
            for executor in self.global_config['executor']:
                if 'layer_for_inference' not in executor:
                    continue
                executor_name = executor['name']
                model = self._exector_context[executor_name]['model']
                save_env_param['inference_list'] = executor['layer_for_inference']
                save_env_param['scope'] = self._exector_context[executor_name]['scope']
                model.dump_inference_param(save_env_param)
                for dnn_layer in executor['layer_for_inference']:
                    model_file_handler.cp(dnn_layer['save_file_name'], 
                        model_path + '/dnn_plugin/' + dnn_layer['save_file_name'])
        fleet._role_maker._barrier_worker()
        cost_printer.done()

        xbox_done_info = {
            "id": xbox_patch_id,
            "key": xbox_base_key,
            "ins_path": "",
            "ins_tag": "feasign",
            "partition_type": "2",
            "record_count": "111111",
            "monitor_data": monitor_data,
            "mpi_size": str(fleet.worker_num()),
            "input": model_path.rstrip("/") + "/000",
            "job_id": kagle_util.get_env_value("JOB_ID"),
            "job_name": kagle_util.get_env_value("JOB_NAME")
        }
        if fleet._role_maker.is_first_worker():
            model_file_handler.write(json.dumps(xbox_done_info) + "\n", xbox_model_donefile, 'a')
            if pass_index > 0:
                self._train_pass.save_train_progress(day, pass_index, xbox_base_key, model_path, is_checkpoint=False)
        fleet._role_maker._barrier_worker()
        return stdout_str 
                
    def run_executor(self, executor_config, dataset, stdout_str):
        """Run one executor's training pass on the given dataset and report timing and metrics."""
        day = self._train_pass.date()
        pass_id = self._train_pass._pass_id
        xbox_base_key = self._train_pass._base_key
        executor_name = executor_config['name']
        scope = self._exector_context[executor_name]['scope']
        model = self._exector_context[executor_name]['model']
        with fluid.scope_guard(scope):
            kagle_util.rank0_print("Begin " + executor_name + " pass")
            begin = time.time()
            program = model._build_param['model']['train_program']
            self._exe.train_from_dataset(program, dataset, scope,
                thread=executor_config['train_thread_num'], debug=self.global_config['debug'])
            end = time.time()
            local_cost = (end - begin) / 60.0
            avg_cost = kagle_util.worker_numric_avg(local_cost)
            min_cost = kagle_util.worker_numric_min(local_cost)
            max_cost = kagle_util.worker_numric_max(local_cost)
            kagle_util.rank0_print("avg train time %s mins, min %s mins, max %s mins" % (avg_cost, min_cost, max_cost))
            self._exector_context[executor_name]['cost'] = max_cost

            monitor_data = ""
            self.print_global_metrics(scope, model, monitor_data, stdout_str)
            kagle_util.rank0_print("End " + executor_name + " pass")
            if self._train_pass.need_dump_inference(pass_id) and executor_config['dump_inference_model']:
                stdout_str += self.save_xbox_model(day, pass_id, xbox_base_key, monitor_data)
        fleet._role_maker._barrier_worker()

    def startup(self, context):
        """Run the parameter server, or load/initialize the model on workers before the first day."""
        if fleet.is_server():
            fleet.run_server()
            context['status'] = 'wait'
            return
        stdout_str = ""
        self._train_pass = kagle_util.TimeTrainPass(self.global_config)
        if not self.global_config['cold_start']:
            cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, 
                {'master': True, 'log_format': 'load model cost %s sec', 'stdout': stdout_str})
            self.print_log("going to load model %s" % self._train_pass._checkpoint_model_path, {'master': True})
            #if config.need_reqi_changeslot and config.reqi_dnn_plugin_day >= self._train_pass.date()
            #    and config.reqi_dnn_plugin_pass >= self._pass_id:
            #    fleet.load_one_table(0, self._train_pass._checkpoint_model_path)
            #else:
            fleet.init_server(self._train_pass._checkpoint_model_path, mode=0)
            cost_printer.done()
        if self.global_config['save_first_base']:
            self.print_log("save_first_base=True", {'master': True})
            self.print_log("going to save xbox base model", {'master': True, 'stdout': stdout_str})
            self._train_pass._base_key = int(time.time())
            stdout_str += self.save_xbox_model(self._train_pass.date(), 0, self._train_pass._base_key, "")
        context['status'] = 'begin_day'
    
    def begin_day(self, context):
        """Advance to the next training pass and route to either 'train_pass' or 'end_day'."""
        stdout_str = ""
        if not self._train_pass.next():
            context['is_exit'] = True
        day = self._train_pass.date()
        pass_id = self._train_pass._pass_id
        self.print_log("======== BEGIN DAY:%s ========" % day, {'master': True, 'stdout': stdout_str})
        if pass_id == self._train_pass.max_pass_num_day():
            context['status'] = 'end_day'
        else:
            context['status'] = 'train_pass'
    
    def end_day(self, context):
        """Finish the day: shrink tables and save the base and batch models for the next day."""
        day = self._train_pass.date()
        pass_id = self._train_pass._pass_id
        xbox_base_key = int(time.time())
        context['status'] = 'begin_day'

        kagle_util.rank0_print("shrink table")
        cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, 
            {'master': True, 'log_format': 'shrink table done, cost %s sec'})
        fleet.shrink_sparse_table()
        for executor in self._exector_context:
            self._exector_context[executor]['model'].shrink({
                'scope': self._exector_context[executor]['scope'],
                'decay': self.global_config['optimizer']['dense_decay_rate']
            })
        cost_printer.done()

        next_date = self._train_pass.date(delta_day=1)
        kagle_util.rank0_print("going to save xbox base model")
        self.save_xbox_model(next_date, 0, xbox_base_key, "")
        kagle_util.rank0_print("going to save batch model")
        self.save_model(next_date, 0, xbox_base_key)
        self._train_pass._base_key = xbox_base_key
        fleet._role_maker._barrier_worker()

    def train_pass(self, context):
        """Run one training pass: load and shuffle data, train every executor, and save models as needed."""
        stdout_str = ""
        day = self._train_pass.date()
        pass_id = self._train_pass._pass_id
        base_key = self._train_pass._base_key
        pass_time = self._train_pass._current_train_time.strftime("%Y%m%d%H%M")
        self.print_log("    ==== begin delta:%s ========" % pass_id, {'master': True, 'stdout': stdout_str})
        train_begin_time = time.time()

        cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, \
            {'master': True, 'log_format': 'load into memory done, cost %s sec', 'stdout': stdout_str})
        current_dataset = {}
        for name in self._dataset:
            current_dataset[name] = self._dataset[name].load_dataset({
                'node_num': fleet.worker_num(), 'node_idx': fleet.worker_index(),
                'begin_time': pass_time, 'time_window_min': self._train_pass._interval_per_pass
            })
        fleet._role_maker._barrier_worker()
        cost_printer.done()
                
        kagle_util.rank0_print("going to global shuffle")
        cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, { 
            'master': True, 'stdout': stdout_str,
            'log_format': 'global shuffle done, cost %s sec'}) 
        for name in current_dataset:
            current_dataset[name].global_shuffle(fleet, self.global_config['dataset']['shuffle_thread'])
        cost_printer.done()
        # str(dataset.get_shuffle_data_size(fleet))
        fleet._role_maker._barrier_worker()

        if self.global_config['prefetch_data']:
            next_pass_time = (self._train_pass._current_train_time + 
                datetime.timedelta(minutes=self._train_pass._interval_per_pass)).strftime("%Y%m%d%H%M")
            for name in self._dataset:
                self._dataset[name].preload_dataset({
                    'node_num': fleet.worker_num(), 'node_idx': fleet.worker_index(),
                    'begin_time': next_pass_time, 'time_window_min': self._train_pass._interval_per_pass
                })
        
        fleet._role_maker._barrier_worker()
        pure_train_begin = time.time()
        for executor in self.global_config['executor']:
            self.run_executor(executor, current_dataset[executor['dataset_name']], stdout_str)
        cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, \
            {'master': True, 'log_format': 'release_memory cost %s sec'}) 
        for name in current_dataset:
            current_dataset[name].release_memory()
        pure_train_cost = time.time() - pure_train_begin
        
        if self._train_pass.is_checkpoint_pass(pass_id):
            self.save_model(day, pass_id, base_key)

        train_end_time = time.time()
        train_cost = train_end_time - train_begin_time
        other_cost = train_cost - pure_train_cost 
        log_str = "finished train day %s pass %s time cost:%s sec job time cost:" % (day, pass_id, train_cost)
        for executor in self._exector_context:
            log_str += '[' + executor + ':' + str(self._exector_context[executor]['cost']) + ']'
        log_str += '[other_cost:' + str(other_cost) + ']'         
        kagle_util.rank0_print(log_str)
        stdout_str += kagle_util.now_time_str() + log_str
        sys.stdout.write(stdout_str)
        fleet._role_maker._barrier_worker()
        stdout_str = ""
        if pass_id == self._train_pass.max_pass_num_day():
            context['status'] = 'end_day'
            return
        elif not self._train_pass.next():
            context['is_exit'] = True
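

# Illustrative driver sketch (an assumption, not part of the original module): the kagle
# trainer framework normally owns the run loop, but stepping the registered context
# processors by hand would look roughly like this. The YAML path and the attribute holding
# the processor map are hypothetical placeholders.
#
# if __name__ == '__main__':
#     with open('abacus_trainer.yaml') as config_file:   # hypothetical config file
#         run_config = yaml.safe_load(config_file)
#     trainer = AbacusPaddleTrainer(run_config)
#     run_context = {'status': 'uninit', 'is_exit': False}
#     while not run_context['is_exit']:
#         # '_context_processor' is a hypothetical attribute name for the mapping built by
#         # regist_context_processor in __init__.
#         trainer._context_processor[run_context['status']](run_context)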