提交 1f89249a 编写于 作者: X Xin Pan

update DeepCF model

test=develop
上级 0fff666f
...@@ -64,6 +64,7 @@ void GPUGather(const platform::DeviceContext& ctx, const Tensor& src, ...@@ -64,6 +64,7 @@ void GPUGather(const platform::DeviceContext& ctx, const Tensor& src,
for (int i = 1; i < src_dims.size(); ++i) slice_size *= src_dims[i]; for (int i = 1; i < src_dims.size(); ++i) slice_size *= src_dims[i];
const T* p_src = src.data<T>(); const T* p_src = src.data<T>();
// why must be int?
const int* p_index = index.data<int>(); const int* p_index = index.data<int>();
T* p_output = output->data<T>(); T* p_output = output->data<T>();
......
...@@ -744,7 +744,7 @@ class Operator(object): ...@@ -744,7 +744,7 @@ class Operator(object):
if _in_imperative_mode(): if _in_imperative_mode():
if type is None: if type is None:
raise ValueError( raise ValueError(
"`type` to initilized an Operator can not be None.") "`type` to initialized an Operator can not be None.")
self.iop = core.OpBase(type) self.iop = core.OpBase(type)
# TODO(minqiyang): remove these lines after we take apart all # TODO(minqiyang): remove these lines after we take apart all
......
...@@ -55,7 +55,8 @@ def to_variable(value, block=None): ...@@ -55,7 +55,8 @@ def to_variable(value, block=None):
type=core.VarDesc.VarType.LOD_TENSOR, type=core.VarDesc.VarType.LOD_TENSOR,
name=None, name=None,
shape=value.shape, shape=value.shape,
dtype=value.dtype) dtype=value.dtype,
stop_gradient=True)
var = py_var._ivar.value() var = py_var._ivar.value()
tensor = var.get_tensor() tensor = var.get_tensor()
tensor.set(value, framework._current_expected_place()) tensor.set(value, framework._current_expected_place())
......
...@@ -14,7 +14,9 @@ ...@@ -14,7 +14,9 @@
from __future__ import print_function from __future__ import print_function
import sys
import six import six
from six.moves import reduce
from collections import defaultdict from collections import defaultdict
from paddle.fluid import core from paddle.fluid import core
...@@ -49,7 +51,16 @@ class Tracer(core.Tracer): ...@@ -49,7 +51,16 @@ class Tracer(core.Tracer):
def trace_op(self, op, stop_gradient=False): def trace_op(self, op, stop_gradient=False):
# record op's trace id # record op's trace id
op.iop._trace_id = self._trace_id op.iop._trace_id = self._trace_id
"""
all_input_stop_grads = True
for vars in op.inputs.values():
for v in vars:
sys.stderr.write('%s %s\n' % (v.name, v.stop_gradient))
all_input_stop_grads &= v.stop_gradient
stop_gradient = False if not stop_gradient else True
stop_gradient = all_input_stop_grads | stop_gradient
"""
backward_refs = self.trace(op.iop, op.inputs, op.outputs, op.attrs, backward_refs = self.trace(op.iop, op.inputs, op.outputs, op.attrs,
framework._current_expected_place(), framework._current_expected_place(),
stop_gradient) stop_gradient)
......
...@@ -756,7 +756,7 @@ class NumpyArrayInitializer(Initializer): ...@@ -756,7 +756,7 @@ class NumpyArrayInitializer(Initializer):
values = [int(v) for v in self._value.flat] values = [int(v) for v in self._value.flat]
else: else:
raise ValueError("Unsupported dtype %s", self._value.dtype) raise ValueError("Unsupported dtype %s", self._value.dtype)
if self._value.size > 1024 * 1024 * 5: if self._value.size > 1024 * 1024 * 1024:
raise ValueError("The size of input is too big. Please consider " raise ValueError("The size of input is too big. Please consider "
"saving it to file and 'load_op' to load it") "saving it to file and 'load_op' to load it")
op = block._prepend_op( op = block._prepend_op(
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
import unittest import unittest
import numpy as np import numpy as np
import random import random
import os
import sys import sys
import paddle import paddle
...@@ -23,16 +24,15 @@ import paddle.fluid.core as core ...@@ -23,16 +24,15 @@ import paddle.fluid.core as core
from test_imperative_base import new_program_scope from test_imperative_base import new_program_scope
from paddle.fluid.imperative.base import to_variable from paddle.fluid.imperative.base import to_variable
NUM_USERS = 100 DATA_PATH = os.environ.get('DATA_PATH', '')
NUM_ITEMS = 1000 BATCH_SIZE = int(os.environ.get('BATCH_SIZE', 256))
NUM_BATCHES = int(os.environ.get('NUM_BATCHES', 2))
NUM_EPOCHES = int(os.environ.get('NUM_EPOCHES', 1))
BATCH_SIZE = 32
NUM_BATCHES = 2
class DMF(fluid.imperative.Layer):
class MLP(fluid.imperative.Layer):
def __init__(self, name_scope): def __init__(self, name_scope):
super(MLP, self).__init__(name_scope) super(DMF, self).__init__(name_scope)
self._user_latent = fluid.imperative.FC(self.full_name(), 256) self._user_latent = fluid.imperative.FC(self.full_name(), 256)
self._item_latent = fluid.imperative.FC(self.full_name(), 256) self._item_latent = fluid.imperative.FC(self.full_name(), 256)
...@@ -61,9 +61,9 @@ class MLP(fluid.imperative.Layer): ...@@ -61,9 +61,9 @@ class MLP(fluid.imperative.Layer):
return fluid.layers.elementwise_mul(users, items) return fluid.layers.elementwise_mul(users, items)
class DMF(fluid.imperative.Layer): class MLP(fluid.imperative.Layer):
def __init__(self, name_scope): def __init__(self, name_scope):
super(DMF, self).__init__(name_scope) super(MLP, self).__init__(name_scope)
self._user_latent = fluid.imperative.FC(self.full_name(), 256) self._user_latent = fluid.imperative.FC(self.full_name(), 256)
self._item_latent = fluid.imperative.FC(self.full_name(), 256) self._item_latent = fluid.imperative.FC(self.full_name(), 256)
self._match_layers = [] self._match_layers = []
...@@ -87,21 +87,36 @@ class DMF(fluid.imperative.Layer): ...@@ -87,21 +87,36 @@ class DMF(fluid.imperative.Layer):
class DeepCF(fluid.imperative.Layer): class DeepCF(fluid.imperative.Layer):
def __init__(self, name_scope): def __init__(self, name_scope, num_users, num_items, matrix):
super(DeepCF, self).__init__(name_scope) super(DeepCF, self).__init__(name_scope)
self._num_users = num_users
self._num_items = num_items
self._rating_matrix = self.create_parameter(
None,
matrix.shape,
matrix.dtype,
is_bias=False,
default_initializer=fluid.initializer.NumpyArrayInitializer(matrix))
self._rating_matrix._stop_gradient = True
self._user_emb = fluid.imperative.Embedding(self.full_name(), # self._user_emb = fluid.imperative.Embedding(self.full_name(),
[NUM_USERS, 256]) # [self._num_users, 256])
self._item_emb = fluid.imperative.Embedding(self.full_name(), # self._item_emb = fluid.imperative.Embedding(self.full_name(),
[NUM_ITEMS, 256]) # [self._num_items, 256])
self._mlp = MLP(self.full_name()) self._mlp = MLP(self.full_name())
self._dmf = DMF(self.full_name()) self._dmf = DMF(self.full_name())
self._match_fc = fluid.imperative.FC(self.full_name(), 1, act='sigmoid') self._match_fc = fluid.imperative.FC(self.full_name(), 1, act='sigmoid')
def forward(self, users, items): def forward(self, users, items):
users_emb = self._user_emb(users) # users_emb = self._user_emb(users)
items_emb = self._item_emb(items) # items_emb = self._item_emb(items)
sys.stderr.write('forward: %s\n' % users._stop_gradient)
users_emb = fluid.layers.gather(self._rating_matrix, users)
items_emb = fluid.layers.gather(
fluid.layers.transpose(self._rating_matrix, [1, 0]), items)
users_emb.stop_gradient = True
items_emb.stop_gradient = True
mlp_predictive = self._mlp(users_emb, items_emb) mlp_predictive = self._mlp(users_emb, items_emb)
dmf_predictive = self._dmf(users_emb, items_emb) dmf_predictive = self._dmf(users_emb, items_emb)
...@@ -116,40 +131,92 @@ def get_data(): ...@@ -116,40 +131,92 @@ def get_data():
user_ids = [] user_ids = []
item_ids = [] item_ids = []
labels = [] labels = []
matrix = np.zeros([100, 1000], dtype=np.float32)
NUM_USERS = 100
NUM_ITEMS = 1000
for uid in range(NUM_USERS): for uid in range(NUM_USERS):
for iid in range(NUM_ITEMS): for iid in range(NUM_ITEMS):
# 10% positive label = float(random.randint(1, 6) == 1)
label = float(random.randint(1, 10) == 1)
user_ids.append(uid) user_ids.append(uid)
item_ids.append(iid) item_ids.append(iid)
labels.append(label) labels.append(label)
indices = np.arange(NUM_USERS * NUM_ITEMS) matrix[uid, iid] = label
indices = np.arange(len(user_ids))
np.random.shuffle(indices)
users_np = np.array(user_ids, dtype=np.int32)[indices]
items_np = np.array(item_ids, dtype=np.int32)[indices]
labels_np = np.array(labels, dtype=np.float32)[indices]
return np.expand_dims(users_np, -1), \
np.expand_dims(items_np, -1), \
np.expand_dims(labels_np, -1), NUM_USERS, NUM_ITEMS, matrix
def load_data(DATA_PATH, num_negatives=3):
    """Load a tab-separated rating file and build training samples.

    Each non-blank line of the file must hold three integers separated by
    tabs: ``uid``, ``iid`` and ``rating``. A pair with ``rating > 0`` is
    treated as a positive ("liked") sample. For every positive sample,
    ``num_negatives`` randomly drawn (user, item) pairs that are not liked
    are appended with label ``0.0``; the same negative pair may be drawn
    more than once.

    Args:
        DATA_PATH: Path of the rating file to read.
        num_negatives: Number of negative samples drawn per positive sample.

    Returns:
        A tuple ``(users, items, labels, num_users, num_items, matrix)``:
        ``users`` and ``items`` are int32 arrays of shape ``(N, 1)``,
        ``labels`` is a float32 array of shape ``(N, 1)``, all three
        shuffled with the same permutation; ``num_users`` / ``num_items``
        are the largest ids seen plus one; ``matrix`` is a float32 array of
        shape ``(num_users, num_items)`` holding 1.0 for every liked pair.

    Raises:
        ValueError: If the file contains no rating lines, or a line cannot
            be parsed as three tab-separated integers.
    """
    sys.stderr.write('loading from %s\n' % DATA_PATH)
    # A dict (rather than a set) keeps the file's insertion order on modern
    # Pythons, so the sample order before shuffling stays reproducible.
    likes = dict()
    num_users = -1
    num_items = -1
    with open(DATA_PATH, 'r') as f:
        # Iterate the file lazily instead of materializing every line.
        for line in f:
            if not line.strip():
                # Tolerate blank lines (e.g. a trailing newline at EOF).
                continue
            uid, iid, rating = [int(v) for v in line.split('\t')]
            num_users = max(num_users, uid + 1)
            num_items = max(num_items, iid + 1)
            if rating > 0:
                likes[(uid, iid)] = 1.0

    if num_users < 0 or num_items < 0:
        # Without this check np.zeros would fail with an opaque
        # "negative dimensions" error.
        raise ValueError('no ratings found in %s' % DATA_PATH)

    # Rejection sampling below can only terminate if at least one
    # (user, item) pair is not liked.
    has_negative_pairs = num_users * num_items > len(likes)

    user_ids = []
    item_ids = []
    labels = []
    matrix = np.zeros([num_users, num_items], dtype=np.float32)
    for uid, iid in likes:
        user_ids.append(uid)
        item_ids.append(iid)
        labels.append(1.0)
        matrix[uid, iid] = 1.0

        if not has_negative_pairs:
            continue
        negative = 0
        while negative < num_negatives:
            nuid = random.randint(0, num_users - 1)
            niid = random.randint(0, num_items - 1)
            if (nuid, niid) not in likes:
                negative += 1
                user_ids.append(nuid)
                item_ids.append(niid)
                labels.append(0.0)

    # Apply one shared permutation so users/items/labels stay aligned.
    indices = np.arange(len(user_ids))
    np.random.shuffle(indices)
    users_np = np.array(user_ids, dtype=np.int32)[indices]
    items_np = np.array(item_ids, dtype=np.int32)[indices]
    labels_np = np.array(labels, dtype=np.float32)[indices]
    return np.expand_dims(users_np, -1), \
        np.expand_dims(items_np, -1), \
        np.expand_dims(labels_np, -1), num_users, num_items, matrix
class TestImperativeDeepCF(unittest.TestCase): class TestImperativeDeepCF(unittest.TestCase):
def test_gan_float32(self): def test_deefcf(self):
seed = 90 seed = 90
users_np, items_np, labels_np = get_data() if DATA_PATH:
(users_np, items_np, labels_np, num_users, num_items,
matrix) = load_data(DATA_PATH)
else:
(users_np, items_np, labels_np, num_users, num_items,
matrix) = get_data()
startup = fluid.Program() startup = fluid.Program()
startup.random_seed = seed startup.random_seed = seed
main = fluid.Program() main = fluid.Program()
main.random_seed = seed main.random_seed = seed
"""
scope = fluid.core.Scope() scope = fluid.core.Scope()
with new_program_scope(main=main, startup=startup, scope=scope): with new_program_scope(main=main, startup=startup, scope=scope):
users = fluid.layers.data('users', [1], dtype='int64') users = fluid.layers.data('users', [1], dtype='int32')
items = fluid.layers.data('items', [1], dtype='int64') items = fluid.layers.data('items', [1], dtype='int32')
labels = fluid.layers.data('labels', [1], dtype='float32') labels = fluid.layers.data('labels', [1], dtype='float32')
deepcf = DeepCF('deepcf') deepcf = DeepCF('deepcf', num_users, num_items, matrix)
prediction = deepcf(users, items) prediction = deepcf(users, items)
loss = fluid.layers.reduce_sum( loss = fluid.layers.reduce_sum(
fluid.layers.log_loss(prediction, labels)) fluid.layers.log_loss(prediction, labels))
...@@ -159,35 +226,45 @@ class TestImperativeDeepCF(unittest.TestCase): ...@@ -159,35 +226,45 @@ class TestImperativeDeepCF(unittest.TestCase):
exe = fluid.Executor(fluid.CPUPlace( exe = fluid.Executor(fluid.CPUPlace(
) if not core.is_compiled_with_cuda() else fluid.CUDAPlace(0)) ) if not core.is_compiled_with_cuda() else fluid.CUDAPlace(0))
exe.run(startup) exe.run(startup)
for slice in range(0, BATCH_SIZE * NUM_BATCHES, BATCH_SIZE): for e in range(NUM_EPOCHES):
static_loss = exe.run( sys.stderr.write('epoch %d\n' % e)
main, for slice in range(0, BATCH_SIZE * NUM_BATCHES, BATCH_SIZE):
feed={ if slice + BATCH_SIZE >= users_np.shape[0]:
users.name: users_np[slice:slice + BATCH_SIZE], break
items.name: items_np[slice:slice + BATCH_SIZE], static_loss = exe.run(
labels.name: labels_np[slice:slice + BATCH_SIZE] main,
}, feed={
fetch_list=[loss])[0] users.name: users_np[slice:slice + BATCH_SIZE],
sys.stderr.write('static loss %s\n' % static_loss) items.name: items_np[slice:slice + BATCH_SIZE],
labels.name: labels_np[slice:slice + BATCH_SIZE]
},
fetch_list=[loss])[0]
sys.stderr.write('static loss %s\n' % static_loss)
"""
with fluid.imperative.guard(): with fluid.imperative.guard():
fluid.default_startup_program().random_seed = seed fluid.default_startup_program().random_seed = seed
fluid.default_main_program().random_seed = seed fluid.default_main_program().random_seed = seed
deepcf = DeepCF('deepcf') deepcf = DeepCF('deepcf', num_users, num_items, matrix)
for slice in range(0, BATCH_SIZE * NUM_BATCHES, BATCH_SIZE): sys.stderr.write('matrix: %s\n' % deepcf._rating_matrix._numpy())
prediction = deepcf( for e in range(NUM_EPOCHES):
to_variable(users_np[slice:slice + BATCH_SIZE]), sys.stderr.write('epoch %d\n' % e)
to_variable(items_np[slice:slice + BATCH_SIZE])) for slice in range(0, BATCH_SIZE * NUM_BATCHES, BATCH_SIZE):
loss = fluid.layers.reduce_sum( prediction = deepcf(
fluid.layers.log_loss(prediction, to_variable(users_np[slice:slice + BATCH_SIZE]),
to_variable(labels_np[slice:slice + to_variable(items_np[slice:slice + BATCH_SIZE]))
BATCH_SIZE]))) loss = fluid.layers.reduce_sum(
loss._backward() fluid.layers.log_loss(prediction,
adam = fluid.optimizer.AdamOptimizer(0.01) to_variable(labels_np[
adam.minimize(loss) slice:slice + BATCH_SIZE])))
deepcf.clear_gradients() loss._backward()
dy_loss = loss._numpy() adam = fluid.optimizer.AdamOptimizer(0.01)
adam.minimize(loss)
deepcf.clear_gradients()
dy_loss = loss._numpy()
sys.stderr.write('dynamic loss: %s\n' % dy_loss)
sys.stderr.write('matrix: %s\n' % deepcf._rating_matrix._numpy())
self.assertEqual(static_loss, dy_loss) self.assertEqual(static_loss, dy_loss)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册