diff --git a/PaddleRec/gnn/README.md b/PaddleRec/gnn/README.md
index 7261d8e9b9caf3ddf88358ecedf94df2de4b3acd..1996d9e08a0b2ffb8b3e45c8d6b3ce651ab9c719 100644
--- a/PaddleRec/gnn/README.md
+++ b/PaddleRec/gnn/README.md
@@ -76,11 +76,21 @@ GPU single-machine single-card training
 CUDA_VISIBLE_DEVICES=1 python -u train.py --use_cuda 1 > log.txt 2>&1 &
 ```
 
+GPU single-machine multi-card training
+``` bash
+CUDA_VISIBLE_DEVICES=0,1,2,3 python -u train.py --use_cuda 1 > log.txt 2>&1 &
+```
+
 CPU single-machine training
 ``` bash
 CPU_NUM=1 python -u train.py --use_cuda 0 > log.txt 2>&1 &
 ```
 
+CPU single-machine multi-CPU training
+``` bash
+CPU_NUM=5 python -u train.py --use_cuda 0 > log.txt 2>&1 &
+```
+
 Note that the single-card training above can be accelerated with the Parallel Executor by adding the --use_parallel 1 flag.
diff --git a/PaddleRec/gnn/infer.py b/PaddleRec/gnn/infer.py
index ef007712dcd1b100138b5d887d3c6126d6761ead..ee196702052b6bc89eaf03a9e2640e81f4da54b6 100644
--- a/PaddleRec/gnn/infer.py
+++ b/PaddleRec/gnn/infer.py
@@ -59,7 +59,7 @@ def infer(epoch_num):
     loss_sum = 0.0
     acc_sum = 0.0
     count = 0
-    for data in test_data.reader(batch_size, batch_size, False):
+    for data in test_data.reader(batch_size, batch_size, False)():
         res = exe.run(infer_program,
                       feed=feeder.feed(data),
                       fetch_list=fetch_targets)
diff --git a/PaddleRec/gnn/network.py b/PaddleRec/gnn/network.py
index 1cd1af9243603a9fff0554e9f2c22734e1bee217..44d53f943aef18ba58b6176c43e2bf656214ef65 100644
--- a/PaddleRec/gnn/network.py
+++ b/PaddleRec/gnn/network.py
@@ -58,6 +58,12 @@ def network(batch_size, items_num, hidden_size, step):
         dtype="int64",
         append_batch_size=False)
 
+    datas = [items, seq_index, last_index, adj_in, adj_out, mask, label]
+    py_reader = fluid.layers.create_py_reader_by_data(
+        capacity=256, feed_list=datas, name='py_reader', use_double_buffer=True)
+    feed_datas = fluid.layers.read_file(py_reader)
+    items, seq_index, last_index, adj_in, adj_out, mask, label = feed_datas
+
     items_emb = layers.embedding(
         input=items,
         param_attr=fluid.ParamAttr(
@@ -171,7 +177,7 @@ def network(batch_size, items_num, hidden_size, step):
         [global_attention, last], axis=1)  #[batch_size, 2*h]
     final_attention_fc = layers.fc(
         input=final_attention,
-        name="fina_attention_fc",
+        name="final_attention_fc",
         size=hidden_size,
         bias_attr=False,
         act=None,
@@ -200,4 +206,4 @@ def network(batch_size, items_num, hidden_size, step):
         logits=logits, label=label)  #[batch_size, 1]
     loss = layers.reduce_mean(softmax)  # [1]
     acc = layers.accuracy(input=logits, label=label, k=20)
-    return loss, acc
+    return loss, acc, py_reader, feed_datas
diff --git a/PaddleRec/gnn/reader.py b/PaddleRec/gnn/reader.py
index 9b5793bc9b26ec6587b2999c98f72d5ca3bdc98e..82d56e59cf90ae8fb27aefb1c7ffc04df05fa02a 100644
--- a/PaddleRec/gnn/reader.py
+++ b/PaddleRec/gnn/reader.py
@@ -76,7 +76,7 @@ class Data():
         seq_index = np.array(seq_index).astype("int32").reshape(
             (batch_size, -1))
         last_index = np.array(last_index).astype("int32").reshape(
-            (batch_size, 1))
+            (batch_size))
         adj_in = np.array(adj_in).astype("float32").reshape(
             (batch_size, max_uniq_len, max_uniq_len))
         adj_out = np.array(adj_out).astype("float32").reshape(
@@ -86,28 +86,30 @@ class Data():
         return zip(items, seq_index, last_index, adj_in, adj_out, mask, label)
 
     def reader(self, batch_size, batch_group_size, train=True):
-        if self.shuffle:
-            random.shuffle(self.input)
-        group_remain = self.length % batch_group_size
-        for bg_id in range(0, self.length - group_remain, batch_group_size):
-            cur_bg = copy.deepcopy(self.input[bg_id:bg_id + batch_group_size])
+        def _reader():
+            if self.shuffle:
+                random.shuffle(self.input)
+            group_remain = self.length % batch_group_size
+            for bg_id in range(0, self.length - group_remain, batch_group_size):
+                cur_bg = copy.deepcopy(self.input[bg_id:bg_id + batch_group_size])
+                if train:
+                    cur_bg = sorted(cur_bg, key=lambda x: len(x[0]), reverse=True)
+                for i in range(0, batch_group_size, batch_size):
+                    cur_batch = cur_bg[i:i + batch_size]
+                    yield self.make_data(cur_batch, batch_size)
+
+            #deal with the remaining, discard at most batch_size data
+            if group_remain < batch_size:
+                return
+            remain_data = copy.deepcopy(self.input[-group_remain:])
             if train:
-                cur_bg = sorted(cur_bg, key=lambda x: len(x[0]), reverse=True)
+                remain_data = sorted(
+                    remain_data, key=lambda x: len(x[0]), reverse=True)
             for i in range(0, batch_group_size, batch_size):
-                cur_batch = cur_bg[i:i + batch_size]
-                yield self.make_data(cur_batch, batch_size)
-
-        #deal with the remaining, discard at most batch_size data
-        if group_remain < batch_size:
-            return
-        remain_data = copy.deepcopy(self.input[-group_remain:])
-        if train:
-            remain_data = sorted(
-                remain_data, key=lambda x: len(x[0]), reverse=True)
-        for i in range(0, batch_group_size, batch_size):
-            if i + batch_size <= len(remain_data):
-                cur_batch = remain_data[i:i + batch_size]
-                yield self.make_data(cur_batch, batch_size)
+                if i + batch_size <= len(remain_data):
+                    cur_batch = remain_data[i:i + batch_size]
+                    yield self.make_data(cur_batch, batch_size)
+        return _reader
 
 
 def read_config(path):
diff --git a/PaddleRec/gnn/train.py b/PaddleRec/gnn/train.py
index c244f84a586f91a231f5edf507ad0ca8439a7647..1da8dc0b3976495062eed0c730fee1ca7e6c8c16 100644
--- a/PaddleRec/gnn/train.py
+++ b/PaddleRec/gnn/train.py
@@ -71,7 +71,7 @@ def train():
     batch_size = args.batch_size
     items_num = reader.read_config(args.config_path)
 
-    loss, acc = network.network(batch_size, items_num, args.hidden_size,
+    loss, acc, py_reader, feed_datas = network.network(batch_size, items_num, args.hidden_size,
                                 args.step)
 
     data_reader = reader.Data(args.train_path, True)
@@ -98,10 +98,7 @@ def train():
     all_vocab.set(
         np.arange(1, items_num).astype("int64").reshape((-1, 1)), place)
 
-    feed_list = [
-        "items", "seq_index", "last_index", "adj_in", "adj_out", "mask", "label"
-    ]
-    feeder = fluid.DataFeeder(feed_list=feed_list, place=place)
+    feed_list = [e.name for e in feed_datas]
 
     if use_parallel:
         train_exe = fluid.ParallelExecutor(
@@ -118,23 +115,27 @@ def train():
     acc_sum = 0.0
     global_step = 0
     PRINT_STEP = 500
+    py_reader.decorate_paddle_reader(data_reader.reader(batch_size, batch_size * 20, True))
     for i in range(args.epoch_num):
         epoch_sum = []
-        for data in data_reader.reader(batch_size, batch_size * 20, True):
-            res = train_exe.run(feed=feeder.feed(data),
-                                fetch_list=[loss.name, acc.name])
-            loss_sum += res[0]
-            acc_sum += res[1]
-            epoch_sum.append(res[0])
-            global_step += 1
-            if global_step % PRINT_STEP == 0:
-                ce_info.append([loss_sum / PRINT_STEP, acc_sum / PRINT_STEP])
-                total_time.append(time.time() - start_time)
-                logger.info("global_step: %d, loss: %.4lf, train_acc: %.4lf" % (
-                    global_step, loss_sum / PRINT_STEP, acc_sum / PRINT_STEP))
-                loss_sum = 0.0
-                acc_sum = 0.0
-                start_time = time.time()
+        py_reader.start()
+        try:
+            while True:
+                res = train_exe.run(fetch_list=[loss.name, acc.name])
+                loss_sum += res[0].mean()
+                acc_sum += res[1].mean()
+                epoch_sum.append(res[0].mean())
+                global_step += 1
+                if global_step % PRINT_STEP == 0:
+                    ce_info.append([loss_sum / PRINT_STEP, acc_sum / PRINT_STEP])
+                    total_time.append(time.time() - start_time)
+                    logger.info("global_step: %d, loss: %.4lf, train_acc: %.4lf" % (
+                        global_step, loss_sum / PRINT_STEP, acc_sum / PRINT_STEP))
+                    loss_sum = 0.0
+                    acc_sum = 0.0
+                    start_time = time.time()
+        except fluid.core.EOFException:
+            py_reader.reset()
         logger.info("epoch loss: %.4lf" % (np.mean(epoch_sum)))
         save_dir = args.model_path + "/epoch_" + str(i)
         fetch_vars = [loss, acc]
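A note on the reader change above, with a minimal standalone sketch (the names `make_batch_reader` and `data` are illustrative, not from the repository): `Data.reader()` now builds its generator inside an inner `_reader()` and returns that function rather than a generator, which is the callable form `py_reader.decorate_paddle_reader()` consumes in train.py. Code that still iterates directly, such as infer.py, therefore calls the factory first, hence the added `()`.

``` python
# Minimal sketch of the reader-factory pattern introduced by this diff.
# make_batch_reader and the sample data are illustrative placeholders.

def make_batch_reader(samples, batch_size):
    """Return a no-argument generator function (the form that
    py_reader.decorate_paddle_reader() expects)."""
    def _reader():
        for i in range(0, len(samples) - batch_size + 1, batch_size):
            yield samples[i:i + batch_size]
    return _reader


if __name__ == "__main__":
    data = list(range(10))
    # Training path (train.py): hand the factory itself to the py_reader, e.g.
    #   py_reader.decorate_paddle_reader(make_batch_reader(data, 4))
    # Direct iteration (infer.py): call the factory first, then iterate.
    for batch in make_batch_reader(data, 4)():
        print(batch)  # [0, 1, 2, 3] then [4, 5, 6, 7]; the tail [8, 9] is dropped
```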