Unverified commit f132c2f4 authored by lilong12, committed by GitHub

Modify pipeline demo to use the new PipelineOptimizer API. (#25374)

* modify pipeline demo, test=develop
Parent 513852e2
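The diff below replaces the old PipelineOptimizer interface (cut_list / place_list / concurrency_list / queue_size / sync_steps, still visible in the removed training code) with the new one: pipeline stages are marked with fluid.device_guard blocks and the base optimizer is wrapped with PipelineOptimizer(opt, num_microbatches=N). The following is a minimal sketch of that pattern only, program construction for a toy two-stage MLP with made-up layer sizes and learning rate, not the ResNet demo itself; feeding and execution follow the demo code below.

import paddle.fluid as fluid

def build_toy_pipeline_program():
    # Stage 0: data layers and the first FC layer are pinned to the first device.
    with fluid.device_guard("gpu:0"):
        image = fluid.layers.data(name="image", shape=[784], dtype="float32")
        label = fluid.layers.data(name="label", shape=[1], dtype="int64")
        hidden = fluid.layers.fc(input=image, size=128, act="relu")
    # Stage 1: classifier and loss are pinned to the second device.
    with fluid.device_guard("gpu:1"):
        logits = fluid.layers.fc(input=hidden, size=10)
        loss = fluid.layers.mean(
            fluid.layers.softmax_with_cross_entropy(logits=logits, label=label))
    opt = fluid.optimizer.SGD(learning_rate=0.1)
    # Wrap the base optimizer; num_microbatches controls how each mini-batch is
    # split so the device_guard stages can run concurrently.
    opt = fluid.optimizer.PipelineOptimizer(opt, num_microbatches=2)
    opt.minimize(loss)
    return loss

The actual demo uses the same pattern with four stages (gpu:0 through gpu:3) over a ResNet-50 network.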
@@ -31,478 +31,175 @@ logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
batch_size = 100
ncards = 4
nreaders = 4
nscopes = 30
learning_rate = 0.1
is_profile = False
sync_steps = 1
def parse_args():
parser = argparse.ArgumentParser("gnn")
parser.add_argument(
'--train_path',
type=str,
default='./data/diginetica/train.txt',
help='dir of training data')
parser.add_argument(
'--config_path',
type=str,
default='./data/diginetica/config.txt',
help='dir of config')
parser.add_argument(
'--model_path',
type=str,
default='./saved_model',
help="path of model parameters")
parser.add_argument(
'--epoch_num',
type=int,
default=30,
help='number of epochs to train for')
parser = argparse.ArgumentParser("Resnet with pipelie parallel.")
parser.add_argument(
'--batch_size', type=int, default=100, help='input batch size')
parser.add_argument(
'--hidden_size', type=int, default=100, help='hidden state size')
parser.add_argument('--l2', type=float, default=1e-5, help='l2 penalty')
parser.add_argument('--lr', type=float, default=0.001, help='learning rate')
parser.add_argument(
'--emb_lr_rate', type=float, default=0.5, help='learning rate multiplier for the embedding')
parser.add_argument(
'--step', type=int, default=1, help='gnn propagation steps')
parser.add_argument(
'--lr_dc', type=float, default=0.1, help='learning rate decay rate')
parser.add_argument(
'--lr_dc_step',
type=int,
default=3,
help='the number of steps after which the learning rate decays')
parser.add_argument(
'--use_cuda', type=int, default=0, help='whether to use gpu')
parser.add_argument(
'--use_parallel',
type=int,
default=1,
help='whether to use parallel executor')
return parser.parse_args()
def network(batch_size, items_num, hidden_size, step, rate):
stdv = 1.0 / math.sqrt(hidden_size)
items = layers.data(
name="items",
shape=[batch_size, -1, 1],
dtype="int64",
append_batch_size=False) #[bs, uniq_max, 1]
seq_index = layers.data(
name="seq_index",
shape=[batch_size, -1],
dtype="int64",
append_batch_size=False) #[-1(seq_max)*batch_size, 1]
last_index = layers.data(
name="last_index",
shape=[batch_size],
dtype="int64",
append_batch_size=False) #[batch_size, 1]
adj_in = layers.data(
name="adj_in",
shape=[batch_size, -1, -1],
dtype="float32",
append_batch_size=False)
adj_out = layers.data(
name="adj_out",
shape=[batch_size, -1, -1],
dtype="float32",
append_batch_size=False)
mask = layers.data(
name="mask",
shape=[batch_size, -1, 1],
dtype="float32",
append_batch_size=False)
label = layers.data(
name="label",
shape=[batch_size, 1],
dtype="int64",
append_batch_size=False)
items_emb = layers.embedding(
input=items,
is_sparse=True,
param_attr=fluid.ParamAttr(
name="emb",
learning_rate=rate,
initializer=fluid.initializer.Uniform(
low=-stdv, high=stdv)),
size=[items_num, hidden_size]) #[batch_size, uniq_max, h]
data_feed = [items, seq_index, last_index, adj_in, adj_out, mask, label]
pre_state = items_emb
for i in range(step):
pre_state = layers.reshape(
x=pre_state, shape=[batch_size, -1, hidden_size])
state_in = layers.fc(
input=pre_state,
name="state_in",
size=hidden_size,
act=None,
num_flatten_dims=2,
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Uniform(
low=-stdv, high=stdv)),
bias_attr=fluid.ParamAttr(initializer=fluid.initializer.Uniform(
low=-stdv, high=stdv))) #[batch_size, uniq_max, h]
state_out = layers.fc(
input=pre_state,
name="state_out",
size=hidden_size,
act=None,
num_flatten_dims=2,
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Uniform(
low=-stdv, high=stdv)),
bias_attr=fluid.ParamAttr(initializer=fluid.initializer.Uniform(
low=-stdv, high=stdv))) #[batch_size, uniq_max, h]
state_adj_in = layers.matmul(adj_in,
state_in) #[batch_size, uniq_max, h]
state_adj_out = layers.matmul(adj_out,
state_out) #[batch_size, uniq_max, h]
gru_input = layers.concat([state_adj_in, state_adj_out], axis=2)
gru_input = layers.reshape(x=gru_input, shape=[-1, hidden_size * 2])
gru_fc = layers.fc(input=gru_input,
name="gru_fc",
size=3 * hidden_size,
bias_attr=False)
pre_state, _, _ = fluid.layers.gru_unit(
input=gru_fc,
hidden=layers.reshape(
x=pre_state, shape=[-1, hidden_size]),
size=3 * hidden_size)
final_state = pre_state
seq_index = layers.reshape(seq_index, shape=[-1])
seq = layers.gather(final_state, seq_index) #[batch_size*-1(seq_max), h]
last = layers.gather(final_state, last_index) #[batch_size, h]
seq = layers.reshape(
seq, shape=[batch_size, -1, hidden_size]) #[batch_size, -1(seq_max), h]
last = layers.reshape(
last, shape=[batch_size, hidden_size]) #[batch_size, h]
seq_fc = layers.fc(
input=seq,
name="seq_fc",
size=hidden_size,
bias_attr=False,
act=None,
num_flatten_dims=2,
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Uniform(
low=-stdv, high=stdv))) #[batch_size, -1(seq_max), h]
last_fc = layers.fc(input=last,
name="last_fc",
size=hidden_size,
bias_attr=False,
act=None,
num_flatten_dims=1,
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Uniform(
low=-stdv, high=stdv))) #[batch_size, h]
seq_fc_t = layers.transpose(
seq_fc, perm=[1, 0, 2]) #[-1(seq_max), batch_size, h]
add = layers.elementwise_add(seq_fc_t,
last_fc) #[-1(seq_max), batch_size, h]
b = layers.create_parameter(
shape=[hidden_size],
dtype='float32',
default_initializer=fluid.initializer.Constant(value=0.0)) #[h]
add = layers.elementwise_add(add, b) #[-1(seq_max), batch_size, h]
add_sigmoid = layers.sigmoid(add) #[-1(seq_max), batch_size, h]
add_sigmoid = layers.transpose(
add_sigmoid, perm=[1, 0, 2]) #[batch_size, -1(seq_max), h]
weight = layers.fc(input=add_sigmoid,
name="weight_fc",
size=1,
act=None,
num_flatten_dims=2,
bias_attr=False,
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Uniform(
low=-stdv, high=stdv))) #[batch_size, -1, 1]
weight *= mask
weight_mask = layers.elementwise_mul(seq, weight, axis=0)
global_attention = layers.reduce_sum(weight_mask, dim=1)
final_attention = layers.concat(
[global_attention, last], axis=1) #[batch_size, 2*h]
    final_attention_fc = layers.fc(
        input=final_attention,
        name="fina_attention_fc",
        size=hidden_size,
        bias_attr=False,
        param_attr=fluid.ParamAttr(initializer=fluid.initializer.Uniform(
            low=-stdv, high=stdv)))  #[batch_size, h]
    all_vocab = layers.create_global_var(
        shape=[items_num - 1, 1],
        value=0,
        dtype="int64",
        persistable=True,
        name="all_vocab")
    all_emb = layers.embedding(
        input=all_vocab,
        is_sparse=True,
        param_attr=fluid.ParamAttr(
            name="emb",
            learning_rate=rate,
            initializer=fluid.initializer.Uniform(
                low=-stdv, high=stdv)),
        size=[items_num, hidden_size])  #[all_vocab, h]
    logits = layers.matmul(
        x=final_attention_fc, y=all_emb,
        transpose_y=True)  #[batch_size, all_vocab]
    softmax = layers.softmax_with_cross_entropy(
        logits=logits, label=label)  #[batch_size, 1]
    loss = layers.reduce_mean(softmax)  # [1]
    #fluid.layers.Print(loss)
    acc = layers.accuracy(input=logits, label=label, k=20)
    return loss, acc, data_feed, [items_emb, all_emb]
def conv_bn_layer(input, num_filters, filter_size, stride=1, groups=1,
                  act=None):
    conv = fluid.layers.conv2d(
        input=input,
        num_filters=num_filters,
        filter_size=filter_size,
        stride=stride,
        padding=(filter_size - 1) // 2,
        groups=groups,
        act=None,
        bias_attr=False)
    return fluid.layers.batch_norm(
        input=conv,
        act=act, )
def shortcut(input, ch_out, stride, is_first):
ch_in = input.shape[1]
if ch_in != ch_out or stride != 1 or is_first == True:
return conv_bn_layer(input, ch_out, 1, stride)
else:
return input
def bottleneck_block(input, num_filters, stride):
conv0 = conv_bn_layer(
input=input, num_filters=num_filters, filter_size=1, act='relu')
conv1 = conv_bn_layer(
input=conv0,
num_filters=num_filters,
filter_size=3,
stride=stride,
act='relu')
conv2 = conv_bn_layer(
input=conv1, num_filters=num_filters * 4, filter_size=1, act=None)
short = shortcut(input, num_filters * 4, stride, is_first=False)
return fluid.layers.elementwise_add(x=short, y=conv2, act='relu')
def basic_block(input, num_filters, stride, is_first):
conv0 = conv_bn_layer(
input=input,
num_filters=num_filters,
filter_size=3,
act='relu',
stride=stride)
conv1 = conv_bn_layer(
input=conv0, num_filters=num_filters, filter_size=3, act=None)
short = shortcut(input, num_filters, stride, is_first)
return fluid.layers.elementwise_add(x=short, y=conv1, act='relu')
def build_network(input, layers=50, class_dim=1000):
supported_layers = [18, 34, 50, 101, 152]
assert layers in supported_layers
depth = None
if layers == 18:
depth = [2, 2, 2, 2]
elif layers == 34 or layers == 50:
depth = [3, 4, 6, 3]
elif layers == 101:
depth = [3, 4, 23, 3]
elif layers == 152:
depth = [3, 8, 36, 3]
num_filters = [64, 128, 256, 512]
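# Each fluid.device_guard block below pins the ops it creates to one device;
# PipelineOptimizer later splits the program into pipeline stages along these
# device boundaries (gpu:0 stem, gpu:1 residual blocks, gpu:2 pooling and FC head).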
with fluid.device_guard("gpu:0"):
conv = conv_bn_layer(
input=input, num_filters=64, filter_size=7, stride=2, act='relu')
conv = fluid.layers.pool2d(
input=conv,
pool_size=3,
pool_stride=2,
pool_padding=1,
pool_type='max')
if layers >= 50:
for block in range(len(depth)):
with fluid.device_guard("gpu:1"):
for i in range(depth[block]):
conv = bottleneck_block(
input=conv,
num_filters=num_filters[block],
stride=2 if i == 0 and block != 0 else 1)
with fluid.device_guard("gpu:2"):
pool = fluid.layers.pool2d(
input=conv, pool_size=7, pool_type='avg', global_pooling=True)
stdv = 1.0 / math.sqrt(pool.shape[1] * 1.0)
out = fluid.layers.fc(
input=pool,
size=class_dim,
param_attr=fluid.param_attr.ParamAttr(
initializer=fluid.initializer.Uniform(-stdv, stdv)))
else:
for block in range(len(depth)):
with fluid.device_guard("gpu:1"):
for i in range(depth[block]):
conv = basic_block(
input=conv,
num_filters=num_filters[block],
stride=2 if i == 0 and block != 0 else 1,
is_first=block == i == 0)
with fluid.device_guard("gpu:2"):
pool = fluid.layers.pool2d(
input=conv, pool_size=7, pool_type='avg', global_pooling=True)
stdv = 1.0 / math.sqrt(pool.shape[1] * 1.0)
out = fluid.layers.fc(
input=pool,
size=class_dim,
param_attr=fluid.param_attr.ParamAttr(
initializer=fluid.initializer.Uniform(-stdv, stdv)))
return out
def train():
args = parse_args()
lr = args.lr
rate = args.emb_lr_rate
train_data_dir = "./gnn_data_new_8"
filelist = [
os.path.join(train_data_dir, f) for f in os.listdir(train_data_dir)
if os.path.isfile(os.path.join(train_data_dir, f))
][:]
items_num = read_config(args.config_path)
loss, acc, data_vars, cut_list = network(batch_size, items_num,
args.hidden_size, args.step, rate)
print("card: %d, thread: %d, lr: %f, lr_rate: %f, scope: %d, sync_step: %d"
% (ncards, nreaders, lr, rate, nscopes, sync_steps))
place = fluid.CPUPlace()
with fluid.device_guard("gpu:0"):
image = fluid.layers.data(
name="image", shape=[3, 224, 224], dtype="float32")
label = fluid.layers.data(name="label", shape=[1], dtype="int64")
data_loader = fluid.io.DataLoader.from_generator(
feed_list=[image, label],
capacity=64,
use_double_buffer=True,
iterable=False)
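# iterable=False: the DataLoader is bound into the program and is driven by
# data_loader.start() / train_from_dataset at the end of train() below.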
fc = build_network(image, layers=50)
with fluid.device_guard("gpu:3"):
out, prob = fluid.layers.softmax_with_cross_entropy(
logits=fc, label=label, return_softmax=True)
loss = fluid.layers.mean(out)
acc_top1 = fluid.layers.accuracy(input=prob, label=label, k=1)
acc_top5 = fluid.layers.accuracy(input=prob, label=label, k=5)
optimizer = fluid.optimizer.SGD(lr)
optimizer = fluid.optimizer.PipelineOptimizer(optimizer, num_microbatches=2)
optimizer.minimize(loss)
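# num_microbatches=2: each mini-batch from the DataLoader is split into two
# micro-batches so the four device_guard stages (gpu:0 to gpu:3) can overlap;
# the wrapped SGD optimizer is configured as usual.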
def train_reader():
for _ in range(4000):
img = np.random.random(size=[3, 224, 224]).astype('float32')
label = np.random.random(size=[1]).astype('int64')
yield img, label
data_loader.set_sample_generator(train_reader, batch_size=args.batch_size)
place = fluid.CUDAPlace(0)
exe = fluid.Executor(place)
step_per_epoch = 750000 // batch_size
"""
opt = fluid.optimizer.SGD(
learning_rate=fluid.layers.exponential_decay(
learning_rate=args.lr,
decay_steps=step_per_epoch * 10,
decay_rate=args.lr_dc),
regularization=fluid.regularizer.L2DecayRegularizer(regularization_coeff=args.l2))
"""
opt = fluid.optimizer.SGD(lr)
opt = fluid.optimizer.PipelineOptimizer(
opt,
cut_list=[cut_list, [loss, acc]],
place_list=[fluid.CPUPlace(), fluid.CUDAPlace(0), fluid.CPUPlace()],
concurrency_list=[1, 1, nreaders],
queue_size=nscopes,
sync_steps=sync_steps)
opt.minimize(loss)
exe.run(fluid.default_startup_program())
all_vocab = fluid.global_scope().var("all_vocab").get_tensor()
all_vocab.set(
np.arange(1, items_num).astype("int64").reshape((-1, 1)), place)
logger.info("begin train")
dataset = fluid.DatasetFactory().create_dataset("FileInstantDataset")
dataset.set_use_var(data_vars)
dataset.set_batch_size(batch_size)
dataset.set_filelist(filelist)
total_time = []
start_time = time.time()
loss_sum = 0.0
acc_sum = 0.0
global_step = 0
for i in range(25):
logger.info("begin epoch %d" % (i))
epoch_sum = []
random.shuffle(filelist)
dataset.set_filelist(filelist)
exe.train_from_dataset(
fluid.default_main_program(),
dataset,
thread=ncards,
debug=is_profile,
fetch_list=[loss, acc],
fetch_info=["loss", "acc"],
print_period=1)
model_path = args.model_path
model_path += "_" + str(lr) + "_" + str(rate)
save_dir = model_path + "/epoch_" + str(i)
fetch_vars = [loss, acc]
feed_list = [
"items", "seq_index", "last_index", "adj_in", "adj_out", "mask",
"label"
]
fluid.io.save_inference_model(save_dir, feed_list, fetch_vars, exe)
class Data():
def __init__(self, path, shuffle=False):
data = pickle.load(open(path, 'rb'))
self.shuffle = shuffle
self.length = len(data[0])
self.input = list(zip(data[0], data[1]))
def make_data(self, cur_batch, batch_size):
cur_batch = [list(e) for e in cur_batch]
max_seq_len = 0
for e in cur_batch:
max_seq_len = max(max_seq_len, len(e[0]))
last_id = []
for e in cur_batch:
last_id.append(len(e[0]) - 1)
e[0] += [0] * (max_seq_len - len(e[0]))
max_uniq_len = 0
for e in cur_batch:
max_uniq_len = max(max_uniq_len, len(np.unique(e[0])))
items, adj_in, adj_out, seq_index, last_index = [], [], [], [], []
mask, label = [], []
id = 0
for e in cur_batch:
node = np.unique(e[0])
items.append(node.tolist() + (max_uniq_len - len(node)) * [0])
adj = np.zeros((max_uniq_len, max_uniq_len))
for i in np.arange(len(e[0]) - 1):
if e[0][i + 1] == 0:
break
u = np.where(node == e[0][i])[0][0]
v = np.where(node == e[0][i + 1])[0][0]
adj[u][v] = 1
u_deg_in = np.sum(adj, 0)
u_deg_in[np.where(u_deg_in == 0)] = 1
adj_in.append(np.divide(adj, u_deg_in).transpose())
u_deg_out = np.sum(adj, 1)
u_deg_out[np.where(u_deg_out == 0)] = 1
adj_out.append(np.divide(adj.transpose(), u_deg_out).transpose())
seq_index.append(
[np.where(node == i)[0][0] + id * max_uniq_len for i in e[0]])
last_index.append(
np.where(node == e[0][last_id[id]])[0][0] + id * max_uniq_len)
label.append(e[1] - 1)
mask.append([[1] * (last_id[id] + 1) + [0] *
(max_seq_len - last_id[id] - 1)])
id += 1
items = np.array(items).astype("uint64").reshape((batch_size, -1, 1))
seq_index = np.array(seq_index).astype("uint64").reshape(
(batch_size, -1))
last_index = np.array(last_index).astype("uint64").reshape(
(batch_size, 1))
adj_in = np.array(adj_in).astype("float32").reshape(
(batch_size, max_uniq_len, max_uniq_len))
adj_out = np.array(adj_out).astype("float32").reshape(
(batch_size, max_uniq_len, max_uniq_len))
mask = np.array(mask).astype("float32").reshape((batch_size, -1, 1))
label = np.array(label).astype("uint64").reshape((batch_size, 1))
return list(
zip(items, seq_index, last_index, adj_in, adj_out, mask, label))
def reader(self, batch_size, batch_group_size, train=True):
if self.shuffle:
random.shuffle(self.input)
group_remain = self.length % batch_group_size
for bg_id in range(0, self.length - group_remain, batch_group_size):
cur_bg = copy.deepcopy(self.input[bg_id:bg_id + batch_group_size])
if train:
cur_bg = sorted(cur_bg, key=lambda x: len(x[0]), reverse=True)
for i in range(0, batch_group_size, batch_size):
cur_batch = cur_bg[i:i + batch_size]
yield self.make_data(cur_batch, batch_size)
# deal with the remaining data; at most batch_size examples are discarded
if group_remain < batch_size:
return
remain_data = copy.deepcopy(self.input[-group_remain:])
if train:
remain_data = sorted(
remain_data, key=lambda x: len(x[0]), reverse=True)
for i in range(0, batch_group_size, batch_size):
if i + batch_size <= len(remain_data):
cur_batch = remain_data[i:i + batch_size]
yield self.make_data(cur_batch, batch_size)
def read_config(path):
with open(path, "r") as fin:
item_num = int(fin.readline())
return item_num
induce_map = {0: [0], 1: [0], 2: [], 3: [0, 1], 4: [0, 1], 5: [0], 6: []}
def binary_print(slot, fout, index):
shape_array = slot.shape
num = 1
for e in shape_array:
num *= e
num += len(induce_map[index])
num = np.uint16(num)
num.tofile(fout)
for e in induce_map[index]:
tmp_shape = np.uint64(shape_array[e])
tmp_shape.tofile(fout)
slot.tofile(fout)
def make_binary_data():
data_reader = Data('./data/diginetica/train.txt', True)
index = 0
id = -1
filename = None
fout = None
binary = True
for data in data_reader.reader(batch_size, 20 * batch_size, True):
if index % (batch_size * 900) == 0:
id += 1
if not binary:
filename = "./gnn_data_text/" + str(id)
else:
filename = "./gnn_data_new_8/" + str(id)
print("filename: " + filename)
if fout:
fout.close()
fout = open(filename, "wb" if binary else "w")
for ins in data:
for i, slot in enumerate(ins):
if binary:
binary_print(slot, fout, i)
else:
text_print(slot, fout, i)
index += batch_size
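# Tail of the new train(): start the DataLoader, then launch pipeline execution
# over the main program with train_from_dataset.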
data_loader.start()
logger.info("begin training...")
exe.train_from_dataset(fluid.default_main_program(), debug=is_profile)
if __name__ == "__main__":
make_binary_data()
train()