abacus_trainer.py 17.4 KB
Newer Older
X
xiexionghang 已提交
1 2 3 4
"""
A paddle trainer Adapt to Abacus
"""
import abc
X
xiexionghang 已提交
5 6 7 8 9 10
import sys
import copy
import yaml
import time
import json
import datetime
X
xiexionghang 已提交
11 12 13 14 15 16
import kagle.kagle_fs
import kagle.kagle_util
import kagle.kagle_model
import kagle.kagle_metric
import kagle.kagle_dataset
import kagle.trainer.kagle_trainer
X
xiexionghang 已提交
17 18 19 20
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet

class AbacusPaddleTrainer(kagle_trainer.Trainer):
X
xiexionghang 已提交
21 22
    """R
    """
X
xiexionghang 已提交
23
    def __init__(self, config):
X
xiexionghang 已提交
24 25
        """R
        """
X
xiexionghang 已提交
26 27 28 29 30 31 32 33 34
	kagle_trainer.Trainer.__init__(self, config)
        config['output_path'] = kagle_util.get_absolute_path(
            config['output_path'], config['io']['afs'])
        self.global_config = config
        self._place = fluid.CPUPlace()
        self._exe = fluid.Executor(self._place)
        self._exector_context = {}
        self._metrics = {}
        self._path_generator = kagle_util.PathGenerator({
X
xiexionghang 已提交
35
            'templates': [
X
xiexionghang 已提交
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
                {'name': 'xbox_base_done', 'template': config['output_path'] + '/xbox_base_done.txt'},
                {'name': 'xbox_delta_done', 'template': config['output_path'] + '/xbox_patch_done.txt'},
                {'name': 'xbox_base', 'template': config['output_path'] + '/xbox/{day}/base/'},
                {'name': 'xbox_delta', 'template': config['output_path'] + '/xbox/{day}/delta-{pass_id}/'},
                {'name': 'batch_model', 'template': config['output_path'] + '/batch_model/{day}/{pass_id}/'}
            ]
        })
        if 'path_generator' in config:
            self._path_generator.add_path_template(config['path_generator'])
        
        self.regist_context_processor('uninit', self.init)
        self.regist_context_processor('startup', self.startup)
        self.regist_context_processor('begin_day', self.begin_day)
        self.regist_context_processor('train_pass', self.train_pass)
        self.regist_context_processor('end_day', self.end_day)

    def init(self, context):
X
xiexionghang 已提交
53 54
        """R
        """
X
xiexionghang 已提交
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
        fleet.init(self._exe)
        data_var_list = []
        data_var_name_dict = {}
        runnnable_scope = []
        runnnable_cost_op = []
        context['status'] = 'startup'

        for executor in self.global_config['executor']:
            scope = fluid.Scope()
            self._exector_context[executor['name']] = {}
            self._exector_context[executor['name']]['scope'] = scope
            self._exector_context[executor['name']]['model'] = kagle_model.create(executor)
            model =  self._exector_context[executor['name']]['model']
            self._metrics.update(model.get_metrics())
            runnnable_scope.append(scope)
            runnnable_cost_op.append(model.get_cost_op())
            for var in model._data_var:
                if var.name in data_var_name_dict:
                    continue
                data_var_list.append(var)
                data_var_name_dict[var.name] = var 

        optimizer = kagle_model.FluidModel.build_optimizer({
X
xiexionghang 已提交
78 79
	    'metrics': self._metrics,
            'optimizer_conf': self.global_config['optimizer']
X
xiexionghang 已提交
80 81 82 83 84 85 86 87 88
        })
        optimizer.minimize(runnnable_cost_op, runnnable_scope)
        for executor in self.global_config['executor']:
            scope = self._exector_context[executor['name']]['scope']
            model =  self._exector_context[executor['name']]['model']
            program = model._build_param['model']['train_program']
            if not executor['is_update_sparse']:
                program._fleet_opt["program_configs"][str(id(model.get_cost_op().block.program))]["push_sparse"] = []
            if 'train_thread_num' not in executor:
X
xiexionghang 已提交
89
                executor['train_thread_num'] = self.global_config['train_thread_num']
X
xiexionghang 已提交
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
            with fluid.scope_guard(scope):
                self._exe.run(model._build_param['model']['startup_program'])
            model.dump_model_program('./')

        #server init done
        if fleet.is_server():
            return 0
        
        self._dataset = {}
        for dataset_item in self.global_config['dataset']['data_list']:
            dataset_item['data_vars'] = data_var_list
            dataset_item.update(self.global_config['io']['afs'])
            dataset_item["batch_size"] = self.global_config['batch_size']
            self._dataset[dataset_item['name']] = kagle_dataset.FluidTimeSplitDataset(dataset_item)
        #if config.need_reqi_changeslot and config.reqi_dnn_plugin_day >= last_day and config.reqi_dnn_plugin_pass >= last_pass:
        #    util.reqi_changeslot(config.hdfs_dnn_plugin_path, join_save_params, common_save_params, update_save_params, scope2, scope3)
        fleet.init_worker()
        pass

    def print_log(self, log_str, params):
X
xiexionghang 已提交
110 111
        """R
        """
X
xiexionghang 已提交
112 113 114 115
        params['index'] = fleet.worker_index()
        return kagle_util.print_log(log_str, params)

    def print_global_metrics(self, scope, model, monitor_data, stdout_str):
X
xiexionghang 已提交
116 117
        """R
        """
X
xiexionghang 已提交
118 119 120
        metrics = model.get_metrics()
        metric_calculator = kagle_metric.PaddleAUCMetric(None)
        for metric in metrics:
X
xiexionghang 已提交
121
            metric_param =  {'label': metric, 'metric_dict': metrics[metric]}
X
xiexionghang 已提交
122 123
            metric_calculator.calculate(scope, metric_param)
            metric_result = metric_calculator.get_result_to_string() 
X
xiexionghang 已提交
124
            self.print_log(metric_result, {'master': True, 'stdout': stdout_str})
X
xiexionghang 已提交
125 126 127 128
            monitor_data += metric_result
            metric_calculator.clear(scope, metric_param)
       
    def save_model(self, day, pass_index, base_key):
X
xiexionghang 已提交
129 130
        """R
        """
X
xiexionghang 已提交
131
        cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, 
X
xiexionghang 已提交
132
            {'master': True, 'log_format': 'save model cost %s sec'}) 
X
xiexionghang 已提交
133 134 135 136 137 138 139 140 141 142 143
        model_path = self._path_generator.generate_path('batch_model', {'day': day, 'pass_id': pass_index})
        save_mode = 0     # just save all 
        if pass_index < 1: #batch_model
            save_mode = 3     # unseen_day++, save all
        kagle_util.rank0_print("going to save_model %s" % model_path)
        fleet.save_persistables(None, model_path, mode=save_mode)
        self._train_pass.save_train_progress(day, pass_index, base_key, model_path, is_checkpoint=True)
        cost_printer.done()
        return model_path
        
    def save_xbox_model(self, day, pass_index, xbox_base_key, monitor_data):
X
xiexionghang 已提交
144 145
        """R
        """
X
xiexionghang 已提交
146 147 148 149 150 151
        stdout_str = ""
        xbox_patch_id = str(int(time.time()))
        kagle_util.rank0_print("begin save delta model")
        
        model_path = ""
        xbox_model_donefile = ""
X
xiexionghang 已提交
152 153
        cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, \
            'log_format': 'save xbox model cost %s sec', 'stdout': stdout_str})
X
xiexionghang 已提交
154 155 156
        if pass_index < 1:
            save_mode = 2
            xbox_patch_id = xbox_base_key
X
xiexionghang 已提交
157 158
            model_path = self._path_generator.generate_path('xbox_base', {'day': day})
            xbox_model_donefile = self._path_generator.generate_path('xbox_base_done', {'day': day})
X
xiexionghang 已提交
159 160
        else:
            save_mode = 1
X
xiexionghang 已提交
161 162
            model_path = self._path_generator.generate_path('xbox_delta', {'day': day, 'pass_id': pass_index})
            xbox_model_donefile = self._path_generator.generate_path('xbox_delta_done', {'day': day})
X
xiexionghang 已提交
163 164 165 166
        total_save_num = fleet.save_persistables(None, model_path, mode=save_mode)
        cost_printer.done()

        cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, 
X
xiexionghang 已提交
167
                'log_format': 'save cache model cost %s sec', 'stdout': stdout_str})
X
xiexionghang 已提交
168 169 170 171 172 173 174 175 176 177 178 179 180 181
        model_file_handler = kagle_fs.FileHandler(self.global_config['io']['afs'])
        if self.global_config['save_cache_model']:
            cache_save_num = fleet.save_cache_model(None, model_path, mode=save_mode)
            model_file_handler.write(
                    "file_prefix:part\npart_num:16\nkey_num:%d\n" % cache_save_num,
                    model_path + '/000_cache/sparse_cache.meta', 'w')
        cost_printer.done()
        kagle_util.rank0_print("save xbox cache model done, key_num=%s" % cache_save_num)

        save_env_param = {
            'executor': self._exe,
            'save_combine': True
        }
        cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, 
X
xiexionghang 已提交
182
                'log_format': 'save dense model cost %s sec', 'stdout': stdout_str})
X
xiexionghang 已提交
183 184 185 186 187 188 189 190 191 192 193 194 195 196
        for executor in self.global_config['executor']:
            if 'layer_for_inference' not in executor:
                continue
            executor_name = executor['name']
            model = self._exector_context[executor_name]['model']
            save_env_param['inference_list'] = executor['layer_for_inference']
            save_env_param['scope'] =  self._exector_context[executor_name]['scope']
            model.dump_inference_param(save_env_param)
            for dnn_layer in executor['layer_for_inference']:
                model_file_handler.cp(dnn_layer['save_file_name'], 
                    model_path + '/dnn_plugin/' + dnn_layer['save_file_name'])
        cost_printer.done()

        xbox_done_info = {
X
xiexionghang 已提交
197 198 199 200 201 202 203 204 205 206 207
            "id": xbox_patch_id,
            "key": xbox_base_key,
            "ins_path": "",
            "ins_tag": "feasign",
            "partition_type": "2",
            "record_count": "111111",
            "monitor_data": monitor_data,
            "mpi_size": str(fleet.worker_num()),
            "input": model_path.rstrip("/") + "/000",
            "job_id": kagle_util.get_env_value("JOB_ID"),
            "job_name": kagle_util.get_env_value("JOB_NAME")
X
xiexionghang 已提交
208 209 210 211 212 213 214
        }
        model_file_handler.write(json.dumps(xbox_done_info) + "\n", xbox_model_donefile, 'a')
        if pass_index > 0:
            self._train_pass.save_train_progress(day, pass_index, xbox_base_key, model_path, is_checkpoint=False)
        return stdout_str 
                
    def run_executor(self, executor_config, dataset, stdout_str):
X
xiexionghang 已提交
215 216
        """R
        """
X
xiexionghang 已提交
217 218 219 220 221 222 223 224 225 226 227 228 229
        day = self._train_pass.date()
        pass_id = self._train_pass._pass_id
        xbox_base_key = self._train_pass._base_key
        executor_name = executor_config['name']
        scope = self._exector_context[executor_name]['scope']
        model = self._exector_context[executor_name]['model']
        with fluid.scope_guard(scope):
            kagle_util.rank0_print("Begin " + executor_name + " pass")
            begin = time.time()
            program = model._build_param['model']['train_program']
            self._exe.train_from_dataset(program, dataset, scope,
                thread=executor_config['train_thread_num'], debug=self.global_config['debug'])
            end = time.time()
X
xiexionghang 已提交
230
            local_cost = (end - begin) / 60.0
X
xiexionghang 已提交
231 232 233 234 235 236 237 238 239 240 241 242 243
            avg_cost = kagle_util.worker_numric_avg(local_cost)
            min_cost = kagle_util.worker_numric_min(local_cost)
            max_cost = kagle_util.worker_numric_max(local_cost)
            kagle_util.rank0_print("avg train time %s mins, min %s mins, max %s mins" % (avg_cost, min_cost, max_cost))
            self._exector_context[executor_name]['cost'] = max_cost

            monitor_data = ""
            self.print_global_metrics(scope, model, monitor_data, stdout_str)
            kagle_util.rank0_print("End " + executor_name + " pass")
            if self._train_pass.need_dump_inference(pass_id) and executor_config['dump_inference_model']:
                stdout_str += self.save_xbox_model(day, pass_id, xbox_base_key, monitor_data)

    def startup(self, context):
X
xiexionghang 已提交
244 245
        """R
        """
X
xiexionghang 已提交
246 247 248 249 250 251 252 253
        if fleet.is_server():
            fleet.run_server()
            context['status'] = 'wait'
            return
        stdout_str = ""
        self._train_pass = kagle_util.TimeTrainPass(self.global_config)
        if not self.global_config['cold_start']:
            cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, 
X
xiexionghang 已提交
254
                {'master': True, 'log_format': 'load model cost %s sec', 'stdout': stdout_str})
X
xiexionghang 已提交
255 256 257 258 259 260 261 262 263
            self.print_log("going to load model %s" % self._train_pass._checkpoint_model_path, {'master': True})
            #if config.need_reqi_changeslot and config.reqi_dnn_plugin_day >= self._train_pass.date()
            #    and config.reqi_dnn_plugin_pass >= self._pass_id:
            #    fleet.load_one_table(0, self._train_pass._checkpoint_model_path)
            #else:
            fleet.init_server(self._train_pass._checkpoint_model_path, mode=0)
            cost_printer.done()
        if self.global_config['save_first_base']:
            self.print_log("save_first_base=True", {'master': True})
X
xiexionghang 已提交
264
            self.print_log("going to save xbox base model", {'master': True, 'stdout': stdout_str})
X
xiexionghang 已提交
265
            self._train_pass._base_key = int(time.time())
X
xiexionghang 已提交
266
            stdout_str += self.save_xbox_model(self._train_pass.date(), 0, self._train_pass._base_key, "")
X
xiexionghang 已提交
267 268 269
        context['status'] = 'begin_day'
    
    def begin_day(self, context):
X
xiexionghang 已提交
270 271
        """R
        """
X
xiexionghang 已提交
272 273 274 275 276
        stdout_str = ""
        if not self._train_pass.next():
            context['is_exit'] = True
        day = self._train_pass.date()
        pass_id = self._train_pass._pass_id
X
xiexionghang 已提交
277
        self.print_log("======== BEGIN DAY:%s ========" % day, {'master': True, 'stdout': stdout_str})
X
xiexionghang 已提交
278 279 280 281 282 283
        if pass_id == self._train_pass.max_pass_num_day():
            context['status'] = 'end_day'
        else:
            context['status'] = 'train_pass'
    
    def end_day(self, context):
X
xiexionghang 已提交
284 285
        """R
        """
X
xiexionghang 已提交
286 287 288 289 290 291 292
        day = self._train_pass.date()
        pass_id = self._train_pass._pass_id
        xbox_base_key = int(time.time())
        context['status'] = 'begin_day'

        kagle_util.rank0_print("shrink table")
        cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, 
X
xiexionghang 已提交
293
            {'master': True, 'log_format': 'shrink table done, cost %s sec'})
X
xiexionghang 已提交
294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309
        fleet.shrink_sparse_table()
        for executor in self._exector_context:
            self._exector_context[executor]['model'].shrink({
                'scope': self._exector_context[executor]['scope'],
                'decay': self.global_config['optimizer']['dense_decay_rate']
            })
        cost_printer.done()

        next_date = self._train_pass.date(delta_day=1)
        kagle_util.rank0_print("going to save xbox base model")
        self.save_xbox_model(next_date, 0, xbox_base_key, "")
        kagle_util.rank0_print("going to save batch model")
        self.save_model(next_date, 0, xbox_base_key)
        self._train_pass._base_key = xbox_base_key

    def train_pass(self, context):
X
xiexionghang 已提交
310 311
        """R
        """
X
xiexionghang 已提交
312 313 314 315 316
        stdout_str = ""
        day = self._train_pass.date()
        pass_id = self._train_pass._pass_id
        base_key = self._train_pass._base_key
        pass_time = self._train_pass._current_train_time.strftime("%Y%m%d%H%M")
X
xiexionghang 已提交
317
        self.print_log("    ==== begin delta:%s ========" % pass_id, {'master': True, 'stdout': stdout_str})
X
xiexionghang 已提交
318 319
        train_begin_time = time.time()

X
xiexionghang 已提交
320 321
        cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, \
            {'master': True, 'log_format': 'load into memory done, cost %s sec', 'stdout': stdout_str})
X
xiexionghang 已提交
322 323 324 325
        current_dataset = {}
        for name in self._dataset:
            current_dataset[name] = self._dataset[name].load_dataset({
                'node_num': fleet.worker_num(), 'node_idx': fleet.worker_index(),
X
xiexionghang 已提交
326
                'begin_time': pass_time, 'time_window_min': self._train_pass._interval_per_pass
X
xiexionghang 已提交
327 328 329 330 331
            })
        cost_printer.done()
                
        kagle_util.rank0_print("going to global shuffle")
        cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, { 
X
xiexionghang 已提交
332 333
            'master': True, 'stdout': stdout_str,
            'log_format': 'global shuffle done, cost %s sec'}) 
X
xiexionghang 已提交
334 335 336 337 338 339 340 341 342 343 344
        for name in current_dataset:
            current_dataset[name].global_shuffle(fleet, self.global_config['dataset']['shuffle_thread'])
        cost_printer.done()
        # str(dataset.get_shuffle_data_size(fleet))

        if self.global_config['prefetch_data']:
            next_pass_time = (self._train_pass._current_train_time + 
                datetime.timedelta(minutes=self._train_pass._interval_per_pass)).strftime("%Y%m%d%H%M")
            for name in self._dataset:
                self._dataset[name].preload_dataset({
                    'node_num': fleet.worker_num(), 'node_idx': fleet.worker_index(),
X
xiexionghang 已提交
345
                    'begin_time': next_pass_time, 'time_window_min': self._train_pass._interval_per_pass
X
xiexionghang 已提交
346 347 348 349 350
                })
        
        pure_train_begin = time.time()
        for executor in self.global_config['executor']:
            self.run_executor(executor, current_dataset[executor['dataset_name']], stdout_str)
X
xiexionghang 已提交
351 352
        cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, \
            {'master': True, 'log_format': 'release_memory cost %s sec'}) 
X
xiexionghang 已提交
353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375
        for name in current_dataset:
            current_dataset[name].release_memory()
        pure_train_cost = time.time() - pure_train_begin
        
        if self._train_pass.is_checkpoint_pass(pass_id):
            self.save_model(day, pass_id, base_key)

        train_end_time = time.time()
        train_cost = train_end_time - train_begin_time
        other_cost = train_cost - pure_train_cost 
        log_str = "finished train day %s pass %s time cost:%s sec job time cost:" % (day, pass_id, train_cost)
        for executor in self._exector_context:
            log_str += '[' + executor + ':' + str(self._exector_context[executor]['cost']) + ']'
        log_str += '[other_cost:' + str(other_cost) + ']'         
        kagle_util.rank0_print(log_str)
        stdout_str += kagle_util.now_time_str() + log_str
        sys.stdout.write(stdout_str)
        stdout_str = ""
        if pass_id == self._train_pass.max_pass_num_day():
            context['status'] = 'end_day'
            return
        elif not self._train_pass.next():
            context['is_exit'] = True