diff --git a/models/recall/gnn/__init__.py b/models/recall/gnn/__init__.py new file mode 100755 index 0000000000000000000000000000000000000000..abf198b97e6e818e1fbe59006f98492640bcee54 --- /dev/null +++ b/models/recall/gnn/__init__.py @@ -0,0 +1,13 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/models/recall/gnn/config.yaml b/models/recall/gnn/config.yaml new file mode 100644 index 0000000000000000000000000000000000000000..19eeb9e4f8fcbf473e2ef399801fe4d2f85a468d --- /dev/null +++ b/models/recall/gnn/config.yaml @@ -0,0 +1,56 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +evaluate: + workspace: "fleetrec.models.recall.gnn" + reader: + batch_size: 50 + class: "{workspace}/evaluate_reader.py" + test_data_path: "{workspace}/data/test" + +train: + trainer: + # for cluster training + strategy: "async" + + epochs: 2 + workspace: "fleetrec.models.recall.gnn" + + reader: + batch_size: 100 + class: "{workspace}/reader.py" + train_data_path: "{workspace}/data/train" + dataset_class: "DataLoader" + + model: + models: "{workspace}/model.py" + hyper_parameters: + use_DataLoader: True + config_path: "{workspace}/data/config.txt" + sparse_feature_dim: 100 + gnn_propogation_steps: 1 + learning_rate: 0.001 + l2: 0.00001 + decay_steps: 3 + decay_rate: 0.1 + optimizer: adam + + save: + increment: + dirname: "increment" + epoch_interval: 1 + save_last: True + inference: + dirname: "inference" + epoch_interval: 1 + save_last: True diff --git a/models/recall/gnn/data/config.txt b/models/recall/gnn/data/config.txt new file mode 100644 index 0000000000000000000000000000000000000000..18b31374f85324a9bf70b8b22dfe8f39092bc1c9 --- /dev/null +++ b/models/recall/gnn/data/config.txt @@ -0,0 +1,2 @@ +43098 +719470 diff --git a/models/recall/gnn/data/test/test.txt b/models/recall/gnn/data/test/test.txt new file mode 100644 index 0000000000000000000000000000000000000000..07fc5c2aff20c93b90df4b2841d94d296a74396c --- /dev/null +++ b/models/recall/gnn/data/test/test.txt @@ -0,0 +1,200 @@ +1 1 +2,3,4 4 +2,3 4 +2 3 +5 6 +7 8 +9,10 11 +9 10 +12,13,14,14,15,16 17 +12,13,14,14,15 16 +12,13,14,14 15 +12,13,14 14 +12,13 14 +12 13 +18,18 19 +18 18 +20 21 +5 22 +23,24 25 +23 24 +26,11,10,27,28,29 30 +26,11,10,27,28 29 +26,11,10,27 28 +26,11,10 27 +26,11 10 +26 11 +31,32 33 +31 32 +34 35 +36,37,38,39,40,37 41 +36,37,38,39,40 37 +36,37,38,39 40 +36,37,38 39 +36,37 38 +36 37 +42 43 +44 45 +46 47 +48,49,48,50,51,22 51 +48,49,48,50,51 22 +48,49,48,50 51 +48,49,48 50 +48,49 48 +48 49 +52,52 53 +52 52 +54,55 56 +54 55 +57 57 +58,59 58 +58 59 +60,61,61,62,63,62,64,65,62,66,67,68 68 +60,61,61,62,63,62,64,65,62,66,67 68 +60,61,61,62,63,62,64,65,62,66 67 +60,61,61,62,63,62,64,65,62 66 +60,61,61,62,63,62,64,65 62 +60,61,61,62,63,62,64 65 +60,61,61,62,63,62 64 +60,61,61,62,63 62 +60,61,61,62 63 +60,61,61 62 +60,61 61 +60 61 +69,70 71 +69 70 +72,73,74,75,76 77 +72,73,74,75 76 +72,73,74 75 +72,73 74 +72 73 +78 79 +80 81 +82 82 +83,83 83 +83 83 +84 85 +86 87 +48,22 48 +48 22 +88,89 90 +88 89 +91 92 +93 94 +82,83,95,96,97,98,99,100 101 +82,83,95,96,97,98,99 100 +82,83,95,96,97,98 99 +82,83,95,96,97 98 +82,83,95,96 97 +82,83,95 96 +82,83 95 +82 83 +8,102,103,13,13 103 +8,102,103,13 13 +8,102,103 13 +8,102 103 +8 102 +60,104,105,106,107,108,109,110 111 +60,104,105,106,107,108,109 110 +60,104,105,106,107,108 109 +60,104,105,106,107 108 +60,104,105,106 107 +60,104,105 106 +60,104 105 +60 104 +112,113 114 +112 113 +115 6 +116 117 +118 119 +18,120,121 18 +18,120 121 +18 120 +122 123 +124,125 126 +124 125 +127 128 +129,130,131,132,133,134,129,135,136 137 +129,130,131,132,133,134,129,135 136 +129,130,131,132,133,134,129 135 +129,130,131,132,133,134 129 +129,130,131,132,133 134 +129,130,131,132 133 +129,130,131 132 +129,130 131 +129 130 +138 139 +140,141,140,142,143,136,144,145,137,136,146,147,140,142,141 143 +140,141,140,142,143,136,144,145,137,136,146,147,140,142 141 +140,141,140,142,143,136,144,145,137,136,146,147,140 142 +140,141,140,142,143,136,144,145,137,136,146,147 140 +140,141,140,142,143,136,144,145,137,136,146 147 +140,141,140,142,143,136,144,145,137,136 146 +140,141,140,142,143,136,144,145,137 136 +140,141,140,142,143,136,144,145 137 +140,141,140,142,143,136,144 145 +140,141,140,142,143,136 144 +140,141,140,142,143 136 +140,141,140,142 143 +140,141,140 142 +140,141 140 +140 141 +18,148 149 +18 148 +5,150,6,48 151 +5,150,6 48 +5,150 6 +5 150 +152,153 152 +152 153 +11,154 154 +11 154 +155,156,157 158 +155,156 157 +155 156 +159,160 161 +159 160 +162,9,163,164 165 +162,9,163 164 +162,9 163 +162 9 +166,133,166,166,133,133,167,168,169,170,171,172 173 +166,133,166,166,133,133,167,168,169,170,171 172 +166,133,166,166,133,133,167,168,169,170 171 +166,133,166,166,133,133,167,168,169 170 +166,133,166,166,133,133,167,168 169 +166,133,166,166,133,133,167 168 +166,133,166,166,133,133 167 +166,133,166,166,133 133 +166,133,166,166 133 +166,133,166 166 +166,133 166 +166 133 +174 175 +176 177 +178,179 180 +178 179 +181,182,183 181 +181,182 183 +181 182 +184,80,185 186 +184,80 185 +184 80 +187,188,160,189,190,191,191,192 193 +187,188,160,189,190,191,191 192 +187,188,160,189,190,191 191 +187,188,160,189,190 191 +187,188,160,189 190 +187,188,160 189 +187,188 160 +187 188 +194 195 +196 197 +198,199,200,201,202 203 +198,199,200,201 202 +198,199,200 201 +198,199 200 +198 199 +204,132,205,129,206 207 +204,132,205,129 206 +204,132,205 129 diff --git a/models/recall/gnn/data/train/train.txt b/models/recall/gnn/data/train/train.txt new file mode 100644 index 0000000000000000000000000000000000000000..07fc5c2aff20c93b90df4b2841d94d296a74396c --- /dev/null +++ b/models/recall/gnn/data/train/train.txt @@ -0,0 +1,200 @@ +1 1 +2,3,4 4 +2,3 4 +2 3 +5 6 +7 8 +9,10 11 +9 10 +12,13,14,14,15,16 17 +12,13,14,14,15 16 +12,13,14,14 15 +12,13,14 14 +12,13 14 +12 13 +18,18 19 +18 18 +20 21 +5 22 +23,24 25 +23 24 +26,11,10,27,28,29 30 +26,11,10,27,28 29 +26,11,10,27 28 +26,11,10 27 +26,11 10 +26 11 +31,32 33 +31 32 +34 35 +36,37,38,39,40,37 41 +36,37,38,39,40 37 +36,37,38,39 40 +36,37,38 39 +36,37 38 +36 37 +42 43 +44 45 +46 47 +48,49,48,50,51,22 51 +48,49,48,50,51 22 +48,49,48,50 51 +48,49,48 50 +48,49 48 +48 49 +52,52 53 +52 52 +54,55 56 +54 55 +57 57 +58,59 58 +58 59 +60,61,61,62,63,62,64,65,62,66,67,68 68 +60,61,61,62,63,62,64,65,62,66,67 68 +60,61,61,62,63,62,64,65,62,66 67 +60,61,61,62,63,62,64,65,62 66 +60,61,61,62,63,62,64,65 62 +60,61,61,62,63,62,64 65 +60,61,61,62,63,62 64 +60,61,61,62,63 62 +60,61,61,62 63 +60,61,61 62 +60,61 61 +60 61 +69,70 71 +69 70 +72,73,74,75,76 77 +72,73,74,75 76 +72,73,74 75 +72,73 74 +72 73 +78 79 +80 81 +82 82 +83,83 83 +83 83 +84 85 +86 87 +48,22 48 +48 22 +88,89 90 +88 89 +91 92 +93 94 +82,83,95,96,97,98,99,100 101 +82,83,95,96,97,98,99 100 +82,83,95,96,97,98 99 +82,83,95,96,97 98 +82,83,95,96 97 +82,83,95 96 +82,83 95 +82 83 +8,102,103,13,13 103 +8,102,103,13 13 +8,102,103 13 +8,102 103 +8 102 +60,104,105,106,107,108,109,110 111 +60,104,105,106,107,108,109 110 +60,104,105,106,107,108 109 +60,104,105,106,107 108 +60,104,105,106 107 +60,104,105 106 +60,104 105 +60 104 +112,113 114 +112 113 +115 6 +116 117 +118 119 +18,120,121 18 +18,120 121 +18 120 +122 123 +124,125 126 +124 125 +127 128 +129,130,131,132,133,134,129,135,136 137 +129,130,131,132,133,134,129,135 136 +129,130,131,132,133,134,129 135 +129,130,131,132,133,134 129 +129,130,131,132,133 134 +129,130,131,132 133 +129,130,131 132 +129,130 131 +129 130 +138 139 +140,141,140,142,143,136,144,145,137,136,146,147,140,142,141 143 +140,141,140,142,143,136,144,145,137,136,146,147,140,142 141 +140,141,140,142,143,136,144,145,137,136,146,147,140 142 +140,141,140,142,143,136,144,145,137,136,146,147 140 +140,141,140,142,143,136,144,145,137,136,146 147 +140,141,140,142,143,136,144,145,137,136 146 +140,141,140,142,143,136,144,145,137 136 +140,141,140,142,143,136,144,145 137 +140,141,140,142,143,136,144 145 +140,141,140,142,143,136 144 +140,141,140,142,143 136 +140,141,140,142 143 +140,141,140 142 +140,141 140 +140 141 +18,148 149 +18 148 +5,150,6,48 151 +5,150,6 48 +5,150 6 +5 150 +152,153 152 +152 153 +11,154 154 +11 154 +155,156,157 158 +155,156 157 +155 156 +159,160 161 +159 160 +162,9,163,164 165 +162,9,163 164 +162,9 163 +162 9 +166,133,166,166,133,133,167,168,169,170,171,172 173 +166,133,166,166,133,133,167,168,169,170,171 172 +166,133,166,166,133,133,167,168,169,170 171 +166,133,166,166,133,133,167,168,169 170 +166,133,166,166,133,133,167,168 169 +166,133,166,166,133,133,167 168 +166,133,166,166,133,133 167 +166,133,166,166,133 133 +166,133,166,166 133 +166,133,166 166 +166,133 166 +166 133 +174 175 +176 177 +178,179 180 +178 179 +181,182,183 181 +181,182 183 +181 182 +184,80,185 186 +184,80 185 +184 80 +187,188,160,189,190,191,191,192 193 +187,188,160,189,190,191,191 192 +187,188,160,189,190,191 191 +187,188,160,189,190 191 +187,188,160,189 190 +187,188,160 189 +187,188 160 +187 188 +194 195 +196 197 +198,199,200,201,202 203 +198,199,200,201 202 +198,199,200 201 +198,199 200 +198 199 +204,132,205,129,206 207 +204,132,205,129 206 +204,132,205 129 diff --git a/models/recall/gnn/data_process.sh b/models/recall/gnn/data_process.sh new file mode 100644 index 0000000000000000000000000000000000000000..9aa009f03cf2595ea0cb54e691800486f26a21bf --- /dev/null +++ b/models/recall/gnn/data_process.sh @@ -0,0 +1,21 @@ +#! /bin/bash + +set -e +echo "begin to download data" + +cd raw_data && python download.py +mkdir diginetica +python preprocess.py --dataset diginetica + +echo "begin to convert data (binary -> txt)" +python convert_data.py --data_dir diginetica + +cat diginetica/train.txt | wc -l >> diginetica/config.txt + +mkdir train_data +mv diginetica/train.txt train_data + +mkdir test_data +mv diginetica/test.txt test_data + + diff --git a/models/recall/gnn/evaluate_reader.py b/models/recall/gnn/evaluate_reader.py new file mode 100755 index 0000000000000000000000000000000000000000..113433986b79530aad1c8da8aae84e5d7ad3d60d --- /dev/null +++ b/models/recall/gnn/evaluate_reader.py @@ -0,0 +1,135 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import numpy as np +import io +import copy +import random +from fleetrec.core.reader import Reader +from fleetrec.core.utils import envs + + +class EvaluateReader(Reader): + def init(self): + self.batch_size = envs.get_global_env("batch_size", None, "evaluate.reader") + + self.input = [] + self.length = None + + def base_read(self, files): + res = [] + for f in files: + with open(f, "r") as fin: + for line in fin: + line = line.strip().split('\t') + res.append(tuple([map(int, line[0].split(',')), int(line[1])])) + return res + + def make_data(self, cur_batch, batch_size): + cur_batch = [list(e) for e in cur_batch] + max_seq_len = 0 + for e in cur_batch: + max_seq_len = max(max_seq_len, len(e[0])) + last_id = [] + for e in cur_batch: + last_id.append(len(e[0]) - 1) + e[0] += [0] * (max_seq_len - len(e[0])) + + max_uniq_len = 0 + for e in cur_batch: + max_uniq_len = max(max_uniq_len, len(np.unique(e[0]))) + + items, adj_in, adj_out, seq_index, last_index = [], [], [], [], [] + mask, label = [], [] + + id = 0 + for e in cur_batch: + node = np.unique(e[0]) + items.append(node.tolist() + (max_uniq_len - len(node)) * [0]) + adj = np.zeros((max_uniq_len, max_uniq_len)) + + for i in np.arange(len(e[0]) - 1): + if e[0][i + 1] == 0: + break + u = np.where(node == e[0][i])[0][0] + v = np.where(node == e[0][i + 1])[0][0] + adj[u][v] = 1 + + u_deg_in = np.sum(adj, 0) + u_deg_in[np.where(u_deg_in == 0)] = 1 + adj_in.append(np.divide(adj, u_deg_in).transpose()) + + u_deg_out = np.sum(adj, 1) + u_deg_out[np.where(u_deg_out == 0)] = 1 + adj_out.append(np.divide(adj.transpose(), u_deg_out).transpose()) + + seq_index.append( + [[id, np.where(node == i)[0][0]] for i in e[0]]) + last_index.append( + [id, np.where(node == e[0][last_id[id]])[0][0]]) + label.append(e[1] - 1) + mask.append([[1] * (last_id[id] + 1) + [0] * + (max_seq_len - last_id[id] - 1)]) + id += 1 + + items = np.array(items).astype("int64").reshape((batch_size, -1)) + seq_index = np.array(seq_index).astype("int32").reshape( + (batch_size, -1, 2)) + last_index = np.array(last_index).astype("int32").reshape( + (batch_size, 2)) + adj_in = np.array(adj_in).astype("float32").reshape( + (batch_size, max_uniq_len, max_uniq_len)) + adj_out = np.array(adj_out).astype("float32").reshape( + (batch_size, max_uniq_len, max_uniq_len)) + mask = np.array(mask).astype("float32").reshape((batch_size, -1, 1)) + label = np.array(label).astype("int64").reshape((batch_size, 1)) + return zip(items, seq_index, last_index, adj_in, adj_out, mask, label) + + def batch_reader(self, batch_size, batch_group_size, train=True): + def _reader(): + random.shuffle(self.input) + group_remain = self.length % batch_group_size + for bg_id in range(0, self.length - group_remain, batch_group_size): + cur_bg = copy.deepcopy(self.input[bg_id:bg_id + batch_group_size]) + if train: + cur_bg = sorted(cur_bg, key=lambda x: len(x[0]), reverse=True) + for i in range(0, batch_group_size, batch_size): + cur_batch = cur_bg[i:i + batch_size] + yield self.make_data(cur_batch, batch_size) + + if group_remain == 0: + return + remain_data = copy.deepcopy(self.input[-group_remain:]) + if train: + remain_data = sorted( + remain_data, key=lambda x: len(x[0]), reverse=True) + for i in range(0, group_remain, batch_size): + if i + batch_size <= group_remain: + cur_batch = remain_data[i:i + batch_size] + yield self.make_data(cur_batch, batch_size) + else: + # Due to fixed batch_size, discard the remaining ins + return + #cur_batch = remain_data[i:] + #yield self.make_data(cur_batch, group_remain % batch_size) + return _reader + + def generate_batch_from_trainfiles(self, files): + self.input = self.base_read(files) + self.length = len(self.input) + return self.batch_reader(self.batch_size, self.batch_size * 20, False) + + def generate_sample(self, line): + def data_iter(): + yield [] + return data_iter diff --git a/models/recall/gnn/model.py b/models/recall/gnn/model.py new file mode 100644 index 0000000000000000000000000000000000000000..e63945e8a424257e742c911e2ae2444d234a0805 --- /dev/null +++ b/models/recall/gnn/model.py @@ -0,0 +1,274 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import numpy as np +import math +import paddle.fluid as fluid +import paddle.fluid.layers as layers + +from fleetrec.core.utils import envs +from fleetrec.core.model import Model as ModelBase + + +class Model(ModelBase): + def __init__(self, config): + ModelBase.__init__(self, config) + self.init_config() + + def init_config(self): + self._fetch_interval = 1 + self.items_num, self.ins_num = self.config_read(envs.get_global_env("hyper_parameters.config_path", None, self._namespace)) + self.train_batch_size = envs.get_global_env("batch_size", None, "train.reader") + self.evaluate_batch_size = envs.get_global_env("batch_size", None, "evaluate.reader") + self.hidden_size = envs.get_global_env("hyper_parameters.sparse_feature_dim", None, self._namespace) + self.step = envs.get_global_env("hyper_parameters.gnn_propogation_steps", None, self._namespace) + + + def config_read(self, config_path=None): + if config_path is None: + raise ValueError("please set train.model.hyper_parameters.config_path at first") + with open(config_path, "r") as fin: + item_nums = int(fin.readline().strip()) + ins_nums = int(fin.readline().strip()) + return item_nums, ins_nums + + def input(self, bs): + self.items = fluid.data( + name="items", + shape=[bs, -1], + dtype="int64") #[batch_size, uniq_max] + self.seq_index = fluid.data( + name="seq_index", + shape=[bs, -1, 2], + dtype="int32") #[batch_size, seq_max, 2] + self.last_index = fluid.data( + name="last_index", + shape=[bs, 2], + dtype="int32") #[batch_size, 2] + self.adj_in = fluid.data( + name="adj_in", + shape=[bs, -1, -1], + dtype="float32") #[batch_size, seq_max, seq_max] + self.adj_out = fluid.data( + name="adj_out", + shape=[bs, -1, -1], + dtype="float32") #[batch_size, seq_max, seq_max] + self.mask = fluid.data( + name="mask", + shape=[bs, -1, 1], + dtype="float32") #[batch_size, seq_max, 1] + self.label = fluid.data( + name="label", + shape=[bs, 1], + dtype="int64") #[batch_size, 1] + + res = [self.items, self.seq_index, self.last_index, self.adj_in, self.adj_out, self.mask, self.label] + return res + + def train_input(self): + res = self.input(self.train_batch_size) + self._data_var = res + + use_dataloader = envs.get_global_env("hyper_parameters.use_DataLoader", False, self._namespace) + + if self._platform != "LINUX" or use_dataloader: + self._data_loader = fluid.io.DataLoader.from_generator( + feed_list=self._data_var, capacity=256, use_double_buffer=False, iterable=False) + + def net(self, items_num, hidden_size, step, bs): + stdv = 1.0 / math.sqrt(hidden_size) + + def embedding_layer(input, table_name, emb_dim, initializer_instance=None): + emb = fluid.embedding( + input=input, + size=[items_num, emb_dim], + param_attr=fluid.ParamAttr( + name=table_name, + initializer=initializer_instance), + ) + return emb + + sparse_initializer = fluid.initializer.Uniform(low=-stdv, high=stdv) + items_emb = embedding_layer(self.items, "emb", hidden_size, sparse_initializer) + pre_state = items_emb + for i in range(step): + pre_state = layers.reshape(x=pre_state, shape=[bs, -1, hidden_size]) + state_in = layers.fc( + input=pre_state, + name="state_in", + size=hidden_size, + act=None, + num_flatten_dims=2, + param_attr=fluid.ParamAttr(initializer=fluid.initializer.Uniform( + low=-stdv, high=stdv)), + bias_attr=fluid.ParamAttr(initializer=fluid.initializer.Uniform( + low=-stdv, high=stdv))) #[batch_size, uniq_max, h] + state_out = layers.fc( + input=pre_state, + name="state_out", + size=hidden_size, + act=None, + num_flatten_dims=2, + param_attr=fluid.ParamAttr(initializer=fluid.initializer.Uniform( + low=-stdv, high=stdv)), + bias_attr=fluid.ParamAttr(initializer=fluid.initializer.Uniform( + low=-stdv, high=stdv))) #[batch_size, uniq_max, h] + + state_adj_in = layers.matmul(self.adj_in, state_in) #[batch_size, uniq_max, h] + state_adj_out = layers.matmul(self.adj_out, state_out) #[batch_size, uniq_max, h] + + gru_input = layers.concat([state_adj_in, state_adj_out], axis=2) + + gru_input = layers.reshape(x=gru_input, shape=[-1, hidden_size * 2]) + gru_fc = layers.fc( + input=gru_input, + name="gru_fc", + size=3 * hidden_size, + bias_attr=False) + pre_state, _, _ = fluid.layers.gru_unit( + input=gru_fc, + hidden=layers.reshape(x=pre_state, shape=[-1, hidden_size]), + size=3 * hidden_size) + + final_state = layers.reshape(pre_state, shape=[bs, -1, hidden_size]) + seq = layers.gather_nd(final_state, self.seq_index) + last = layers.gather_nd(final_state, self.last_index) + + seq_fc = layers.fc( + input=seq, + name="seq_fc", + size=hidden_size, + bias_attr=False, + act=None, + num_flatten_dims=2, + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Uniform( + low=-stdv, high=stdv))) #[batch_size, seq_max, h] + last_fc = layers.fc( + input=last, + name="last_fc", + size=hidden_size, + bias_attr=False, + act=None, + num_flatten_dims=1, + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Uniform( + low=-stdv, high=stdv))) #[bathc_size, h] + + seq_fc_t = layers.transpose( + seq_fc, perm=[1, 0, 2]) #[seq_max, batch_size, h] + add = layers.elementwise_add( + seq_fc_t, last_fc) #[seq_max, batch_size, h] + b = layers.create_parameter( + shape=[hidden_size], + dtype='float32', + default_initializer=fluid.initializer.Constant(value=0.0)) #[h] + add = layers.elementwise_add(add, b) #[seq_max, batch_size, h] + + add_sigmoid = layers.sigmoid(add) #[seq_max, batch_size, h] + add_sigmoid = layers.transpose( + add_sigmoid, perm=[1, 0, 2]) #[batch_size, seq_max, h] + + weight = layers.fc( + input=add_sigmoid, + name="weight_fc", + size=1, + act=None, + num_flatten_dims=2, + bias_attr=False, + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Uniform( + low=-stdv, high=stdv))) #[batch_size, seq_max, 1] + weight *= self.mask + weight_mask = layers.elementwise_mul(seq, weight, axis=0) #[batch_size, seq_max, h] + global_attention = layers.reduce_sum(weight_mask, dim=1) #[batch_size, h] + + final_attention = layers.concat( + [global_attention, last], axis=1) #[batch_size, 2*h] + final_attention_fc = layers.fc( + input=final_attention, + name="final_attention_fc", + size=hidden_size, + bias_attr=False, + act=None, + param_attr=fluid.ParamAttr(initializer=fluid.initializer.Uniform( + low=-stdv, high=stdv))) #[batch_size, h] + + # all_vocab = layers.create_global_var( + # shape=[items_num - 1], + # value=0, + # dtype="int64", + # persistable=True, + # name="all_vocab") + all_vocab = np.arange(1, items_num).reshape((-1)).astype('int32') + all_vocab = fluid.layers.cast(x=fluid.layers.assign(all_vocab), dtype='int64') + + all_emb = fluid.embedding( + input=all_vocab, + param_attr=fluid.ParamAttr( + name="emb", + initializer=fluid.initializer.Uniform( + low=-stdv, high=stdv)), + size=[items_num, hidden_size]) #[all_vocab, h] + + logits = layers.matmul( + x=final_attention_fc, y=all_emb, + transpose_y=True) #[batch_size, all_vocab] + softmax = layers.softmax_with_cross_entropy( + logits=logits, label=self.label) #[batch_size, 1] + self.loss = layers.reduce_mean(softmax) # [1] + self.acc = layers.accuracy(input=logits, label=self.label, k=20) + + def avg_loss(self): + self._cost = self.loss + + def metrics(self): + self._metrics["LOSS"] = self.loss + self._metrics["train_acc"] = self.acc + + def train_net(self): + self.train_input() + self.net(self.items_num, self.hidden_size, self.step, self.train_batch_size) + self.avg_loss() + self.metrics() + + def optimizer(self): + learning_rate = envs.get_global_env("hyper_parameters.learning_rate", None, self._namespace) + step_per_epoch = self.ins_num // self.train_batch_size + decay_steps = envs.get_global_env("hyper_parameters.decay_steps", None, self._namespace) + decay_rate = envs.get_global_env("hyper_parameters.decay_rate", None, self._namespace) + l2 = envs.get_global_env("hyper_parameters.l2", None, self._namespace) + optimizer = fluid.optimizer.Adam( + learning_rate=fluid.layers.exponential_decay( + learning_rate=learning_rate, + decay_steps=decay_steps * step_per_epoch, + decay_rate=decay_rate), + regularization=fluid.regularizer.L2DecayRegularizer( + regularization_coeff=l2)) + + return optimizer + + def infer_input(self): + self._reader_namespace = "evaluate.reader" + res = self.input(self.evaluate_batch_size) + self._infer_data_var = res + + self._infer_data_loader = fluid.io.DataLoader.from_generator( + feed_list=self._infer_data_var, capacity=64, use_double_buffer=False, iterable=False) + + def infer_net(self): + self.infer_input() + self.net(self.items_num, self.hidden_size, self.step, self.evaluate_batch_size) + self._infer_results['acc'] = self.acc + self._infer_results['loss'] = self.loss diff --git a/models/recall/gnn/raw_data/convert_data.py b/models/recall/gnn/raw_data/convert_data.py new file mode 100755 index 0000000000000000000000000000000000000000..2e0e57f1f781f7210c46ef265e1189e99a6f7a96 --- /dev/null +++ b/models/recall/gnn/raw_data/convert_data.py @@ -0,0 +1,29 @@ +import argparse +import time +import pickle +import os + +parser = argparse.ArgumentParser() +parser.add_argument( + '--data_dir', + default='sample', + help='dataset dir: diginetica/yoochoose1_4/yoochoose1_64/sample') +opt = parser.parse_args() + +def process_data(file_type): + path = os.path.join(opt.data_dir, file_type) + output_path = os.path.splitext(path)[0] + ".txt" + data = pickle.load(open(path, 'rb')) + data = list(zip(data[0], data[1])) + length = len(data) + with open(output_path, 'w') as fout: + for i in range(length): + fout.write(','.join(map(str, data[i][0]))) + fout.write('\t') + fout.write(str(data[i][1])) + fout.write("\n") + +process_data("train") +process_data("test") + +print('Done.') diff --git a/models/recall/gnn/raw_data/download.py b/models/recall/gnn/raw_data/download.py new file mode 100644 index 0000000000000000000000000000000000000000..69a1ee20b2d634e9eca47c621dce82ac2d98b5f2 --- /dev/null +++ b/models/recall/gnn/raw_data/download.py @@ -0,0 +1,47 @@ +import requests +import sys +import time +import os + +lasttime = time.time() +FLUSH_INTERVAL = 0.1 + + +def progress(str, end=False): + global lasttime + if end: + str += "\n" + lasttime = 0 + if time.time() - lasttime >= FLUSH_INTERVAL: + sys.stdout.write("\r%s" % str) + lasttime = time.time() + sys.stdout.flush() + + +def _download_file(url, savepath, print_progress): + r = requests.get(url, stream=True) + total_length = r.headers.get('content-length') + + if total_length is None: + with open(savepath, 'wb') as f: + shutil.copyfileobj(r.raw, f) + else: + with open(savepath, 'wb') as f: + dl = 0 + total_length = int(total_length) + starttime = time.time() + if print_progress: + print("Downloading %s" % os.path.basename(savepath)) + for data in r.iter_content(chunk_size=4096): + dl += len(data) + f.write(data) + if print_progress: + done = int(50 * dl / total_length) + progress("[%-50s] %.2f%%" % + ('=' * done, float(100 * dl) / total_length)) + if print_progress: + progress("[%-50s] %.2f%%" % ('=' * 50, 100), end=True) + + +_download_file("https://sr-gnn.bj.bcebos.com/train-item-views.csv", + "./train-item-views.csv", True) diff --git a/models/recall/gnn/raw_data/preprocess.py b/models/recall/gnn/raw_data/preprocess.py new file mode 100755 index 0000000000000000000000000000000000000000..3e7f710b221d708183c2f85d2743162c44b863da --- /dev/null +++ b/models/recall/gnn/raw_data/preprocess.py @@ -0,0 +1,256 @@ +#!/usr/bin/env python36 +# -*- coding: utf-8 -*- +""" +Created on July, 2018 + +@author: Tangrizzly +""" + +import argparse +import time +import csv +import pickle +import operator +import datetime +import os + +parser = argparse.ArgumentParser() +parser.add_argument( + '--dataset', + default='sample', + help='dataset name: diginetica/yoochoose/sample') +opt = parser.parse_args() +print(opt) + +dataset = 'sample_train-item-views.csv' +if opt.dataset == 'diginetica': + dataset = 'train-item-views.csv' +elif opt.dataset == 'yoochoose': + dataset = 'yoochoose-clicks.dat' + +print("-- Starting @ %ss" % datetime.datetime.now()) +with open(dataset, "r") as f: + if opt.dataset == 'yoochoose': + reader = csv.DictReader(f, delimiter=',') + else: + reader = csv.DictReader(f, delimiter=';') + sess_clicks = {} + sess_date = {} + ctr = 0 + curid = -1 + curdate = None + for data in reader: + sessid = data['session_id'] + if curdate and not curid == sessid: + date = '' + if opt.dataset == 'yoochoose': + date = time.mktime( + time.strptime(curdate[:19], '%Y-%m-%dT%H:%M:%S')) + else: + date = time.mktime(time.strptime(curdate, '%Y-%m-%d')) + sess_date[curid] = date + curid = sessid + if opt.dataset == 'yoochoose': + item = data['item_id'] + else: + item = data['item_id'], int(data['timeframe']) + curdate = '' + if opt.dataset == 'yoochoose': + curdate = data['timestamp'] + else: + curdate = data['eventdate'] + + if sessid in sess_clicks: + sess_clicks[sessid] += [item] + else: + sess_clicks[sessid] = [item] + ctr += 1 + date = '' + if opt.dataset == 'yoochoose': + date = time.mktime(time.strptime(curdate[:19], '%Y-%m-%dT%H:%M:%S')) + else: + date = time.mktime(time.strptime(curdate, '%Y-%m-%d')) + for i in list(sess_clicks): + sorted_clicks = sorted(sess_clicks[i], key=operator.itemgetter(1)) + sess_clicks[i] = [c[0] for c in sorted_clicks] + sess_date[curid] = date +print("-- Reading data @ %ss" % datetime.datetime.now()) + +# Filter out length 1 sessions +for s in list(sess_clicks): + if len(sess_clicks[s]) == 1: + del sess_clicks[s] + del sess_date[s] + +# Count number of times each item appears +iid_counts = {} +for s in sess_clicks: + seq = sess_clicks[s] + for iid in seq: + if iid in iid_counts: + iid_counts[iid] += 1 + else: + iid_counts[iid] = 1 + +sorted_counts = sorted(iid_counts.items(), key=operator.itemgetter(1)) + +length = len(sess_clicks) +for s in list(sess_clicks): + curseq = sess_clicks[s] + filseq = list(filter(lambda i: iid_counts[i] >= 5, curseq)) + if len(filseq) < 2: + del sess_clicks[s] + del sess_date[s] + else: + sess_clicks[s] = filseq + +# Split out test set based on dates +dates = list(sess_date.items()) +maxdate = dates[0][1] + +for _, date in dates: + if maxdate < date: + maxdate = date + +# 7 days for test +splitdate = 0 +if opt.dataset == 'yoochoose': + splitdate = maxdate - 86400 * 1 # the number of seconds for a day:86400 +else: + splitdate = maxdate - 86400 * 7 + +print('Splitting date', splitdate) # Yoochoose: ('Split date', 1411930799.0) +tra_sess = filter(lambda x: x[1] < splitdate, dates) +tes_sess = filter(lambda x: x[1] > splitdate, dates) + +# Sort sessions by date +tra_sess = sorted( + tra_sess, key=operator.itemgetter(1)) # [(session_id, timestamp), (), ] +tes_sess = sorted( + tes_sess, key=operator.itemgetter(1)) # [(session_id, timestamp), (), ] +print(len(tra_sess)) # 186670 # 7966257 +print(len(tes_sess)) # 15979 # 15324 +print(tra_sess[:3]) +print(tes_sess[:3]) +print("-- Splitting train set and test set @ %ss" % datetime.datetime.now()) + +# Choosing item count >=5 gives approximately the same number of items as reported in paper +item_dict = {} + + +# Convert training sessions to sequences and renumber items to start from 1 +def obtian_tra(): + train_ids = [] + train_seqs = [] + train_dates = [] + item_ctr = 1 + for s, date in tra_sess: + seq = sess_clicks[s] + outseq = [] + for i in seq: + if i in item_dict: + outseq += [item_dict[i]] + else: + outseq += [item_ctr] + item_dict[i] = item_ctr + item_ctr += 1 + if len(outseq) < 2: # Doesn't occur + continue + train_ids += [s] + train_dates += [date] + train_seqs += [outseq] + print(item_ctr) # 43098, 37484 + with open("./diginetica/config.txt", "w") as fout: + fout.write(str(item_ctr) + "\n") + return train_ids, train_dates, train_seqs + + +# Convert test sessions to sequences, ignoring items that do not appear in training set +def obtian_tes(): + test_ids = [] + test_seqs = [] + test_dates = [] + for s, date in tes_sess: + seq = sess_clicks[s] + outseq = [] + for i in seq: + if i in item_dict: + outseq += [item_dict[i]] + if len(outseq) < 2: + continue + test_ids += [s] + test_dates += [date] + test_seqs += [outseq] + return test_ids, test_dates, test_seqs + + +tra_ids, tra_dates, tra_seqs = obtian_tra() +tes_ids, tes_dates, tes_seqs = obtian_tes() + + +def process_seqs(iseqs, idates): + out_seqs = [] + out_dates = [] + labs = [] + ids = [] + for id, seq, date in zip(range(len(iseqs)), iseqs, idates): + for i in range(1, len(seq)): + tar = seq[-i] + labs += [tar] + out_seqs += [seq[:-i]] + out_dates += [date] + ids += [id] + return out_seqs, out_dates, labs, ids + + +tr_seqs, tr_dates, tr_labs, tr_ids = process_seqs(tra_seqs, tra_dates) +te_seqs, te_dates, te_labs, te_ids = process_seqs(tes_seqs, tes_dates) +tra = (tr_seqs, tr_labs) +tes = (te_seqs, te_labs) +print(len(tr_seqs)) +print(len(te_seqs)) +print(tr_seqs[:3], tr_dates[:3], tr_labs[:3]) +print(te_seqs[:3], te_dates[:3], te_labs[:3]) +all = 0 + +for seq in tra_seqs: + all += len(seq) +for seq in tes_seqs: + all += len(seq) +print('avg length: ', all / (len(tra_seqs) + len(tes_seqs) * 1.0)) +if opt.dataset == 'diginetica': + if not os.path.exists('diginetica'): + os.makedirs('diginetica') + pickle.dump(tra, open('diginetica/train', 'wb')) + pickle.dump(tes, open('diginetica/test', 'wb')) + pickle.dump(tra_seqs, open('diginetica/all_train_seq', 'wb')) +elif opt.dataset == 'yoochoose': + if not os.path.exists('yoochoose1_4'): + os.makedirs('yoochoose1_4') + if not os.path.exists('yoochoose1_64'): + os.makedirs('yoochoose1_64') + pickle.dump(tes, open('yoochoose1_4/test', 'wb')) + pickle.dump(tes, open('yoochoose1_64/test', 'wb')) + + split4, split64 = int(len(tr_seqs) / 4), int(len(tr_seqs) / 64) + print(len(tr_seqs[-split4:])) + print(len(tr_seqs[-split64:])) + + tra4, tra64 = (tr_seqs[-split4:], tr_labs[-split4:]), (tr_seqs[-split64:], + tr_labs[-split64:]) + seq4, seq64 = tra_seqs[tr_ids[-split4]:], tra_seqs[tr_ids[-split64]:] + + pickle.dump(tra4, open('yoochoose1_4/train', 'wb')) + pickle.dump(seq4, open('yoochoose1_4/all_train_seq', 'wb')) + + pickle.dump(tra64, open('yoochoose1_64/train', 'wb')) + pickle.dump(seq64, open('yoochoose1_64/all_train_seq', 'wb')) + +else: + if not os.path.exists('sample'): + os.makedirs('sample') + pickle.dump(tra, open('sample/train', 'wb')) + pickle.dump(tes, open('sample/test', 'wb')) + pickle.dump(tra_seqs, open('sample/all_train_seq', 'wb')) + +print('Done.') diff --git a/models/recall/gnn/reader.py b/models/recall/gnn/reader.py new file mode 100755 index 0000000000000000000000000000000000000000..89150d5d2faf241f132614ef825a409d600c7ec3 --- /dev/null +++ b/models/recall/gnn/reader.py @@ -0,0 +1,135 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import numpy as np +import io +import copy +import random +from fleetrec.core.reader import Reader +from fleetrec.core.utils import envs + + +class TrainReader(Reader): + def init(self): + self.batch_size = envs.get_global_env("batch_size", None, "train.reader") + + self.input = [] + self.length = None + + def base_read(self, files): + res = [] + for f in files: + with open(f, "r") as fin: + for line in fin: + line = line.strip().split('\t') + res.append(tuple([map(int, line[0].split(',')), int(line[1])])) + return res + + def make_data(self, cur_batch, batch_size): + cur_batch = [list(e) for e in cur_batch] + max_seq_len = 0 + for e in cur_batch: + max_seq_len = max(max_seq_len, len(e[0])) + last_id = [] + for e in cur_batch: + last_id.append(len(e[0]) - 1) + e[0] += [0] * (max_seq_len - len(e[0])) + + max_uniq_len = 0 + for e in cur_batch: + max_uniq_len = max(max_uniq_len, len(np.unique(e[0]))) + + items, adj_in, adj_out, seq_index, last_index = [], [], [], [], [] + mask, label = [], [] + + id = 0 + for e in cur_batch: + node = np.unique(e[0]) + items.append(node.tolist() + (max_uniq_len - len(node)) * [0]) + adj = np.zeros((max_uniq_len, max_uniq_len)) + + for i in np.arange(len(e[0]) - 1): + if e[0][i + 1] == 0: + break + u = np.where(node == e[0][i])[0][0] + v = np.where(node == e[0][i + 1])[0][0] + adj[u][v] = 1 + + u_deg_in = np.sum(adj, 0) + u_deg_in[np.where(u_deg_in == 0)] = 1 + adj_in.append(np.divide(adj, u_deg_in).transpose()) + + u_deg_out = np.sum(adj, 1) + u_deg_out[np.where(u_deg_out == 0)] = 1 + adj_out.append(np.divide(adj.transpose(), u_deg_out).transpose()) + + seq_index.append( + [[id, np.where(node == i)[0][0]] for i in e[0]]) + last_index.append( + [id, np.where(node == e[0][last_id[id]])[0][0]]) + label.append(e[1] - 1) + mask.append([[1] * (last_id[id] + 1) + [0] * + (max_seq_len - last_id[id] - 1)]) + id += 1 + + items = np.array(items).astype("int64").reshape((batch_size, -1)) + seq_index = np.array(seq_index).astype("int32").reshape( + (batch_size, -1, 2)) + last_index = np.array(last_index).astype("int32").reshape( + (batch_size, 2)) + adj_in = np.array(adj_in).astype("float32").reshape( + (batch_size, max_uniq_len, max_uniq_len)) + adj_out = np.array(adj_out).astype("float32").reshape( + (batch_size, max_uniq_len, max_uniq_len)) + mask = np.array(mask).astype("float32").reshape((batch_size, -1, 1)) + label = np.array(label).astype("int64").reshape((batch_size, 1)) + return zip(items, seq_index, last_index, adj_in, adj_out, mask, label) + + def batch_reader(self, batch_size, batch_group_size, train=True): + def _reader(): + random.shuffle(self.input) + group_remain = self.length % batch_group_size + for bg_id in range(0, self.length - group_remain, batch_group_size): + cur_bg = copy.deepcopy(self.input[bg_id:bg_id + batch_group_size]) + if train: + cur_bg = sorted(cur_bg, key=lambda x: len(x[0]), reverse=True) + for i in range(0, batch_group_size, batch_size): + cur_batch = cur_bg[i:i + batch_size] + yield self.make_data(cur_batch, batch_size) + + if group_remain == 0: + return + remain_data = copy.deepcopy(self.input[-group_remain:]) + if train: + remain_data = sorted( + remain_data, key=lambda x: len(x[0]), reverse=True) + for i in range(0, group_remain, batch_size): + if i + batch_size <= group_remain: + cur_batch = remain_data[i:i + batch_size] + yield self.make_data(cur_batch, batch_size) + else: + # Due to fixed batch_size, discard the remaining ins + return + #cur_batch = remain_data[i:] + #yield self.make_data(cur_batch, group_remain % batch_size) + return _reader + + def generate_batch_from_trainfiles(self, files): + self.input = self.base_read(files) + self.length = len(self.input) + return self.batch_reader(self.batch_size, self.batch_size * 20) + + def generate_sample(self, line): + def data_iter(): + yield [] + return data_iter diff --git a/setup.py b/setup.py index db9b26013fb978c8108047d0dc3a5b197b9e05c7..80e39b5533e050e2307a436865ce14e44d797773 100644 --- a/setup.py +++ b/setup.py @@ -39,7 +39,7 @@ def build(dirname): packages = find_packages(dirname, include=('fleetrec.*')) package_dir = {'': dirname} package_data = {} - need_copy = ['data/*/*.txt', '*.yaml', 'tree/*.npy','tree/*.txt'] + need_copy = ['data/*.txt', 'data/*/*.txt', '*.yaml', 'tree/*.npy','tree/*.txt'] for package in packages: if package.startswith("fleetrec.models."): package_data[package] = need_copy