diff --git a/ogb_examples/nodeproppred/unimp/main_arxiv.py b/ogb_examples/nodeproppred/unimp/main_arxiv.py
index b82742d72457783d2402826e412d24b1cd9fa2fe..4a13fd68c16215e96b21d46451d11dbc3b68ac5e 100644
--- a/ogb_examples/nodeproppred/unimp/main_arxiv.py
+++ b/ogb_examples/nodeproppred/unimp/main_arxiv.py
@@ -20,7 +20,7 @@ evaluator = Evaluator(name='ogbn-arxiv')
 
 def get_config():
     parser = argparse.ArgumentParser()
-    ## 基本模型参数
+    ## model_arg
     model_group=parser.add_argument_group('model_base_arg')
     model_group.add_argument('--num_layers', default=3, type=int)
     model_group.add_argument('--hidden_size', default=128, type=int)
@@ -28,7 +28,7 @@ def get_config():
     model_group.add_argument('--dropout', default=0.3, type=float)
     model_group.add_argument('--attn_dropout', default=0, type=float)
 
-    ## label embedding模型参数
+    ## label_embed_arg
     embed_group=parser.add_argument_group('embed_arg')
     embed_group.add_argument('--use_label_e', action='store_true')
     embed_group.add_argument('--label_rate', default=0.625, type=float)
@@ -81,17 +81,17 @@ def eval_test(parser, program, model, test_exe, graph, y_true, split_idx):
 
 def train_loop(parser, start_program, main_program, test_program,
                model, graph, label, split_idx, exe, run_id, wf=None):
-    #启动上文构建的训练器
+    #run the startup program built above
     exe.run(start_program)
-    max_acc=0 # 最佳test_acc
-    max_step=0 # 最佳test_acc 对应step
-    max_val_acc=0 # 最佳val_acc
-    max_cor_acc=0 # 最佳val_acc对应test_acc
-    max_cor_step=0 # 最佳val_acc对应step
-    #训练循环
+    max_acc=0 # best test_acc
+    max_step=0 # step for best test_acc
+    max_val_acc=0 # best val_acc
+    max_cor_acc=0 # test_acc for best val_acc
+    max_cor_step=0 # step for best val_acc
+    #training loop
     for epoch_id in tqdm(range(parser.epochs)):
-        #运行训练器
+        #run the trainer
 
         if parser.use_label_e:
             feed_dict=model.gw.to_feed(graph)
@@ -115,7 +115,7 @@ def train_loop(parser, start_program, main_program, test_program,
 #            print(loss[1][0])
             loss = loss[0]
 
-        #测试结果
+        #eval result
         result = eval_test(parser, test_program, model, exe, graph, label, split_idx)
         train_acc, valid_acc, test_acc = result
@@ -191,11 +191,7 @@ if __name__ == '__main__':
     test_prog=train_prog.clone(for_test=True)
     model.train_program()
 
-
-#    ave_loss = train_program(pred_output)#训练程序
-#    lr, global_step= linear_warmup_decay(parser.lr, parser.epochs*0.1, parser.epochs)
-#    adam_optimizer = optimizer_func(lr)#训练优化函数
-    adam_optimizer = optimizer_func(parser.lr)#训练优化函数
+    adam_optimizer = optimizer_func(parser.lr)#optimizer
     adam_optimizer.minimize(model.avg_cost)
 
     exe = F.Executor(place)
@@ -206,4 +202,4 @@ if __name__ == '__main__':
         total_test_acc+=train_loop(parser, startup_prog, train_prog, test_prog,
                                    model, graph, label, split_idx, exe, run_i, wf)
     wf.write(f'average: {100 * (total_test_acc/parser.runs):.2f}%')
-    wf.close()
\ No newline at end of file
+    wf.close()
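A note on the bookkeeping that `train_loop` maintains above: `max_acc`/`max_step` record the best test accuracy only for reference; the number actually reported is `max_cor_acc`, the test accuracy at the epoch with the best validation accuracy, so the model is never selected on test data. A minimal sketch of that pattern — `update_best` and `stats` are illustrative names, not taken from the code:

```python
def update_best(stats, step, valid_acc, test_acc):
    """Update running bests; model selection uses validation accuracy only."""
    if test_acc > stats["max_acc"]:           # best test_acc, kept for reference
        stats["max_acc"], stats["max_step"] = test_acc, step
    if valid_acc > stats["max_val_acc"]:      # the selection criterion
        stats["max_val_acc"] = valid_acc
        stats["max_cor_acc"] = test_acc      # test_acc at the best val_acc
        stats["max_cor_step"] = step
    return stats

stats = dict(max_acc=0., max_step=0, max_val_acc=0., max_cor_acc=0., max_cor_step=0)
for step, (valid_acc, test_acc) in enumerate([(0.70, 0.68), (0.72, 0.69), (0.71, 0.71)]):
    stats = update_best(stats, step, valid_acc, test_acc)
print(stats["max_acc"], stats["max_cor_acc"])  # 0.71 0.69 -> report max_cor_acc
```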
diff --git a/ogb_examples/nodeproppred/unimp/main_product.py b/ogb_examples/nodeproppred/unimp/main_product.py
index 3a3141a6a6e01dd2f9325bf908ba802174f40bb7..d9780e3d8f24f9d974b89cc575523a3ab0508530 100644
--- a/ogb_examples/nodeproppred/unimp/main_product.py
+++ b/ogb_examples/nodeproppred/unimp/main_product.py
@@ -22,14 +22,14 @@ evaluator = Evaluator(name='ogbn-products')
 
 def get_config():
     parser = argparse.ArgumentParser()
-    ## 采样参数
+    ## data_sampling_arg
     data_group= parser.add_argument_group('data_arg')
     data_group.add_argument('--batch_size', default=1500, type=int)
     data_group.add_argument('--num_workers', default=12, type=int)
     data_group.add_argument('--sizes', default=[10, 10, 10], type=int, nargs='+')
     data_group.add_argument('--buf_size', default=1000, type=int)
 
-    ## 基本模型参数
+    ## model_arg
     model_group=parser.add_argument_group('model_base_arg')
     model_group.add_argument('--num_layers', default=3, type=int)
     model_group.add_argument('--hidden_size', default=128, type=int)
@@ -37,7 +37,7 @@ def get_config():
     model_group.add_argument('--dropout', default=0.3, type=float)
     model_group.add_argument('--attn_dropout', default=0, type=float)
 
-    ## label embedding模型参数
+    ## label_embed_arg
     embed_group=parser.add_argument_group('embed_arg')
     embed_group.add_argument('--use_label_e', action='store_true')
     embed_group.add_argument('--label_rate', default=0.625, type=float)
@@ -113,19 +113,19 @@ def eval_test(parser, test_p_list, model, test_exe, dataset, split_idx):
 
 def train_loop(parser, start_program, main_program, test_p_list,
                model, feat_init, place, dataset, split_idx, exe, run_id, wf=None):
-    #启动上文构建的训练器
+    #run the startup program built above
     exe.run(start_program)
     feat_init(place)
 
-    max_acc=0 # 最佳test_acc
-    max_step=0 # 最佳test_acc 对应step
-    max_val_acc=0 # 最佳val_acc
-    max_cor_acc=0 # 最佳val_acc对应test_acc
-    max_cor_step=0 # 最佳val_acc对应step
-    #训练循环
+    max_acc=0 # best test_acc
+    max_step=0 # step for best test_acc
+    max_val_acc=0 # best val_acc
+    max_cor_acc=0 # test_acc for best val_acc
+    max_cor_step=0 # step for best val_acc
+    #training loop
     for epoch_id in range(parser.epochs):
-        #运行训练器
+        #run the trainer
 
         if parser.use_label_e:
             train_idx_temp=copy.deepcopy(split_idx['train'])
@@ -158,8 +158,7 @@ def train_loop(parser, start_program, main_program, test_p_list,
             print('acc: ', (acc_num/unlabel_idx.shape[0])*100)
 
-        #测试结果
-#        total=0.0
+        #eval result
         if (epoch_id+1)>=50 and (epoch_id+1)%10==0:
             result = eval_test(parser, test_p_list, model, exe, dataset, split_idx)
             train_acc, valid_acc, test_acc = result
@@ -242,17 +241,14 @@ if __name__ == '__main__':
 #    test_prog=train_prog.clone(for_test=True)
     model.train_program()
 
-#    ave_loss = train_program(pred_output)#训练程序
-#    lr, global_step= linear_warmup_decay(0.01, 50, 500)
-#    adam_optimizer = optimizer_func(lr)#训练优化函数
-    adam_optimizer = optimizer_func(parser.lr)#训练优化函数
+    adam_optimizer = optimizer_func(parser.lr)#optimizer
     adam_optimizer.minimize(model.avg_cost)
 
     test_p_list=[]
 
     with F.unique_name.guard():
-        ## input层
+        ## build up eval program
         test_p=F.Program()
         with F.program_guard(test_p, ):
             gw_test=pgl.graph_wrapper.GraphWrapper(
                 name="product_"+str(0))
@@ -281,7 +277,7 @@ if __name__ == '__main__':
         with F.program_guard(test_p, ):
             gw_test=pgl.graph_wrapper.GraphWrapper(
                 name="product_"+str(0))
-#            feature_batch=model.get_batch_feature(label_feature, test=True)  # 把图在CPU存起
+#            feature_batch=model.get_batch_feature(label_feature, test=True)
             feature_batch = F.data(
                 'hidden_node_feat',
                 shape=[None, model.num_heads*model.hidden_size],
                 dtype='float32')
@@ -322,4 +318,4 @@ if __name__ == '__main__':
         total_test_acc+=train_loop(parser, startup_prog, train_prog, test_p_list,
                                    model, feat_init, place, dataset, split_idx, exe, run_i, wf)
     wf.write(f'average: {100 * (total_test_acc/parser.runs):.2f}%')
-    wf.close()
\ No newline at end of file
+    wf.close()
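The `--use_label_e` branch above deep-copies `split_idx['train']` and splits it so that a `--label_rate` fraction of training nodes receive their ground-truth label as an input feature, while the remaining nodes (the `unlabel_idx` used in the accuracy print) supply the supervised loss. A minimal sketch of that split under the assumption of a plain NumPy index array — `split_train_idx` and the `rng` are illustrative, not from the code:

```python
import copy
import numpy as np

def split_train_idx(train_idx, label_rate, rng):
    """Give label_rate of the train nodes their true label as an input feature;
    the remainder supplies the supervised loss (illustrative helper)."""
    idx = copy.deepcopy(train_idx)
    rng.shuffle(idx)                            # in-place shuffle of the copy
    cut = int(idx.shape[0] * label_rate)
    return idx[:cut], idx[cut:]                 # (label-visible nodes, loss nodes)

rng = np.random.default_rng(0)
label_idx, unlabel_idx = split_train_idx(np.arange(100), label_rate=0.625, rng=rng)
print(label_idx.shape[0], unlabel_idx.shape[0])  # 62 38
```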
diff --git a/ogb_examples/nodeproppred/unimp/main_protein.py b/ogb_examples/nodeproppred/unimp/main_protein.py
index 1ad8073112d69844e9bed4a8ae5d8fa54f7e8e36..970314d3587dbdc6cfa41c43aa8942c1f4698577 100644
--- a/ogb_examples/nodeproppred/unimp/main_protein.py
+++ b/ogb_examples/nodeproppred/unimp/main_protein.py
@@ -23,7 +23,7 @@ evaluator = Evaluator(name='ogbn-proteins')
 
 def get_config():
     parser = argparse.ArgumentParser()
-    ## 基本模型参数
+    ## model_arg
     model_group=parser.add_argument_group('model_base_arg')
     model_group.add_argument('--num_layers', default=7, type=int)
     model_group.add_argument('--hidden_size', default=64, type=int)
@@ -31,7 +31,7 @@ def get_config():
     model_group.add_argument('--dropout', default=0.1, type=float)
     model_group.add_argument('--attn_dropout', default=0, type=float)
 
-    ## label embedding模型参数
+    ## label_embed_arg
     embed_group=parser.add_argument_group('embed_arg')
     embed_group.add_argument('--use_label_e', action='store_true')
     embed_group.add_argument('--label_rate', default=0.5, type=float)
@@ -90,15 +90,16 @@ def eval_test(parser, program, model, test_exe, graph, y_true, split_idx):
 
 def train_loop(parser, start_program, main_program, test_program,
                model, graph, label, split_idx, exe, run_id, wf=None):
-    #启动上文构建的训练器
+    #run the startup program built above
     exe.run(start_program)
-    max_acc=0 # 最佳test_acc
-    max_step=0 # 最佳test_acc 对应step
-    max_val_acc=0 # 最佳val_acc
-    max_cor_acc=0 # 最佳val_acc对应test_acc
-    max_cor_step=0 # 最佳val_acc对应step
-    #训练循环
+    max_acc=0 # best test_acc
+    max_step=0 # step for best test_acc
+    max_val_acc=0 # best val_acc
+    max_cor_acc=0 # test_acc for best val_acc
+    max_cor_step=0 # step for best val_acc
+    #training loop
+
     graph.node_feat["label"] = label
     graph.node_feat["nid"] = np.arange(0, graph.num_nodes)
@@ -112,7 +113,7 @@ def train_loop(parser, start_program, main_program, test_program,
     for epoch_id in tqdm(range(parser.epochs)):
         for subgraph in random_partition(num_clusters=9, graph=graph, shuffle=True):
-            #运行训练器
+            #run the trainer
             if parser.use_label_e:
                 feed_dict = model.gw.to_feed(subgraph)
                 sub_idx = set(subgraph.node_feat["nid"])
@@ -139,7 +140,7 @@ def train_loop(parser, start_program, main_program, test_program,
                 fetch_list=[model.avg_cost])
             loss = loss[0]
 
-        #测试结果
+        #eval result
         if (epoch_id+1) > parser.epochs*0.9:
             result = eval_test(parser, test_program, model, exe, graph, label, split_idx)
             train_acc, valid_acc, test_acc = result
@@ -221,7 +222,7 @@ if __name__ == '__main__':
     model.train_program()
 
-    adam_optimizer = optimizer_func(parser.lr)#训练优化函数
+    adam_optimizer = optimizer_func(parser.lr)#optimizer
     adam_optimizer.minimize(model.avg_cost)
 
     exe = F.Executor(place)
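Why `main_protein.py` stashes `graph.node_feat["nid"]` before training: `random_partition` re-indexes nodes inside each subgraph, and the carried-along `"nid"` feature is what maps subgraph-local rows back to global train/valid/test indices (hence `sub_idx = set(subgraph.node_feat["nid"])`). A small sketch of that id bookkeeping, with a hypothetical random cluster assignment standing in for `random_partition`:

```python
import numpy as np

num_nodes, num_clusters = 12, 3
nid = np.arange(num_nodes)                       # global ids, stored as node_feat["nid"]
cluster = np.random.default_rng(0).integers(0, num_clusters, num_nodes)
train_idx = {0, 3, 5, 7, 11}                     # global train nodes

for c in range(num_clusters):
    sub_nid = nid[cluster == c]                  # what subgraph.node_feat["nid"] carries
    # local row positions of this subgraph's training nodes
    local_train = np.flatnonzero(np.isin(sub_nid, list(train_idx)))
    print(c, sub_nid, local_train)
```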
diff --git a/pgl/utils/mp_reader.py b/pgl/utils/mp_reader.py
index a7962830031c3aeede2b780104dacf936d62a120..55c8b640fc5d460a24e87baeae66dfe1f537bd93 100644
--- a/pgl/utils/mp_reader.py
+++ b/pgl/utils/mp_reader.py
@@ -27,24 +27,34 @@ import time
 import paddle.fluid as fluid
 from multiprocessing import Queue
 import threading
+from collections import namedtuple
+_np_serialized_data = namedtuple("_np_serialized_data", ["value", "shape", "dtype"])
+
 
 def serialize_data(data):
     """serialize_data"""
     if data is None:
         return None
     return numpy_serialize_data(data)  #, ensure_ascii=False)
 
+def index_iter(data):
+    """return indexing iter"""
+    if isinstance(data, list):
+        return range(len(data))
+    elif isinstance(data, dict):
+        return data.keys()
+
 
 def numpy_serialize_data(data):
     """serialize_data"""
-    ret_data = {}
-    for key in data:
-        if isinstance(data[key], np.ndarray):
-            ret_data[key] = (data[key].tobytes(), list(data[key].shape),
-                             "%s" % data[key].dtype)
-        else:
-            ret_data[key] = data[key]
+    ret_data = copy.deepcopy(data)
+
+    if isinstance(ret_data, (dict, list)):
+        for key in index_iter(ret_data):
+            if isinstance(ret_data[key], np.ndarray):
+                ret_data[key] = _np_serialized_data(value=ret_data[key].tobytes(),
+                    shape=list(ret_data[key].shape), dtype="%s" % ret_data[key].dtype)
     return ret_data
 
@@ -52,11 +62,12 @@ def numpy_deserialize_data(data):
     """deserialize_data"""
     if data is None:
         return None
-    for key in data:
-        if isinstance(data[key], tuple):
-            value = np.frombuffer(
-                data[key][0], dtype=data[key][2]).reshape(data[key][1])
-            data[key] = value
+
+    if isinstance(data, (dict, list)):
+        for key in index_iter(data):
+            if isinstance(data[key], _np_serialized_data):
+                data[key] = np.frombuffer(buffer=data[key].value,
+                    dtype=data[key].dtype).reshape(data[key].shape)
     return data
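The `mp_reader.py` change replaces the ad-hoc `(bytes, shape, dtype)` tuple with the `_np_serialized_data` namedtuple, which deserialization can recognize unambiguously (previously any plain tuple in user data would have been misinterpreted as a serialized array), and `index_iter` extends the conversion from dicts to lists. A sketch of the intended round trip, assuming `serialize_data` and `numpy_deserialize_data` are importable as shown:

```python
import numpy as np
from pgl.utils.mp_reader import serialize_data, numpy_deserialize_data

batch = {"feat": np.random.rand(4, 16).astype("float32"),
         "label": [0, 1, 2, 3]}                 # non-array values pass through untouched
packed = serialize_data(batch)                  # "feat" -> _np_serialized_data(bytes, shape, dtype)
restored = numpy_deserialize_data(packed)       # bytes -> np.ndarray again
assert restored["feat"].shape == (4, 16)
assert restored["feat"].dtype == np.float32
assert restored["label"] == [0, 1, 2, 3]

# Top-level lists now round-trip too, which the old dict-only loop could not handle:
print(numpy_deserialize_data(serialize_data([np.arange(3), "keep-me"])))
```

Note the conversion is shallow by design: only top-level dict/list entries are inspected, so arrays nested deeper than one level are carried by `copy.deepcopy` rather than converted.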