paddle mpi预测时,每个节点把所有的样本都预测了一遍
Created by: shenchong721
这个是Receiver脚本
HDFS_train_path='*/cookie_feature_uniq_test'
HDFS_test_path='*/cookie_feature_uniq_test'
HDFS_output_path='*/cookie_vector1'
HDFS_model_path='*/model_param/'
./paddle cluster_train \
--config work/infer_cookie_vector.py \
--port 12864 \
--use_gpu cpu \
--use_remote_sparse 0 \
--time_limit 100:30:00 \
--submitter *** \
--num_nodes 3 \
--job_name cookie_to_vector1 \
--job_priority normal \
--fs_name * \
--fs_ugi *,* \
--where * \
--train_data_path ${HDFS_train_path} \
--init_model_path ${HDFS_model_path} \
--test_data_path ${HDFS_test_path} \
--output_path ${HDFS_output_path} \
--thirdparty ./thirdparty
这个是分发数据的代码
cluster_train_dir = "./train_data_dir/cookie_feature_uniq_test"
cluster_test_dir = "./test_data_dir/cookie_feature_uniq_test"
node_id = int(os.getenv("OMPI_COMM_WORLD_RANK"))
# 读数据函数
def data_reader(data_dir, node_id):
def parse_ins(ss):
if ss == 'None':
return [(0,0.0),]
results = []
ins_list = ss.strip().split(',')
for term in ins_list:
ins = term.split(' ')[0]
score = float(term.split(' ')[1])
results.append((ins_dict[ins], score))
return results
def parse_edu(ss):
if ss == 'None':
return [(0,0.0),]
idx, score = ss.strip().split(' ')
return [(edu_dict[idx], float(score)),]
def reader_cookie_feature():
for file_name in os.listdir(data_dir):
with open(os.path.join(data_dir, file_name), "r") as f:
for line in f:
TRADE_ID = trade_dict["<UNK>"]
USER_ID = user_dict["<UNK>"]
WORD_ID = word_dict["<UNK>"]
ss = line.strip().split("\t")
if len(ss) != 5:
continue
label = [1]
cookie = ss[0]
if ss[1] != 'None' and ss[1].split(' ')[0] not in ['1', '0']:
continue
gender = [(0,0.0),] if ss[1] == 'None' else \
[(int(ss[1].split(' ')[0]), float(ss[1].split(' ')[1])),]
age = [(0,0.0),] if ss[2] == 'None' else \
[(int(ss[2].split(' ')[0]), float(ss[2].split(' ')[1])),]
ins = parse_ins(ss[3])
edu = [(0,0)]
user_query = [WORD_ID]
user_bidword = [WORD_ID]
user = USER_ID
trade = TRADE_ID
yield gender, age, ins, edu, user_query, user_bidword, [user], [trade], label, cookie
return reader_cookie_feature
infer 函数
test_reader = data_reader(cluster_test_dir, node_id)()
prob_layer = fc_net(is_infer=True, get_param=True)
inferer = paddle.inference.Inference(output_layer=prob_layer, parameters=parameters)
file_index = 0
total_num = 0
test_batch = []
label_batch = []
user_batch = []
for idx, item in enumerate(test_reader):
test_batch.append(item)
user_batch.append(item[-1])
if len(test_batch) == batch_size:
output = inferer.infer(input=test_batch,field=['value'],feeding=feeding)
file_index_str = "%05d" % file_index
file_index += 1
wf = open('./output/part-' + file_index_str, 'w')
for i in range(len(test_batch)):
wf.write('\t'.join([user_batch[i], ';'.join([str(i) for i in output[i]])]) + '\n')
wf.close()
test_batch = []
label_batch = []
user_batch = []
main函数默认参数
paddle.init(use_gpu=False,
trainer_count=int(os.getenv("PADDLE_TRAINER_COUNT", "1")),
port=int(os.getenv("PADDLE_PORT", "7164")),
ports_num=int(os.getenv("PADDLE_PORTS_NUM", "1")),
num_gradient_servers=int(os.getenv("PADDLE_NUM_GRADIENT_SERVERS", "1")),
trainer_id=int(os.getenv("PADDLE_TRAINER_ID", "0")),
pservers=os.getenv("PADDLE_PSERVERS", "127.0.0.1"))