提交 73334d88 编写于 作者: X xiexionghang

submit news_jingpai online config

上级 1fb811f8
#!/bin/bash
WORKDIR=`pwd`
echo "current:"$WORKDIR
mpirun -npernode 1 mv package/* ./
export LIBRARY_PATH=$WORKDIR/python/lib:$LIBRARY_PATH
ulimit -c unlimited
#export FLAGS_check_nan_inf=True
#export check_nan_inf=True
#FLAGS_check_nan_inf=True check_nan_inf=True
#mpirun -npernode 2 -timestamp-output -tag-output -machinefile ${PBS_NODEFILE} python/bin/python -u trainer_online.py
mpirun -npernode 2 -timestamp-output -tag-output python/bin/python -u trainer_online.py
if [[ $? -ne 0 ]]; then
echo "Failed to run mpi!" 1>&2
exit 1
fi
6048
6002
6145
6202
6201
6121
6738
6119
6146
6120
6147
6122
6123
6118
6142
6143
6008
6148
6151
6127
6144
6094
6083
6952
6739
6150
6109
6003
6099
6149
6129
6203
6153
6152
6128
6106
6251
7082
7515
6951
6949
7080
6066
7507
6186
6007
7514
6125
7506
10001
6006
7023
6085
10000
6098
6250
6110
6124
6090
6082
6067
6101
6004
6191
7075
6948
6157
6126
6188
7077
6070
6111
6087
6103
6107
6194
6156
6005
6247
6814
6158
7122
6058
6189
7058
6059
6115
7079
7081
6833
7024
6108
13342
13345
13412
13343
13350
13346
13409
6009
6011
6012
6013
6014
6015
6019
6023
6024
6027
6029
6031
6050
6060
6068
6069
6089
6095
6105
6112
6130
6131
6132
6134
6161
6162
6163
6166
6182
6183
6185
6190
6212
6213
6231
6233
6234
6236
6238
6239
6240
6241
6242
6243
6244
6245
6354
7002
7005
7008
7010
7013
7015
7019
7020
7045
7046
7048
7049
7052
7054
7056
7064
7066
7076
7078
7083
7084
7085
7086
7087
7088
7089
7090
7099
7100
7101
7102
7103
7104
7105
7109
7124
7126
7136
7142
7143
7144
7145
7146
7147
7148
7150
7151
7152
7153
7154
7155
7156
7157
7047
7050
6257
6259
6260
6261
7170
7185
7186
6751
6755
6757
6759
6760
6763
6764
6765
6766
6767
6768
6769
6770
7502
7503
7504
7505
7510
7511
7512
7513
6806
6807
6808
6809
6810
6811
6812
6813
6815
6816
6817
6819
6823
6828
6831
6840
6845
6875
6879
6881
6888
6889
6947
6950
6956
6957
6959
10006
10008
10009
10010
10011
10016
10017
10018
10019
10020
10021
10022
10023
10024
10029
10030
10031
10032
10033
10034
10035
10036
10037
10038
10039
10040
10041
10042
10044
10045
10046
10051
10052
10053
10054
10055
10056
10057
10060
10066
10069
6820
6821
6822
13333
13334
13335
13336
13337
13338
13339
13340
13341
13351
13352
13353
13359
13361
13362
13363
13366
13367
13368
13369
13370
13371
13375
13376
5700
5702
13400
13401
13402
13403
13404
13406
13407
13408
13410
13417
13418
13419
13420
13422
13425
13427
13428
13429
13430
13431
13433
13434
13436
13437
13326
13330
13331
5717
13442
13451
13452
13455
13456
13457
13458
13459
13460
13461
13462
13463
13464
13465
13466
13467
13468
1104
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
13812
13813
6740
1490
32915
32950
32952
32953
32954
33077
33085
33086
12345
23456
dataset_type="InMemoryDataset"
sparse_table_storage="ssd"
batch_size=32
thread_num=12
shuffle_thread=12
preload_thread=12
join_common_thread=16
update_thread=12
fs_name="afs://xingtian.afs.baidu.com:9902"
fs_ugi="mlarch_pro,proisvip"
train_data_path=["afs:/user/feed/mlarch/samplejoin/mondr_shoubai_dnn_master/feasign"]
init_model_path=""
days="{20190915..20190930} {20191001..20191031} {20191101..20191130} {20191201..20191231} {20200101..20200131}"
hours="{0..23}"
split_interval=5
split_per_pass=2
is_data_hourly_placed=False
save_first_base=False
output_path="afs:/user/feed/mlarch/model/feed_muye_ln_paddle"
pipe_command="./read_feasign | python/bin/python ins_weight.py | awk -f format_newcate_hotnews.awk | ./parse_feasign all_slot.dict"
save_xbox_before_update=True
check_exist_seconds=30
checkpoint_per_pass=36
save_delta_frequency=6
prefetch=True
write_stdout_frequency=10
need_reqi_changeslot=True
hdfs_dnn_plugin_path="afs:/user/feed/mlarch/sequence_generator/wuzhihua02/xujiaqi/test_combinejoincommon_0918_amd/20191006/base/dnn_plugin"
reqi_dnn_plugin_day=20191006
reqi_dnn_plugin_pass=0
task_name="feed_production_shoubai_video_ctr_fsort_session_cut"
nodes=119
node_memory=100000
mpi_server=yq01-hpc-lvliang01-smart-master.dmop.baidu.com
mpi_queue=feed5
mpi_priority=very_high
smart_client_home=/home/work/online_model/news_fsort/submit_jingpai_xiaoliuliang_paddlef50e701_pslibf7995_compile02255_reqi/smart_client/
local_hadoop_home=/home/work/online_model/news_fsort/submit_jingpai_xiaoliuliang_paddlef50e701_pslibf7995_compile02255_reqi/hadoop-client/hadoop
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
__all__ = ['MultiSlotDataGenerator']
class DataGenerator(object):
"""
DataGenerator is a general Base class for user to inherit
A user who wants to define his/her own python processing logic
with paddle.fluid.dataset should inherit this class.
"""
def __init__(self):
self._proto_info = None
self.batch_size_ = 32
def _set_line_limit(self, line_limit):
if not isinstance(line_limit, int):
raise ValueError("line_limit%s must be in int type" %
type(line_limit))
if line_limit < 1:
raise ValueError("line_limit can not less than 1")
self._line_limit = line_limit
def set_batch(self, batch_size):
'''
Set batch size of current DataGenerator
This is necessary only if a user wants to define generator_batch
Example:
.. code-block:: python
import paddle.fluid.incubate.data_generator as dg
class MyData(dg.DataGenerator):
def generate_sample(self, line):
def local_iter():
int_words = [int(x) for x in line.split()]
yield ("words", int_words)
return local_iter
def generate_batch(self, samples):
def local_iter():
for s in samples:
yield ("words", s[1].extend([s[1][0]]))
mydata = MyData()
mydata.set_batch(128)
'''
self.batch_size_ = batch_size
def run_from_memory(self):
'''
This function generator data from memory, it is usually used for
debug and benchmarking
Example:
.. code-block:: python
import paddle.fluid.incubate.data_generator as dg
class MyData(dg.DataGenerator):
def generate_sample(self, line):
def local_iter():
yield ("words", [1, 2, 3, 4])
return local_iter
mydata = MyData()
mydata.run_from_memory()
'''
batch_samples = []
line_iter = self.generate_sample(None)
for user_parsed_line in line_iter():
if user_parsed_line == None:
continue
batch_samples.append(user_parsed_line)
if len(batch_samples) == self.batch_size_:
batch_iter = self.generate_batch(batch_samples)
for sample in batch_iter():
sys.stdout.write(self._gen_str(sample))
batch_samples = []
if len(batch_samples) > 0:
batch_iter = self.generate_batch(batch_samples)
for sample in batch_iter():
sys.stdout.write(self._gen_str(sample))
def run_from_stdin(self):
'''
This function reads the data row from stdin, parses it with the
process function, and further parses the return value of the
process function with the _gen_str function. The parsed data will
be wrote to stdout and the corresponding protofile will be
generated.
Example:
.. code-block:: python
import paddle.fluid.incubate.data_generator as dg
class MyData(dg.DataGenerator):
def generate_sample(self, line):
def local_iter():
int_words = [int(x) for x in line.split()]
yield ("words", [int_words])
return local_iter
mydata = MyData()
mydata.run_from_stdin()
'''
batch_samples = []
for line in sys.stdin:
line_iter = self.generate_sample(line)
for user_parsed_line in line_iter():
if user_parsed_line == None:
continue
batch_samples.append(user_parsed_line)
if len(batch_samples) == self.batch_size_:
batch_iter = self.generate_batch(batch_samples)
for sample in batch_iter():
sys.stdout.write(self._gen_str(sample))
batch_samples = []
if len(batch_samples) > 0:
batch_iter = self.generate_batch(batch_samples)
for sample in batch_iter():
sys.stdout.write(self._gen_str(sample))
def _gen_str(self, line):
'''
Further processing the output of the process() function rewritten by
user, outputting data that can be directly read by the datafeed,and
updating proto_info infomation.
Args:
line(str): the output of the process() function rewritten by user.
Returns:
Return a string data that can be read directly by the datafeed.
'''
raise NotImplementedError(
"pls use MultiSlotDataGenerator or PairWiseDataGenerator")
def generate_sample(self, line):
'''
This function needs to be overridden by the user to process the
original data row into a list or tuple.
Args:
line(str): the original data row
Returns:
Returns the data processed by the user.
The data format is list or tuple:
[(name, [feasign, ...]), ...]
or ((name, [feasign, ...]), ...)
For example:
[("words", [1926, 08, 17]), ("label", [1])]
or (("words", [1926, 08, 17]), ("label", [1]))
Note:
The type of feasigns must be in int or float. Once the float
element appears in the feasign, the type of that slot will be
processed into a float.
Example:
.. code-block:: python
import paddle.fluid.incubate.data_generator as dg
class MyData(dg.DataGenerator):
def generate_sample(self, line):
def local_iter():
int_words = [int(x) for x in line.split()]
yield ("words", [int_words])
return local_iter
'''
raise NotImplementedError(
"Please rewrite this function to return a list or tuple: " +
"[(name, [feasign, ...]), ...] or ((name, [feasign, ...]), ...)")
def generate_batch(self, samples):
'''
This function needs to be overridden by the user to process the
generated samples from generate_sample(self, str) function
It is usually used as batch processing when a user wants to
do preprocessing on a batch of samples, e.g. padding according to
the max length of a sample in the batch
Args:
samples(list tuple): generated sample from generate_sample
Returns:
a python generator, the same format as return value of generate_sample
Example:
.. code-block:: python
import paddle.fluid.incubate.data_generator as dg
class MyData(dg.DataGenerator):
def generate_sample(self, line):
def local_iter():
int_words = [int(x) for x in line.split()]
yield ("words", int_words)
return local_iter
def generate_batch(self, samples):
def local_iter():
for s in samples:
yield ("words", s[1].extend([s[1][0]]))
mydata = MyData()
mydata.set_batch(128)
'''
def local_iter():
for sample in samples:
yield sample
return local_iter
class MultiSlotDataGenerator(DataGenerator):
def _gen_str(self, line):
'''
Further processing the output of the process() function rewritten by
user, outputting data that can be directly read by the MultiSlotDataFeed,
and updating proto_info infomation.
The input line will be in this format:
>>> [(name, [feasign, ...]), ...]
>>> or ((name, [feasign, ...]), ...)
The output will be in this format:
>>> [ids_num id1 id2 ...] ...
The proto_info will be in this format:
>>> [(name, type), ...]
For example, if the input is like this:
>>> [("words", [1926, 08, 17]), ("label", [1])]
>>> or (("words", [1926, 08, 17]), ("label", [1]))
the output will be:
>>> 3 1234 2345 3456 1 1
the proto_info will be:
>>> [("words", "uint64"), ("label", "uint64")]
Args:
line(str): the output of the process() function rewritten by user.
Returns:
Return a string data that can be read directly by the MultiSlotDataFeed.
'''
if not isinstance(line, list) and not isinstance(line, tuple):
raise ValueError(
"the output of process() must be in list or tuple type")
output = ""
for index, item in enumerate(line):
name, elements = item
if output:
output += " "
out_str = []
out_str.append(str(len(elements)))
out_str.extend(elements)
output += " ".join(out_str)
return output + "\n"
if self._proto_info is None:
self._proto_info = []
for index, item in enumerate(line):
name, elements = item
'''
if not isinstance(name, str):
raise ValueError("name%s must be in str type" % type(name))
if not isinstance(elements, list):
raise ValueError("elements%s must be in list type" %
type(elements))
if not elements:
raise ValueError(
"the elements of each field can not be empty, you need padding it in process()."
)
self._proto_info.append((name, "uint64"))
if output:
output += " "
output += str(len(elements))
for elem in elements:
if isinstance(elem, float):
self._proto_info[-1] = (name, "float")
elif not isinstance(elem, int) and not isinstance(elem,
long):
raise ValueError(
"the type of element%s must be in int or float" %
type(elem))
output += " " + str(elem)
'''
if output:
output += " "
out_str = []
out_str.append(str(len(elements)))
out_str.extend(elements)
output += " ".join(out_str)
else:
if len(line) != len(self._proto_info):
raise ValueError(
"the complete field set of two given line are inconsistent.")
for index, item in enumerate(line):
name, elements = item
'''
if not isinstance(name, str):
raise ValueError("name%s must be in str type" % type(name))
if not isinstance(elements, list):
raise ValueError("elements%s must be in list type" %
type(elements))
if not elements:
raise ValueError(
"the elements of each field can not be empty, you need padding it in process()."
)
if name != self._proto_info[index][0]:
raise ValueError(
"the field name of two given line are not match: require<%s>, get<%s>."
% (self._proto_info[index][0], name))
'''
if output:
output += " "
out_str = []
out_str.append(str(len(elements)))
#out_str.extend([str(x) for x in elements])
out_str.extend(elements)
output += " ".join(out_str)
'''
for elem in elements:
if self._proto_info[index][1] != "float":
if isinstance(elem, float):
self._proto_info[index] = (name, "float")
elif not isinstance(elem, int) and not isinstance(elem,
long):
raise ValueError(
"the type of element%s must be in int or float"
% type(elem))
output += " " + str(elem)
'''
return output + "\n"
#!/bin/awk -f
{
if ($1 !~ /^([0-9a-zA-Z])+$/ || $2 !~ /^([0-9])+$/ || $3 !~ /^([0-9])+$/) {
next;
}
show = $2;
clk = $3;
if (clk > show) {
clk = show;
}
for (i = 0; i < clk; i++) {
$2 = "1";
$3 = "1";
print $0;
}
for (i = 0; i < show - clk; i++) {
$2 = "1";
$3 = "0";
print $0;
}
}
#!/usr/bin/python
import sys
import re
import math
del_text_slot = True
g_ratio = 1
w_ratio = 0.01
slots_str = "6048 6145 6202 6201 6121 6119 6146 6120 6147 6122 6123 6118 6142 6143 6008 6148 6151 6127 6144 6150 6109 6003 6096 6149 6129 6203 6153 6152 6128 6106 6251 7082 7515 7080 6066 7507 6186 6007 7514 6054 6125 7506 10001 6006 6080 7023 6085 10000 6250 6110 6124 6090 6082 6067 7516 6101 6004 6191 6188 6070 6194 6247 6814 7512 10007 6058 6189 6059 7517 10005 7510 7024 7502 7503 6183 7511 6060 6806 7504 6185 6810 6248 10004 6815 6182 10068 6069 6073 6196 6816 7513 6071 6809 6072 6817 6190 7505 6813 6192 6807 6808 6195 6826 6184 6197 6068 6812 7107 6811 6823 6824 6819 6818 6821 6822 6820 6094 6083 6952 6099 6951 6949 6098 7075 6948 6157 6126 7077 6111 6087 6103 6107 6156 6005 6158 7122 6155 7058 6115 7079 7081 6833 6108 6840 6837 7147 7129 6097 6231 6957 7145 6956 7143 6130 7149 7142 6212 6827 7144 6089 6161 7055 6233 6105 7057 6237 6828 6850 6163 7124 6354 6162 7146 6830 7123 6160 6235 7056 6081 6841 6132 6954 6131 6236 6831 6845 6832 6953 6839 6950 7125 7054 6138 6166 6076 6851 6353 7076 7148 6858 6842 6860 7126 6829 6835 7078 6866 6869 6871 7052 6134 6855 6947 6862 6215 6852 7128 6092 6112 6213 6232 6863 6113 6165 6214 6216 6873 6865 6870 6077 6234 6861 6164 6217 7127 6218 6962 7053 7051 6961 6002 6738 6739 10105 7064 6751 6770 7100 6014 6765 6755 10021 10022 6010 10056 6011 6756 10055 6768 10024 6023 10003 6769 10002 6767 6759 10018 6024 6064 6012 6050 10042 6168 6253 10010 10020 6015 6018 10033 10041 10039 10031 10016 6764 7083 7152 7066 6171 7150 7085 6255 10044 10008 7102 6167 6240 6238 6095 10017 10046 6019 6031 6763 6256 6169 6254 10034 7108 7186 6257 10019 6757 10040 6025 7019 7086 10029 10011 7104 6261 6013 6766 10106 7105 7153 7089 6057 7134 7151 7045 7005 7008 7101 6035 7137 10023 6036 6172 7099 7087 6239 7185 6170 10006 6243 6350 7103 7090 7157 6259 7171 6875 7084 7154 6242 6260 7155 7017 7048 7156 6959 7047 10053 7135 6244 7136 10030 7063 6760 7016 7065 7179 6881 7018 6876 10081 10052 10054 10038 6886 10069 7004 10051 7007 7109 10057 6029 6888 10009 6889 7021 10047 6245 6878 10067 6879 6884 7180 7182 10071 7002 6880 6890 6887 10061 6027 6877 6892 10060 6893 7050 10036 7049 10012 10025 7012 7183 10058 7181 10086 6891 6258 6894 6883 7046 6037 7106 10043 10048 10045 10087 6885 10013 10028 7187 10037 10035 10050 6895 7011 7170 7172 10026 10063 10095 10082 10084 6960 10092 10075 6038 7010 7015 10015 10027 10064 7184 10014 10059 7013 7020 10072 10066 10080 6896 10083 10090 6039 10049 7164 7165 10091 10099 6963 7166 10079 10103 7006 7009 7169 6034 7028 7029 7030 7034 7035 7036 7040 7041 7042 10032 6009 6241 7003 7014 7088 13326 13330 13331 13352 13353 6198"
slot_whitelist = slots_str.split(" ")
def calc_ins_weight(params, label):
"""calc ins weight"""
global g_ratio
global w_ratio
slots = []
s_clk_num = 0
s_show_num = 0
active = 0
attclk_num = 0
attshow_num = 0
attclk_avg = 0
for items in params:
if len(items) != 2:
continue
slot_name = items[0]
slot_val = items[1]
if slot_name not in slots:
slots.append(slot_name)
if slot_name == "session_click_num":
s_clk_num = int(slot_val)
if slot_name == "session_show_num":
s_show_num = int(slot_val)
if slot_name == "activity":
active = float(slot_val) / 10000.0
w = 1
# for inactive user
if active >= 0 and active < 0.4 and s_show_num >=0 and s_show_num < 20:
w = math.log(w_ratio * (420 - (active * 50 + 1) * (s_show_num + 1)) + math.e)
if label == "0":
w = 1 + (w - 1) * g_ratio
return w
def filter_whitelist_slot(tmp_line):
terms = tmp_line.split()
line = "%s %s %s" % (terms[0], terms[1], terms[2])
for item in terms[3:]:
feasign = item.split(':')
if len(feasign) == 2 and \
feasign[1] in slot_whitelist:
line = "%s %s" %(line, item)
return line
def get_sample_type(line):
# vertical_type = 20
# if line.find("13038012583501790:6738") > 0:
# return 30
# vertical_type = 0/5/1/2/9/11/13/16/29/-1
if (line.find("7408512894065610:6738") > 0) or \
(line.find("8815887816424655:6738") > 0) or \
(line.find("7689987878537419:6738") > 0) or \
(line.find("7971462863009228:6738") > 0) or \
(line.find("9941787754311891:6738") > 0) or \
(line.find("10504737723255509:6738") > 0) or \
(line.find("11067687692199127:6738") > 0) or \
(line.find("11912112645614554:6738") > 0) or \
(line.find("15571287443748071:6738") > 0) or \
(line.find("7127025017546227:6738") > 0):
return 20
return -1
def main():
"""ins adjust"""
global del_text_slot
for l in sys.stdin:
l = l.rstrip("\n")
items = l.split(" ")
if len(items) < 3:
continue
label = items[2]
lines = l.split("\t")
line = lines[0]
# streaming ins include all ins, sample_type only handle NEWS ins
sample_type = -1
if 'NEWS' in l:
sample_type = get_sample_type(line)
#line = filter_whitelist_slot(tmp_line)
if len(lines) >= 4:
if 'VIDEO' in lines[3]:
continue
params = lines[2]
params = params.split(" ")
m = [tuple(i.split(":")) for i in params]
if m is None or len(m) == 0:
if sample_type > 0:
print "%s $%s *1" % (line, sample_type)
else:
print "%s *1" % line
sys.stdout.flush()
continue
weight = calc_ins_weight(m, label)
if sample_type > 0:
print "%s $%s *%s" % (line, sample_type, weight)
else:
print "%s *%s" % (line, weight)
sys.stdout.flush()
else:
if sample_type > 0:
print "%s $%s *1" % (line, sample_type)
else:
print "%s *1" % line
sys.stdout.flush()
if __name__ == "__main__":
if len(sys.argv) > 1:
if sys.argv[1] == "0":
del_text_slot = False
if len(sys.argv) > 2:
g_ratio = float(sys.argv[2])
if len(sys.argv) > 3:
w_ratio = float(sys.argv[3])
main()
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
class Model(object):
def __init__(self, slot_file_name, all_slot_file, use_cvm, ins_tag, is_update_model):
self._slot_file_name = slot_file_name
self._use_cvm = use_cvm
self._dict_dim = 10 # it's fake
self._emb_dim = 9 + 2
self._init_range = 0.2
self._all_slot_file = all_slot_file
self._not_use_slots = []
self._not_use_slotemb = []
self._all_slots = []
self._ins_tag_value = ins_tag
self._is_update_model = is_update_model
self._train_program = fluid.Program()
self._startup_program = fluid.Program()
self.save_vars = []
with fluid.program_guard(self._train_program, self._startup_program):
with fluid.unique_name.guard():
self.show = fluid.layers.data(name="show", shape=[-1, 1], dtype="int64", lod_level=0, append_batch_size=False)
self.label = fluid.layers.data(name="click", shape=[-1, 1], dtype="int64", lod_level=0, append_batch_size=False)
self.ins_weight = fluid.layers.data(
name="12345",
shape=[-1, 1],
dtype="float32",
lod_level=0,
append_batch_size=False,
stop_gradient=True)
self.ins_tag = fluid.layers.data(
name="23456",
shape=[-1, 1],
dtype="int64",
lod_level=0,
append_batch_size=False,
stop_gradient=True)
self.slots = []
self.slots_name = []
self.embs = []
if self._ins_tag_value != 0:
self.x3_ts = fluid.layers.create_global_var(shape=[1,1], value=self._ins_tag_value, dtype='int64', persistable=True, force_cpu=True, name='X3')
self.x3_ts.stop_gradient=True
self.label_after_filter, self.filter_loss = fluid.layers.filter_by_instag(self.label, self.ins_tag, self.x3_ts, True)
self.label_after_filter.stop_gradient=True
self.show_after_filter, _ = fluid.layers.filter_by_instag(self.show, self.ins_tag, self.x3_ts, True)
self.show_after_filter.stop_gradient=True
self.ins_weight_after_filter, _ = fluid.layers.filter_by_instag(self.ins_weight, self.ins_tag, self.x3_ts, True)
self.ins_weight_after_filter.stop_gradient=True
for line in open(self._slot_file_name, 'r'):
slot = line.strip()
self.slots_name.append(slot)
self.all_slots_name = []
for line in open(self._all_slot_file, 'r'):
self.all_slots_name.append(line.strip())
for i in self.all_slots_name:
if i == self.ins_weight.name or i == self.ins_tag.name:
pass
elif i not in self.slots_name:
pass
else:
l = fluid.layers.data(name=i, shape=[1], dtype="int64", lod_level=1)
emb = fluid.layers.embedding(input=l, size=[self._dict_dim, self._emb_dim], is_sparse = True, is_distributed=True, param_attr=fluid.ParamAttr(name="embedding"))
self.slots.append(l)
self.embs.append(emb)
if self._ins_tag_value != 0:
self.emb = self.slot_net(self.slots, self.label_after_filter)
else:
self.emb = self.slot_net(self.slots, self.label)
self.similarity_norm = fluid.layers.sigmoid(fluid.layers.clip(self.emb, min=-15.0, max=15.0), name="similarity_norm")
if self._ins_tag_value != 0:
self.cost = fluid.layers.log_loss(input=self.similarity_norm, label=fluid.layers.cast(x=self.label_after_filter, dtype='float32'))
else:
self.cost = fluid.layers.log_loss(input=self.similarity_norm, label=fluid.layers.cast(x=self.label, dtype='float32'))
if self._ins_tag_value != 0:
self.cost = fluid.layers.elementwise_mul(self.cost, self.ins_weight_after_filter)
else:
self.cost = fluid.layers.elementwise_mul(self.cost, self.ins_weight)
if self._ins_tag_value != 0:
self.cost = fluid.layers.elementwise_mul(self.cost, self.filter_loss)
self.avg_cost = fluid.layers.mean(x=self.cost)
binary_predict = fluid.layers.concat(
input=[fluid.layers.elementwise_sub(fluid.layers.ceil(self.similarity_norm), self.similarity_norm), self.similarity_norm], axis=1)
if self._ins_tag_value != 0:
self.auc, batch_auc, [self.batch_stat_pos, self.batch_stat_neg, self.stat_pos, self.stat_neg] = \
fluid.layers.auc(input=binary_predict, label=self.label_after_filter, curve='ROC', num_thresholds=4096)
self.sqrerr, self.abserr, self.prob, self.q, self.pos, self.total = \
fluid.contrib.layers.ctr_metric_bundle(self.similarity_norm, fluid.layers.cast(x=self.label_after_filter, dtype='float32'))
#self.precise_ins_num = fluid.layers.create_global_var(persistable=True, dtype='float32', shape=[1])
#batch_ins_num = fluid.layers.reduce_sum(self.filter_loss)
#self.precise_ins_num = fluid.layers.elementwise_add(batch_ins_num, self.precise_ins_num)
else:
self.auc, batch_auc, [self.batch_stat_pos, self.batch_stat_neg, self.stat_pos, self.stat_neg] = \
fluid.layers.auc(input=binary_predict, label=self.label, curve='ROC', num_thresholds=4096)
self.sqrerr, self.abserr, self.prob, self.q, self.pos, self.total = \
fluid.contrib.layers.ctr_metric_bundle(self.similarity_norm, fluid.layers.cast(x=self.label, dtype='float32'))
self.tmp_train_program = fluid.Program()
self.tmp_startup_program = fluid.Program()
with fluid.program_guard(self.tmp_train_program, self.tmp_startup_program):
with fluid.unique_name.guard():
self._all_slots = [self.show, self.label]
self._merge_slots = []
for i in self.all_slots_name:
if i == self.ins_weight.name:
self._all_slots.append(self.ins_weight)
elif i == self.ins_tag.name:
self._all_slots.append(self.ins_tag)
else:
l = fluid.layers.data(name=i, shape=[1], dtype="int64", lod_level=1)
self._all_slots.append(l)
self._merge_slots.append(l)
def slot_net(self, slots, label, lr_x=1.0):
input_data = []
cvms = []
cast_label = fluid.layers.cast(label, dtype='float32')
cast_label.stop_gradient = True
ones = fluid.layers.fill_constant_batch_size_like(input=label, shape=[-1, 1], dtype="float32", value=1)
show_clk = fluid.layers.cast(fluid.layers.concat([ones, cast_label], axis=1), dtype='float32')
show_clk.stop_gradient = True
for index in range(len(slots)):
input_data.append(slots[index])
emb = self.embs[index]
bow = fluid.layers.sequence_pool(input=emb, pool_type='sum')
cvm = fluid.layers.continuous_value_model(bow, show_clk, self._use_cvm)
cvms.append(cvm)
concat = None
if self._ins_tag_value != 0:
concat = fluid.layers.concat(cvms, axis=1)
concat, _ = fluid.layers.filter_by_instag(concat, self.ins_tag, self.x3_ts, False)
else:
concat = fluid.layers.concat(cvms, axis=1)
bn = concat
if self._use_cvm:
bn = fluid.layers.data_norm(input=concat, name="bn6048", epsilon=1e-4,
param_attr={
"batch_size":1e4,
"batch_sum_default":0.0,
"batch_square":1e4})
self.save_vars.append(bn)
fc_layers_input = [bn]
if self._is_update_model:
fc_layers_size = [511, 255, 127, 127, 127, 1]
else:
fc_layers_size = [511, 255, 255, 127, 127, 127, 127, 1]
fc_layers_act = ["relu"] * (len(fc_layers_size) - 1) + [None]
scales_tmp = [bn.shape[1]] + fc_layers_size
scales = []
for i in range(len(scales_tmp)):
scales.append(self._init_range / (scales_tmp[i] ** 0.5))
for i in range(len(fc_layers_size)):
fc = fluid.layers.fc(
input = fc_layers_input[-1],
size = fc_layers_size[i],
act = fc_layers_act[i],
param_attr = \
fluid.ParamAttr(learning_rate=lr_x, \
initializer=fluid.initializer.NormalInitializer(loc=0.0, scale=1.0 * scales[i])),
bias_attr = \
fluid.ParamAttr(learning_rate=lr_x, \
initializer=fluid.initializer.NormalInitializer(loc=0.0, scale=1.0 * scales[i])))
fc_layers_input.append(fc)
self.save_vars.append(fc)
return fc_layers_input[-1]
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
class ModelJoinCommon(object):
def __init__(self, slot_file_name, slot_common_file_name, all_slot_file, join_ins_tag):
self.slot_file_name = slot_file_name
self.slot_common_file_name = slot_common_file_name
self.dict_dim = 10 # it's fake
self.emb_dim = 9 + 2
self.init_range = 0.2
self.all_slot_file = all_slot_file
self.ins_tag_v = join_ins_tag
self._train_program = fluid.Program()
self._startup_program = fluid.Program()
with fluid.program_guard(self._train_program, self._startup_program):
with fluid.unique_name.guard():
self.show = fluid.layers.data(name="show", shape=[-1, 1], dtype="int64", lod_level=0, append_batch_size=False)
self.label = fluid.layers.data(name="click", shape=[-1, 1], dtype="int64", lod_level=0, append_batch_size=False)
self.ins_weight = fluid.layers.data(
name="12345",
shape=[-1, 1],
dtype="float32",
lod_level=0,
append_batch_size=False,
stop_gradient=True)
self.ins_tag = fluid.layers.data(
name="23456",
shape=[-1, 1],
dtype="int64",
lod_level=0,
append_batch_size=False,
stop_gradient=True)
self.x3_ts = fluid.layers.create_global_var(shape=[1,1], value=self.ins_tag_v, dtype='int64', persistable=True, force_cpu=True, name='X3')
self.x3_ts.stop_gradient=True
self.label_after_filter, self.filter_loss = fluid.layers.filter_by_instag(self.label, self.ins_tag, self.x3_ts, True)
self.label_after_filter.stop_gradient=True
self.show_after_filter, _ = fluid.layers.filter_by_instag(self.show, self.ins_tag, self.x3_ts, True)
self.show_after_filter.stop_gradient=True
self.ins_weight_after_filter, _ = fluid.layers.filter_by_instag(self.ins_weight, self.ins_tag, self.x3_ts, True)
self.ins_weight_after_filter.stop_gradient=True
self.slots_name = []
for line in open(self.slot_file_name, 'r'):
slot = line.strip()
self.slots_name.append(slot)
self.all_slots_name = []
for line in open(self.all_slot_file, 'r'):
self.all_slots_name.append(line.strip())
self.slots = []
self.embs = []
for i in self.all_slots_name:
if i == self.ins_weight.name or i == self.ins_tag.name:
pass
elif i not in self.slots_name:
pass
else:
l = fluid.layers.data(name=i, shape=[1], dtype="int64", lod_level=1)
emb = fluid.layers.embedding(input=l, size=[self.dict_dim, self.emb_dim], is_sparse = True, is_distributed=True, param_attr=fluid.ParamAttr(name="embedding"))
self.slots.append(l)
self.embs.append(emb)
self.common_slot_name = []
for i in open(self.slot_common_file_name, 'r'):
self.common_slot_name.append(i.strip())
cvms = []
cast_label = fluid.layers.cast(self.label, dtype='float32')
cast_label.stop_gradient = True
ones = fluid.layers.fill_constant_batch_size_like(input=self.label, shape=[-1, 1], dtype="float32", value=1)
show_clk = fluid.layers.cast(fluid.layers.concat([ones, cast_label], axis=1), dtype='float32')
show_clk.stop_gradient = True
for index in range(len(self.embs)):
emb = self.embs[index]
emb.stop_gradient=True
bow = fluid.layers.sequence_pool(input=emb, pool_type='sum')
bow.stop_gradient=True
cvm = fluid.layers.continuous_value_model(bow, show_clk, True)
cvm.stop_gradient=True
cvms.append(cvm)
concat_join = fluid.layers.concat(cvms, axis=1)
concat_join.stop_gradient=True
cvms_common = []
for index in range(len(self.common_slot_name)):
cvms_common.append(cvms[index])
concat_common = fluid.layers.concat(cvms_common, axis=1)
concat_common.stop_gradient=True
bn_common = fluid.layers.data_norm(input=concat_common, name="common", epsilon=1e-4, param_attr={"batch_size":1e4,"batch_sum_default":0.0,"batch_square":1e4})
concat_join, _ = fluid.layers.filter_by_instag(concat_join, self.ins_tag, self.x3_ts, False)
concat_join.stop_gradient=True
bn_join = fluid.layers.data_norm(input=concat_join, name="join", epsilon=1e-4, param_attr={"batch_size":1e4,"batch_sum_default":0.0,"batch_square":1e4})
join_fc = self.fcs(bn_join, "join")
join_similarity_norm = fluid.layers.sigmoid(fluid.layers.clip(join_fc, min=-15.0, max=15.0), name="join_similarity_norm")
join_cost = fluid.layers.log_loss(input=join_similarity_norm, label=fluid.layers.cast(x=self.label_after_filter, dtype='float32'))
join_cost = fluid.layers.elementwise_mul(join_cost, self.ins_weight_after_filter)
join_cost = fluid.layers.elementwise_mul(join_cost, self.filter_loss)
join_avg_cost = fluid.layers.mean(x=join_cost)
common_fc = self.fcs(bn_common, "common")
common_similarity_norm = fluid.layers.sigmoid(fluid.layers.clip(common_fc, min=-15.0, max=15.0), name="common_similarity_norm")
common_cost = fluid.layers.log_loss(input=common_similarity_norm, label=fluid.layers.cast(x=self.label, dtype='float32'))
common_cost = fluid.layers.elementwise_mul(common_cost, self.ins_weight)
common_avg_cost = fluid.layers.mean(x=common_cost)
self.joint_cost = join_avg_cost + common_avg_cost
join_binary_predict = fluid.layers.concat(
input=[fluid.layers.elementwise_sub(fluid.layers.ceil(join_similarity_norm), join_similarity_norm), join_similarity_norm], axis=1)
self.join_auc, batch_auc, [self.join_batch_stat_pos, self.join_batch_stat_neg, self.join_stat_pos, self.join_stat_neg] = \
fluid.layers.auc(input=join_binary_predict, label=self.label_after_filter, curve='ROC', num_thresholds=4096)
self.join_sqrerr, self.join_abserr, self.join_prob, self.join_q, self.join_pos, self.join_total = \
fluid.contrib.layers.ctr_metric_bundle(join_similarity_norm, fluid.layers.cast(x=self.label_after_filter, dtype='float32'))
common_binary_predict = fluid.layers.concat(
input=[fluid.layers.elementwise_sub(fluid.layers.ceil(common_similarity_norm), common_similarity_norm), common_similarity_norm], axis=1)
self.common_auc, batch_auc, [self.common_batch_stat_pos, self.common_batch_stat_neg, self.common_stat_pos, self.common_stat_neg] = \
fluid.layers.auc(input=common_binary_predict, label=self.label, curve='ROC', num_thresholds=4096)
self.common_sqrerr, self.common_abserr, self.common_prob, self.common_q, self.common_pos, self.common_total = \
fluid.contrib.layers.ctr_metric_bundle(common_similarity_norm, fluid.layers.cast(x=self.label, dtype='float32'))
self.tmp_train_program = fluid.Program()
self.tmp_startup_program = fluid.Program()
with fluid.program_guard(self.tmp_train_program, self.tmp_startup_program):
with fluid.unique_name.guard():
self._all_slots = [self.show, self.label]
self._merge_slots = []
for i in self.all_slots_name:
if i == self.ins_weight.name:
self._all_slots.append(self.ins_weight)
elif i == self.ins_tag.name:
self._all_slots.append(self.ins_tag)
else:
l = fluid.layers.data(name=i, shape=[1], dtype="int64", lod_level=1)
self._all_slots.append(l)
self._merge_slots.append(l)
def fcs(self, bn, prefix):
fc_layers_input = [bn]
fc_layers_size = [511, 255, 255, 127, 127, 127, 127, 1]
fc_layers_act = ["relu"] * (len(fc_layers_size) - 1) + [None]
scales_tmp = [bn.shape[1]] + fc_layers_size
scales = []
for i in range(len(scales_tmp)):
scales.append(self.init_range / (scales_tmp[i] ** 0.5))
for i in range(len(fc_layers_size)):
name = prefix+"_"+str(i)
fc = fluid.layers.fc(
input = fc_layers_input[-1],
size = fc_layers_size[i],
act = fc_layers_act[i],
param_attr = \
fluid.ParamAttr(learning_rate=1.0, \
initializer=fluid.initializer.NormalInitializer(loc=0.0, scale=1.0 * scales[i])),
bias_attr = \
fluid.ParamAttr(learning_rate=1.0, \
initializer=fluid.initializer.NormalInitializer(loc=0.0, scale=1.0 * scales[i])),
name=name)
fc_layers_input.append(fc)
return fc_layers_input[-1]
import sys
import os
import paddle
import re
import collections
import time
#import paddle.fluid.incubate.data_generator as dg
import data_generate_base as dg
class MyDataset(dg.MultiSlotDataGenerator):
def load_resource(self, dictf):
self._all_slots_dict = collections.OrderedDict()
with open(dictf, 'r') as f:
slots = f.readlines()
for index, slot in enumerate(slots):
#self._all_slots_dict[slot.strip()] = [False, index + 3] #+3 #
self._all_slots_dict[slot.strip()] = [False, index + 2]
def generate_sample(self, line):
def data_iter_str():
s = line.split('\t')[0].split()#[1:]
lineid = s[0]
elements = s[1:] #line.split('\t')[0].split()[1:]
padding = "0"
# output = [("lineid", [lineid]), ("show", [elements[0]]), ("click", [elements[1]])]
output = [("show", [elements[0]]), ("click", [elements[1]])]
output.extend([(slot, []) for slot in self._all_slots_dict])
for elem in elements[2:]:
if elem.startswith("*"):
feasign = elem[1:]
slot = "12345"
elif elem.startswith("$"):
feasign = elem[1:]
if feasign == "D":
feasign = "0"
slot = "23456"
else:
feasign, slot = elem.split(':')
#feasign, slot = elem.split(':')
if not self._all_slots_dict.has_key(slot):
continue
self._all_slots_dict[slot][0] = True
index = self._all_slots_dict[slot][1]
output[index][1].append(feasign)
for slot in self._all_slots_dict:
visit, index = self._all_slots_dict[slot]
if visit:
self._all_slots_dict[slot][0] = False
else:
output[index][1].append(padding)
#print output
yield output
return data_iter_str
def data_iter():
elements = line.split('\t')[0].split()[1:]
padding = 0
output = [("show", [int(elements[0])]), ("click", [int(elements[1])])]
#output += [(slot, []) for slot in self._all_slots_dict]
output.extend([(slot, []) for slot in self._all_slots_dict])
for elem in elements[2:]:
feasign, slot = elem.split(':')
if slot == "12345":
feasign = float(feasign)
else:
feasign = int(feasign)
if not self._all_slots_dict.has_key(slot):
continue
self._all_slots_dict[slot][0] = True
index = self._all_slots_dict[slot][1]
output[index][1].append(feasign)
for slot in self._all_slots_dict:
visit, index = self._all_slots_dict[slot]
if visit:
self._all_slots_dict[slot][0] = False
else:
output[index][1].append(padding)
yield output
return data_iter
if __name__ == "__main__":
#start = time.clock()
d = MyDataset()
d.load_resource("all_slot.dict")
d.run_from_stdin()
#elapsed = (time.clock() - start)
#print("Time used:",elapsed)
6048
6002
6145
6202
6201
6121
6738
6119
6146
6120
6147
6122
6123
6118
6142
6143
6008
6148
6151
6127
6144
6094
6083
6952
6739
6150
6109
6003
6099
6149
6129
6203
6153
6152
6128
6106
6251
7082
7515
6951
6949
7080
6066
7507
6186
6007
7514
6125
7506
10001
6006
7023
6085
10000
6098
6250
6110
6124
6090
6082
6067
6101
6004
6191
7075
6948
6157
6126
6188
7077
6070
6111
6087
6103
6107
6194
6156
6005
6247
6814
6158
7122
6058
6189
7058
6059
6115
7079
7081
6833
7024
6108
13342
13345
13412
13343
13350
13346
13409
6009
6011
6012
6013
6014
6015
6019
6023
6024
6027
6029
6031
6050
6060
6068
6069
6089
6095
6105
6112
6130
6131
6132
6134
6161
6162
6163
6166
6182
6183
6185
6190
6212
6213
6231
6233
6234
6236
6238
6239
6240
6241
6242
6243
6244
6245
6354
7002
7005
7008
7010
7012
7013
7015
7016
7017
7018
7019
7020
7045
7046
7048
7049
7052
7054
7056
7064
7066
7076
7078
7083
7084
7085
7086
7087
7088
7089
7090
7099
7100
7101
7102
7103
7104
7105
7109
7124
7126
7136
7142
7143
7144
7145
7146
7147
7148
7150
7151
7152
7153
7154
7155
7156
7157
7047
7050
6253
6254
6255
6256
6257
6259
6260
6261
7170
7185
7186
6751
6755
6757
6759
6760
6763
6764
6765
6766
6767
6768
6769
6770
7502
7503
7504
7505
7510
7511
7512
7513
6806
6807
6808
6809
6810
6811
6812
6813
6815
6816
6817
6819
6823
6828
6831
6840
6845
6875
6879
6881
6888
6889
6947
6950
6956
6957
6959
10006
10008
10009
10010
10011
10016
10017
10018
10019
10020
10021
10022
10023
10024
10029
10030
10031
10032
10033
10034
10035
10036
10037
10038
10039
10040
10041
10042
10044
10045
10046
10051
10052
10053
10054
10055
10056
10057
10060
10066
10069
6820
6821
6822
13333
13334
13335
13336
13337
13338
13339
13340
13341
13351
13352
13353
13359
13361
13362
13363
13366
13367
13368
13369
13370
13371
13375
13376
5700
5702
13400
13401
13402
13403
13404
13406
13407
13408
13410
13417
13418
13419
13420
13422
13425
13427
13428
13429
13430
13431
13433
13434
13436
13437
13326
13330
13331
5717
13442
13451
13452
13455
13456
13457
13458
13459
13460
13461
13462
13463
13464
13465
13466
13467
13468
1104
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
13812
13813
6740
1490
1491
6048
6002
6145
6202
6201
6121
6738
6119
6146
6120
6147
6122
6123
6118
6142
6143
6008
6148
6151
6127
6144
6094
6083
6952
6739
6150
6109
6003
6099
6149
6129
6203
6153
6152
6128
6106
6251
7082
7515
6951
6949
7080
6066
7507
6186
6007
7514
6125
7506
10001
6006
7023
6085
10000
6098
6250
6110
6124
6090
6082
6067
6101
6004
6191
7075
6948
6157
6126
6188
7077
6070
6111
6087
6103
6107
6194
6156
6005
6247
6814
6158
7122
6058
6189
7058
6059
6115
7079
7081
6833
7024
6108
13342
13345
13412
13343
13350
13346
13409
with open("session_slot", "r") as fin:
res = []
for i in fin:
res.append("\"" + i.strip() + "\"")
print ", ".join(res)
#!/bin/awk -f
{
OFS="\t";
SHOW_RATIO = 1;
CLK_RATIO = 8;
LR_RATIO = 1024;
MF_RATIO = 1024;
}
function decompress_show(x) {
x = x * 1.0 / SHOW_RATIO;
return x;
}
function decompress_clk(x) {
if (x == "") {
x = 0;
}
x = x * 1.0 / CLK_RATIO;
return x;
}
function decompress_lr(x) {
return x * 1.0 / LR_RATIO;
}
function decompress_mf(x) {
return x * 1.0 / MF_RATIO;
}
function show_clk_sore(show, clk, nonclk_coeff, clk_coeff) {
return (show - clk) * nonclk_coeff + clk * clk_coeff;
}
#key, show, clk, pred, lr_w, mf_w or [\t]
{
l=split($0, a, "\t");
show = decompress_show(a[2]);
click = decompress_clk(a[3]);
lr = decompress_lr(a[5]);
printf("%s\t0\t0\t%s\t%s\t%s\t0\t", a[1], show, click, lr);
if (l == 7) {
printf("\n");
} else {
printf("%d", l-5)
for(i = 6; i <= l; i++) {
printf("\t%s", decompress_mf(a[i]));
}
printf("\n");
}
}
此差异已折叠。
此差异已折叠。
with open("session_slot", "r") as fin:
res = []
for i in fin:
res.append("\"" + i.strip() + "\"")
print ", ".join(res)
此差异已折叠。
此差异已折叠。
SERVER=yq01-hpc-lvliang01-smart-master.dmop.baidu.com
QUEUE=feed5
PRIORITY=very_high
USE_FLAGS_ADVRES=yes
此差异已折叠。
此差异已折叠。
add_subdirectory(src)
add_subdirectory(pybind)
add_subdirectory(tool)
add_executable(parse_feasign parse_feasign.cpp)
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册