diff --git a/python/paddle/fluid/tests/demo/pipeline_train.py b/python/paddle/fluid/tests/demo/pipeline_train.py
index bebc0761bf0d64093c15ebd0cadec54e90d179e9..2f75908a160fd3c61c743dc407095d645737a534 100644
--- a/python/paddle/fluid/tests/demo/pipeline_train.py
+++ b/python/paddle/fluid/tests/demo/pipeline_train.py
@@ -31,478 +31,175 @@ logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s')
 logger = logging.getLogger("fluid")
 logger.setLevel(logging.INFO)
-batch_size = 100
-ncards = 4
-nreaders = 4
-nscopes = 30
-learning_rate = 0.1
 is_profile = False
-sync_steps = 1


 def parse_args():
-    parser = argparse.ArgumentParser("gnn")
-    parser.add_argument(
-        '--train_path',
-        type=str,
-        default='./data/diginetica/train.txt',
-        help='dir of training data')
-    parser.add_argument(
-        '--config_path',
-        type=str,
-        default='./data/diginetica/config.txt',
-        help='dir of config')
-    parser.add_argument(
-        '--model_path',
-        type=str,
-        default='./saved_model',
-        help="path of model parameters")
-    parser.add_argument(
-        '--epoch_num',
-        type=int,
-        default=30,
-        help='number of epochs to train for')
+    parser = argparse.ArgumentParser("Resnet with pipeline parallel.")
     parser.add_argument(
         '--batch_size', type=int, default=100, help='input batch size')
-    parser.add_argument(
-        '--hidden_size', type=int, default=100, help='hidden state size')
-    parser.add_argument('--l2', type=float, default=1e-5, help='l2 penalty')
     parser.add_argument('--lr', type=float, default=0.001, help='learning rate')
-    parser.add_argument(
-        '--emb_lr_rate', type=float, default=0.5, help='learning rate')
-    parser.add_argument(
-        '--step', type=int, default=1, help='gnn propagation steps')
-    parser.add_argument(
-        '--lr_dc', type=float, default=0.1, help='learning rate decay rate')
-    parser.add_argument(
-        '--lr_dc_step',
-        type=int,
-        default=3,
-        help='the number of steps after which the learning rate decay')
-    parser.add_argument(
-        '--use_cuda', type=int, default=0, help='whether to use gpu')
-    parser.add_argument(
-        '--use_parallel',
-        type=int,
-        default=1,
-        help='whether to use parallel executor')
     return parser.parse_args()


-def network(batch_size, items_num, hidden_size, step, rate):
-    stdv = 1.0 / math.sqrt(hidden_size)
-
-    items = layers.data(
-        name="items",
-        shape=[batch_size, -1, 1],
-        dtype="int64",
-        append_batch_size=False)  #[bs, uniq_max, 1]
-    seq_index = layers.data(
-        name="seq_index",
-        shape=[batch_size, -1],
-        dtype="int64",
-        append_batch_size=False)  #[-1(seq_max)*batch_size, 1]
-    last_index = layers.data(
-        name="last_index",
-        shape=[batch_size],
-        dtype="int64",
-        append_batch_size=False)  #[batch_size, 1]
-    adj_in = layers.data(
-        name="adj_in",
-        shape=[batch_size, -1, -1],
-        dtype="float32",
-        append_batch_size=False)
-    adj_out = layers.data(
-        name="adj_out",
-        shape=[batch_size, -1, -1],
-        dtype="float32",
-        append_batch_size=False)
-    mask = layers.data(
-        name="mask",
-        shape=[batch_size, -1, 1],
-        dtype="float32",
-        append_batch_size=False)
-    label = layers.data(
-        name="label",
-        shape=[batch_size, 1],
-        dtype="int64",
-        append_batch_size=False)
-
-    items_emb = layers.embedding(
-        input=items,
-        is_sparse=True,
-        param_attr=fluid.ParamAttr(
-            name="emb",
-            learning_rate=rate,
-            initializer=fluid.initializer.Uniform(
-                low=-stdv, high=stdv)),
-        size=[items_num, hidden_size])  #[batch_size, uniq_max, h]
-    data_feed = [items, seq_index, last_index, adj_in, adj_out, mask, label]
-
-    pre_state = items_emb
-    for i in range(step):
-        pre_state = layers.reshape(
-            x=pre_state, shape=[batch_size, -1, hidden_size])
-        state_in = layers.fc(
-            input=pre_state,
-            name="state_in",
-            size=hidden_size,
-            act=None,
-            num_flatten_dims=2,
-            param_attr=fluid.ParamAttr(initializer=fluid.initializer.Uniform(
-                low=-stdv, high=stdv)),
-            bias_attr=fluid.ParamAttr(initializer=fluid.initializer.Uniform(
-                low=-stdv, high=stdv)))  #[batch_size, uniq_max, h]
-        state_out = layers.fc(
-            input=pre_state,
-            name="state_out",
-            size=hidden_size,
-            act=None,
-            num_flatten_dims=2,
-            param_attr=fluid.ParamAttr(initializer=fluid.initializer.Uniform(
-                low=-stdv, high=stdv)),
-            bias_attr=fluid.ParamAttr(initializer=fluid.initializer.Uniform(
-                low=-stdv, high=stdv)))  #[batch_size, uniq_max, h]
-
-        state_adj_in = layers.matmul(adj_in,
-                                     state_in)  #[batch_size, uniq_max, h]
-        state_adj_out = layers.matmul(adj_out,
-                                      state_out)  #[batch_size, uniq_max, h]
-
-        gru_input = layers.concat([state_adj_in, state_adj_out], axis=2)
-
-        gru_input = layers.reshape(x=gru_input, shape=[-1, hidden_size * 2])
-        gru_fc = layers.fc(input=gru_input,
-                           name="gru_fc",
-                           size=3 * hidden_size,
-                           bias_attr=False)
-        pre_state, _, _ = fluid.layers.gru_unit(
-            input=gru_fc,
-            hidden=layers.reshape(
-                x=pre_state, shape=[-1, hidden_size]),
-            size=3 * hidden_size)
-
-    final_state = pre_state
-    seq_index = layers.reshape(seq_index, shape=[-1])
-    seq = layers.gather(final_state, seq_index)  #[batch_size*-1(seq_max), h]
-    last = layers.gather(final_state, last_index)  #[batch_size, h]
-
-    seq = layers.reshape(
-        seq, shape=[batch_size, -1, hidden_size])  #[batch_size, -1(seq_max), h]
-    last = layers.reshape(
-        last, shape=[batch_size, hidden_size])  #[batch_size, h]
-
-    seq_fc = layers.fc(
-        input=seq,
-        name="seq_fc",
-        size=hidden_size,
-        bias_attr=False,
-        act=None,
-        num_flatten_dims=2,
-        param_attr=fluid.ParamAttr(initializer=fluid.initializer.Uniform(
-            low=-stdv, high=stdv)))  #[batch_size, -1(seq_max), h]
-    last_fc = layers.fc(input=last,
-                        name="last_fc",
-                        size=hidden_size,
-                        bias_attr=False,
-                        act=None,
-                        num_flatten_dims=1,
-                        param_attr=fluid.ParamAttr(
-                            initializer=fluid.initializer.Uniform(
-                                low=-stdv, high=stdv)))  #[bathc_size, h]
-
-    seq_fc_t = layers.transpose(
-        seq_fc, perm=[1, 0, 2])  #[-1(seq_max), batch_size, h]
-    add = layers.elementwise_add(seq_fc_t,
-                                 last_fc)  #[-1(seq_max), batch_size, h]
-    b = layers.create_parameter(
-        shape=[hidden_size],
-        dtype='float32',
-        default_initializer=fluid.initializer.Constant(value=0.0))  #[h]
-    add = layers.elementwise_add(add, b)  #[-1(seq_max), batch_size, h]
-
-    add_sigmoid = layers.sigmoid(add)  #[-1(seq_max), batch_size, h]
-    add_sigmoid = layers.transpose(
-        add_sigmoid, perm=[1, 0, 2])  #[batch_size, -1(seq_max), h]
-
-    weight = layers.fc(input=add_sigmoid,
-                       name="weight_fc",
-                       size=1,
-                       act=None,
-                       num_flatten_dims=2,
-                       bias_attr=False,
-                       param_attr=fluid.ParamAttr(
-                           initializer=fluid.initializer.Uniform(
-                               low=-stdv, high=stdv)))  #[batch_size, -1, 1]
-    weight *= mask
-    weight_mask = layers.elementwise_mul(seq, weight, axis=0)
-    global_attention = layers.reduce_sum(weight_mask, dim=1)
-
-    final_attention = layers.concat(
-        [global_attention, last], axis=1)  #[batch_size, 2*h]
-    final_attention_fc = layers.fc(
-        input=final_attention,
-        name="fina_attention_fc",
-        size=hidden_size,
-        bias_attr=False,
+def conv_bn_layer(input, num_filters, filter_size, stride=1, groups=1,
+                  act=None):
+    conv = fluid.layers.conv2d(
+        input=input,
+        num_filters=num_filters,
+        filter_size=filter_size,
+        stride=stride,
+        padding=(filter_size - 1) // 2,
+        groups=groups,
         act=None,
-        param_attr=fluid.ParamAttr(initializer=fluid.initializer.Uniform(
-            low=-stdv, high=stdv)))  #[batch_size, h]
-
-    all_vocab = layers.create_global_var(
-        shape=[items_num - 1, 1],
-        value=0,
-        dtype="int64",
-        persistable=True,
-        name="all_vocab")
-
-    all_emb = layers.embedding(
-        input=all_vocab,
-        is_sparse=True,
-        param_attr=fluid.ParamAttr(
-            name="emb",
-            learning_rate=rate,
-            initializer=fluid.initializer.Uniform(
-                low=-stdv, high=stdv)),
-        size=[items_num, hidden_size])  #[all_vocab, h]
-
-    logits = layers.matmul(
-        x=final_attention_fc, y=all_emb,
-        transpose_y=True)  #[batch_size, all_vocab]
-    softmax = layers.softmax_with_cross_entropy(
-        logits=logits, label=label)  #[batch_size, 1]
-    loss = layers.reduce_mean(softmax)  # [1]
-    #fluid.layers.Print(loss)
-    acc = layers.accuracy(input=logits, label=label, k=20)
-    return loss, acc, data_feed, [items_emb, all_emb]
+        bias_attr=False)
+    return fluid.layers.batch_norm(
+        input=conv,
+        act=act, )
+
+
+def shortcut(input, ch_out, stride, is_first):
+    ch_in = input.shape[1]
+    if ch_in != ch_out or stride != 1 or is_first == True:
+        return conv_bn_layer(input, ch_out, 1, stride)
+    else:
+        return input
+
+
+def bottleneck_block(input, num_filters, stride):
+    conv0 = conv_bn_layer(
+        input=input, num_filters=num_filters, filter_size=1, act='relu')
+    conv1 = conv_bn_layer(
+        input=conv0,
+        num_filters=num_filters,
+        filter_size=3,
+        stride=stride,
+        act='relu')
+    conv2 = conv_bn_layer(
+        input=conv1, num_filters=num_filters * 4, filter_size=1, act=None)
+
+    short = shortcut(input, num_filters * 4, stride, is_first=False)
+
+    return fluid.layers.elementwise_add(x=short, y=conv2, act='relu')
+
+
+def basic_block(input, num_filters, stride, is_first):
+    conv0 = conv_bn_layer(
+        input=input,
+        num_filters=num_filters,
+        filter_size=3,
+        act='relu',
+        stride=stride)
+    conv1 = conv_bn_layer(
+        input=conv0, num_filters=num_filters, filter_size=3, act=None)
+    short = shortcut(input, num_filters, stride, is_first)
+    return fluid.layers.elementwise_add(x=short, y=conv1, act='relu')
+
+
+def build_network(input, layers=50, class_dim=1000):
+    supported_layers = [18, 34, 50, 101, 152]
+    assert layers in supported_layers
+    depth = None
+    if layers == 18:
+        depth = [2, 2, 2, 2]
+    elif layers == 34 or layers == 50:
+        depth = [3, 4, 6, 3]
+    elif layers == 101:
+        depth = [3, 4, 23, 3]
+    elif layers == 152:
+        depth = [3, 8, 36, 3]
+    num_filters = [64, 128, 256, 512]
+    with fluid.device_guard("gpu:0"):
+        conv = conv_bn_layer(
+            input=input, num_filters=64, filter_size=7, stride=2, act='relu')
+        conv = fluid.layers.pool2d(
+            input=conv,
+            pool_size=3,
+            pool_stride=2,
+            pool_padding=1,
+            pool_type='max')
+    if layers >= 50:
+        for block in range(len(depth)):
+            with fluid.device_guard("gpu:1"):
+                for i in range(depth[block]):
+                    conv = bottleneck_block(
+                        input=conv,
+                        num_filters=num_filters[block],
+                        stride=2 if i == 0 and block != 0 else 1)
+
+        with fluid.device_guard("gpu:2"):
+            pool = fluid.layers.pool2d(
+                input=conv, pool_size=7, pool_type='avg', global_pooling=True)
+            stdv = 1.0 / math.sqrt(pool.shape[1] * 1.0)
+            out = fluid.layers.fc(
+                input=pool,
+                size=class_dim,
+                param_attr=fluid.param_attr.ParamAttr(
+                    initializer=fluid.initializer.Uniform(-stdv, stdv)))
+    else:
+        for block in range(len(depth)):
+            with fluid.device_guard("gpu:1"):
+                for i in range(depth[block]):
+                    conv = basic_block(
+                        input=conv,
+                        num_filters=num_filters[block],
+                        stride=2 if i == 0 and block != 0 else 1,
+                        is_first=block == i == 0)
+        with fluid.device_guard("gpu:2"):
+            pool = fluid.layers.pool2d(
+                input=conv, pool_size=7, pool_type='avg', global_pooling=True)
+            stdv = 1.0 / math.sqrt(pool.shape[1] * 1.0)
+            out = fluid.layers.fc(
+                input=pool,
+                size=class_dim,
+                param_attr=fluid.param_attr.ParamAttr(
+                    initializer=fluid.initializer.Uniform(-stdv, stdv)))
+    return out


 def train():
     args = parse_args()
     lr = args.lr
-    rate = args.emb_lr_rate
-    train_data_dir = "./gnn_data_new_8"
-    filelist = [
-        os.path.join(train_data_dir, f) for f in os.listdir(train_data_dir)
-        if os.path.isfile(os.path.join(train_data_dir, f))
-    ][:]
-
-    items_num = read_config(args.config_path)
-    loss, acc, data_vars, cut_list = network(batch_size, items_num,
-                                             args.hidden_size, args.step, rate)
-    print("card: %d, thread: %d, lr: %f, lr_rate: %f, scope: %d, sync_step: %d"
-          % (ncards, nreaders, lr, rate, nscopes, sync_steps))
-
-    place = fluid.CPUPlace()
+    with fluid.device_guard("gpu:0"):
+        image = fluid.layers.data(
+            name="image", shape=[3, 224, 224], dtype="float32")
+        label = fluid.layers.data(name="label", shape=[1], dtype="int64")
+        data_loader = fluid.io.DataLoader.from_generator(
+            feed_list=[image, label],
+            capacity=64,
+            use_double_buffer=True,
+            iterable=False)
+        fc = build_network(image, layers=50)
+
+    with fluid.device_guard("gpu:3"):
+        out, prob = fluid.layers.softmax_with_cross_entropy(
+            logits=fc, label=label, return_softmax=True)
+        loss = fluid.layers.mean(out)
+        acc_top1 = fluid.layers.accuracy(input=prob, label=label, k=1)
+        acc_top5 = fluid.layers.accuracy(input=prob, label=label, k=5)
+
+    optimizer = fluid.optimizer.SGD(lr)
+    optimizer = fluid.optimizer.PipelineOptimizer(optimizer, num_microbatches=2)
+    optimizer.minimize(loss)
+
+    def train_reader():
+        for _ in range(4000):
+            img = np.random.random(size=[3, 224, 224]).astype('float32')
+            label = np.random.random(size=[1]).astype('int64')
+            yield img, label
+
+    data_loader.set_sample_generator(train_reader, batch_size=args.batch_size)
+
+    place = fluid.CUDAPlace(0)
     exe = fluid.Executor(place)
-    step_per_epoch = 750000 // batch_size
-    """
-    opt = fluid.optimizer.SGD(
-        learning_rate=fluid.layers.exponential_decay(
-            learning_rate=args.lr,
-            decay_steps=step_per_epoch * 10,
-            decay_rate=args.lr_dc),
-        regularization=fluid.regularizer.L2DecayRegularizer(regularization_coeff=args.l2))
-    """
-    opt = fluid.optimizer.SGD(lr)
-    opt = fluid.optimizer.PipelineOptimizer(
-        opt,
-        cut_list=[cut_list, [loss, acc]],
-        place_list=[fluid.CPUPlace(), fluid.CUDAPlace(0), fluid.CPUPlace()],
-        concurrency_list=[1, 1, nreaders],
-        queue_size=nscopes,
-        sync_steps=sync_steps)
-    opt.minimize(loss)
-    exe.run(fluid.default_startup_program())
-    all_vocab = fluid.global_scope().var("all_vocab").get_tensor()
-    all_vocab.set(
-        np.arange(1, items_num).astype("int64").reshape((-1, 1)), place)
-
-    logger.info("begin train")
-
-    dataset = fluid.DatasetFactory().create_dataset("FileInstantDataset")
-    dataset.set_use_var(data_vars)
-    dataset.set_batch_size(batch_size)
-    dataset.set_filelist(filelist)
-
-    total_time = []
-    start_time = time.time()
-    loss_sum = 0.0
-    acc_sum = 0.0
-    global_step = 0
-
-    for i in range(25):
-        logger.info("begin epoch %d" % (i))
-        epoch_sum = []
-        random.shuffle(filelist)
-        dataset.set_filelist(filelist)
-        exe.train_from_dataset(
-            fluid.default_main_program(),
-            dataset,
-            thread=ncards,
-            debug=is_profile,
-            fetch_list=[loss, acc],
-            fetch_info=["loss", "acc"],
-            print_period=1)
-        model_path = args.model_path
-        model_path += "_" + str(lr) + "_" + str(rate)
-        save_dir = model_path + "/epoch_" + str(i)
-        fetch_vars = [loss, acc]
-        feed_list = [
-            "items", "seq_index", "last_index", "adj_in", "adj_out", "mask",
-            "label"
-        ]
-        fluid.io.save_inference_model(save_dir, feed_list, fetch_vars, exe)
-
-
-class Data():
-    def __init__(self, path, shuffle=False):
-        data = pickle.load(open(path, 'rb'))
-        self.shuffle = shuffle
-        self.length = len(data[0])
-        self.input = list(zip(data[0], data[1]))
-
-    def make_data(self, cur_batch, batch_size):
-        cur_batch = [list(e) for e in cur_batch]
-        max_seq_len = 0
-        for e in cur_batch:
-            max_seq_len = max(max_seq_len, len(e[0]))
-        last_id = []
-        for e in cur_batch:
-            last_id.append(len(e[0]) - 1)
-            e[0] += [0] * (max_seq_len - len(e[0]))
-
-        max_uniq_len = 0
-        for e in cur_batch:
-            max_uniq_len = max(max_uniq_len, len(np.unique(e[0])))
-
-        items, adj_in, adj_out, seq_index, last_index = [], [], [], [], []
-        mask, label = [], []
-
-        id = 0
-        for e in cur_batch:
-            node = np.unique(e[0])
-            items.append(node.tolist() + (max_uniq_len - len(node)) * [0])
-            adj = np.zeros((max_uniq_len, max_uniq_len))
-
-            for i in np.arange(len(e[0]) - 1):
-                if e[0][i + 1] == 0:
-                    break
-                u = np.where(node == e[0][i])[0][0]
-                v = np.where(node == e[0][i + 1])[0][0]
-                adj[u][v] = 1
-
-            u_deg_in = np.sum(adj, 0)
-            u_deg_in[np.where(u_deg_in == 0)] = 1
-            adj_in.append(np.divide(adj, u_deg_in).transpose())
-
-            u_deg_out = np.sum(adj, 1)
-            u_deg_out[np.where(u_deg_out == 0)] = 1
-            adj_out.append(np.divide(adj.transpose(), u_deg_out).transpose())
-
-            seq_index.append(
-                [np.where(node == i)[0][0] + id * max_uniq_len for i in e[0]])
-            last_index.append(
-                np.where(node == e[0][last_id[id]])[0][0] + id * max_uniq_len)
-            label.append(e[1] - 1)
-            mask.append([[1] * (last_id[id] + 1) + [0] *
-                         (max_seq_len - last_id[id] - 1)])
-            id += 1
-
-        items = np.array(items).astype("uint64").reshape((batch_size, -1, 1))
-        seq_index = np.array(seq_index).astype("uint64").reshape(
-            (batch_size, -1))
-        last_index = np.array(last_index).astype("uint64").reshape(
-            (batch_size, 1))
-        adj_in = np.array(adj_in).astype("float32").reshape(
-            (batch_size, max_uniq_len, max_uniq_len))
-        adj_out = np.array(adj_out).astype("float32").reshape(
-            (batch_size, max_uniq_len, max_uniq_len))
-        mask = np.array(mask).astype("float32").reshape((batch_size, -1, 1))
-        label = np.array(label).astype("uint64").reshape((batch_size, 1))
-        return list(
-            zip(items, seq_index, last_index, adj_in, adj_out, mask, label))
-
-    def reader(self, batch_size, batch_group_size, train=True):
-        if self.shuffle:
-            random.shuffle(self.input)
-        group_remain = self.length % batch_group_size
-        for bg_id in range(0, self.length - group_remain, batch_group_size):
-            cur_bg = copy.deepcopy(self.input[bg_id:bg_id + batch_group_size])
-            if train:
-                cur_bg = sorted(cur_bg, key=lambda x: len(x[0]), reverse=True)
-            for i in range(0, batch_group_size, batch_size):
-                cur_batch = cur_bg[i:i + batch_size]
-                yield self.make_data(cur_batch, batch_size)
-
-        #deal with the remaining, discard at most batch_size data
-        if group_remain < batch_size:
-            return
-        remain_data = copy.deepcopy(self.input[-group_remain:])
-        if train:
-            remain_data = sorted(
-                remain_data, key=lambda x: len(x[0]), reverse=True)
-        for i in range(0, batch_group_size, batch_size):
-            if i + batch_size <= len(remain_data):
-                cur_batch = remain_data[i:i + batch_size]
-                yield self.make_data(cur_batch, batch_size)
-
-
-def read_config(path):
-    with open(path, "r") as fin:
-        item_num = int(fin.readline())
-    return item_num
-
-
-induce_map = {0: [0], 1: [0], 2: [], 3: [0, 1], 4: [0, 1], 5: [0], 6: []}
-
-
-def binary_print(slot, fout, index):
-    shape_array = slot.shape
-    num = 1
-    for e in shape_array:
-        num *= e
-    num += len(induce_map[index])
-    num = np.uint16(num)
-    num.tofile(fout)
-    for e in induce_map[index]:
-        tmp_shape = np.uint64(shape_array[e])
-        tmp_shape.tofile(fout)
-    slot.tofile(fout)
-
-
-def make_binary_data():
-    data_reader = Data('./data/diginetica/train.txt', True)
-    index = 0
-    id = -1
-    filename = None
-    fout = None
-    binary = True
-    for data in data_reader.reader(batch_size, 20 * batch_size, True):
-        if index % (batch_size * 900) == 0:
-            id += 1
-            if not binary:
-                filename = "./gnn_data_text/" + str(id)
-            else:
-                filename = "./gnn_data_new_8/" + str(id)
-            print("filename: " + filename)
-            if fout:
-                fout.close()
-            fout = open(filename, "wb" if binary else "w")
-
-        for ins in data:
-            for i, slot in enumerate(ins):
-                if binary:
-                    binary_print(slot, fout, i)
-                else:
-                    text_print(slot, fout, i)
-        index += batch_size
+    exe.run(fluid.default_startup_program())
+    data_loader.start()
+    logger.info("begin training...")
+    exe.train_from_dataset(fluid.default_main_program(), debug=is_profile)


 if __name__ == "__main__":
-    make_binary_data()
     train()
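
The pattern the new demo exercises: program sections are pinned to devices with fluid.device_guard, a plain SGD optimizer is wrapped in fluid.optimizer.PipelineOptimizer so those sections run as pipeline stages over micro-batches, and a non-iterable DataLoader feeds exe.train_from_dataset. Below is a minimal sketch of that pattern on a toy two-stage model, using the same fluid 1.x APIs as the patch; the toy model and toy_reader are illustrative only and are not part of the demo.

import numpy as np
import paddle.fluid as fluid

# Stage 0 (gpu:0): data input plus an FC layer; stage 1 (gpu:1): the loss.
# PipelineOptimizer splits the program at the device_guard boundaries and
# overlaps micro-batches across the two stages.
with fluid.device_guard("gpu:0"):
    x = fluid.layers.data(name="x", shape=[4], dtype="float32")
    y = fluid.layers.data(name="y", shape=[1], dtype="int64")
    loader = fluid.io.DataLoader.from_generator(
        feed_list=[x, y], capacity=16, iterable=False)
    logits = fluid.layers.fc(input=x, size=2)

with fluid.device_guard("gpu:1"):
    loss = fluid.layers.mean(
        fluid.layers.softmax_with_cross_entropy(logits=logits, label=y))

opt = fluid.optimizer.SGD(0.01)
opt = fluid.optimizer.PipelineOptimizer(opt, num_microbatches=2)
opt.minimize(loss)


def toy_reader():  # random samples, mirroring train_reader in the demo above
    for _ in range(64):
        yield (np.random.random([4]).astype("float32"),
               np.random.randint(0, 2, [1]).astype("int64"))


loader.set_sample_generator(toy_reader, batch_size=4)

exe = fluid.Executor(fluid.CUDAPlace(0))
exe.run(fluid.default_startup_program())  # initialize parameters first
loader.start()
exe.train_from_dataset(fluid.default_main_program())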