Unverified commit 572b3e90, authored by: Y yaoxuefeng, committed by: GitHub

add pull gpups sparse op (#37124)

 add pull gpups sparse op
Parent 9409ff6b
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/operators/pull_gpups_sparse_op.h"
namespace paddle {
namespace operators {
class PullGpuPSSparseOp : public framework::OperatorWithKernel {
 public:
  using framework::OperatorWithKernel::OperatorWithKernel;

  void InferShape(framework::InferShapeContext* ctx) const override {
    PADDLE_ENFORCE_GE(
        ctx->Inputs("Ids").size(), 1UL,
        platform::errors::InvalidArgument(
            "Inputs(Ids) of PullGpuPSSparseOp should not be empty."));
    PADDLE_ENFORCE_GE(
        ctx->Outputs("Out").size(), 1UL,
        platform::errors::InvalidArgument(
            "Outputs(Out) of PullGpuPSSparseOp should not be empty."));
    auto embedding_size_vec = ctx->Attrs().Get<std::vector<int>>("size");
    PADDLE_ENFORCE_EQ(
        ctx->Inputs("Ids").size(), embedding_size_vec.size(),
        platform::errors::InvalidArgument("The ids size: %lu must be equal to "
                                          "the length of embedding size: %lu.",
                                          ctx->Inputs("Ids").size(),
                                          embedding_size_vec.size()));
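    // Each output keeps the leading dimensions of its ids tensor and appends
    // the corresponding embedding size as the last dimension.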
    auto all_ids_dim = ctx->GetInputsDim("Ids");
    const size_t n_ids = all_ids_dim.size();
    std::vector<framework::DDim> outs_dims;
    outs_dims.resize(n_ids);
    for (size_t i = 0; i < n_ids; ++i) {
      int embedding_size = embedding_size_vec[i];
      const auto ids_dims = all_ids_dim[i];
      int ids_rank = ids_dims.size();
      PADDLE_ENFORCE_EQ(ids_dims[ids_rank - 1], 1,
                        platform::errors::InvalidArgument(
                            "Shape error in %lu id, the last dimension of the "
                            "'Ids' tensor must be 1.",
                            i));
      auto out_dim = framework::vectorize(
          framework::slice_ddim(ids_dims, 0, ids_rank - 1));
      out_dim.push_back(embedding_size);
      outs_dims[i] = framework::make_ddim(out_dim);
    }
    ctx->SetOutputsDim("Out", outs_dims);
    for (size_t i = 0; i < n_ids; ++i) {
      ctx->ShareLoD("Ids", "Out", i, i);
    }
  }

 protected:
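  // GpuPS stores embeddings as float, so the kernel type is fixed to FP32
  // regardless of the input variable data types.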
  framework::OpKernelType GetExpectedKernelType(
      const framework::ExecutionContext& ctx) const override {
    return framework::OpKernelType(framework::proto::VarType::FP32,
                                   ctx.device_context());
  }
};

class PullGpuPSSparseOpMaker : public framework::OpProtoAndCheckerMaker {
 public:
  void Make() override {
    AddInput("W",
             "(Tensor) The input represents embedding tensors, "
             "which is a learnable parameter.")
        .AsDispensable();
    AddInput("Ids",
             "Input tensors with type int32 or int64 "
             "that contain the ids to be looked up in GpuPS. "
             "The last dimension size must be 1.")
        .AsDuplicable();
    AddOutput("Out", "The lookup results tensors.").AsDuplicable();
    AddAttr<std::vector<int>>(
        "size", "(vector<int>) The embedding size of the corresponding slot.")
        .SetDefault(std::vector<int>());
    AddAttr<bool>("is_sparse",
                  "(boolean, default false) "
                  "Sparse update.")
        .SetDefault(false);
    AddAttr<bool>("is_distributed",
                  "(boolean, default false) distributed lookup table.")
        .SetDefault(false);
    AddComment(R"DOC(
Pull GpuPS Sparse Operator.

This operator is used to perform lookups on GpuPS; the lookup results are
then concatenated into a dense tensor.

The input Ids can carry the LoD (Level of Details) information or not, and
the output only shares the LoD information with the input Ids.
)DOC");
  }
};

template <typename T>
class PushGpuPSSparseOpMaker : public framework::SingleGradOpMaker<T> {
 public:
  using framework::SingleGradOpMaker<T>::SingleGradOpMaker;

 protected:
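  // The backward of pull_gpups_sparse is push_gpups_sparse: it takes the same
  // Ids plus the gradient of Out and pushes the gradients back to GpuPS.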
  void Apply(GradOpPtr<T> op) const override {
    op->SetType("push_gpups_sparse");
    op->SetInput("Ids", this->Input("Ids"));
    op->SetInput(framework::GradVarName("Out"), this->OutputGrad("Out"));
    op->SetOutput(framework::GradVarName("Out"), this->OutputGrad("Out"));
    op->SetAttrMap(this->Attrs());
  }
};

class PushGpuPSSparseOp : public framework::OperatorWithKernel {
 public:
  using framework::OperatorWithKernel::OperatorWithKernel;
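  // push_gpups_sparse produces no forward outputs, so there is nothing to
  // infer here.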
  void InferShape(framework::InferShapeContext* ctx) const override {}

 protected:
  framework::OpKernelType GetExpectedKernelType(
      const framework::ExecutionContext& ctx) const override {
    return framework::OpKernelType(OperatorWithKernel::IndicateVarDataType(
                                       ctx, framework::GradVarName("Out")),
                                   ctx.device_context());
  }
};
} // namespace operators
} // namespace paddle
namespace ops = paddle::operators;
REGISTER_OPERATOR(pull_gpups_sparse, ops::PullGpuPSSparseOp,
                  ops::PullGpuPSSparseOpMaker,
                  ops::PushGpuPSSparseOpMaker<paddle::framework::OpDesc>,
                  ops::PushGpuPSSparseOpMaker<paddle::imperative::OpBase>);
REGISTER_OPERATOR(push_gpups_sparse, ops::PushGpuPSSparseOp);
REGISTER_OP_CPU_KERNEL(pull_gpups_sparse, ops::PullGpuPSSparseCPUKernel<float>,
                       ops::PullGpuPSSparseCPUKernel<double>)
REGISTER_OP_CPU_KERNEL(push_gpups_sparse, ops::PushGpuPSSparseCPUKernel<float>,
                       ops::PushGpuPSSparseCPUKernel<double>)
\ No newline at end of file
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/operators/pull_gpups_sparse_op.h"
#include "paddle/fluid/platform/device/gpu/gpu_info.h"
#include "paddle/fluid/platform/device/gpu/gpu_primitives.h"
namespace paddle {
namespace operators {
using platform::PADDLE_CUDA_NUM_THREADS;
using LoDTensor = framework::LoDTensor;
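
// Both CUDA kernels simply forward to the shared functors defined in
// pull_gpups_sparse_op.h.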
template <typename T>
class PullGpuPSSparseCUDAKernel : public framework::OpKernel<T> {
 public:
  void Compute(const framework::ExecutionContext &ctx) const override {
    PullGpuPSSparseFunctor<T>(ctx);
  }
};

template <typename T>
class PushGpuPSSparseCUDAKernel : public framework::OpKernel<T> {
 public:
  void Compute(const framework::ExecutionContext &ctx) const override {
    PushGpuPSSparseFunctor<T>(ctx);
  }
};
} // namespace operators
} // namespace paddle
namespace ops = paddle::operators;
REGISTER_OP_CUDA_KERNEL(pull_gpups_sparse,
                        ops::PullGpuPSSparseCUDAKernel<float>,
                        ops::PullGpuPSSparseCUDAKernel<double>)
REGISTER_OP_CUDA_KERNEL(push_gpups_sparse,
                        ops::PushGpuPSSparseCUDAKernel<float>,
                        ops::PushGpuPSSparseCUDAKernel<double>)
\ No newline at end of file
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <memory>
#include <vector>
#include "paddle/fluid/framework/fleet/ps_gpu_wrapper.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/tensor.h"
namespace paddle {
namespace operators {

template <typename T>
static void PullGpuPSSparseFunctor(const framework::ExecutionContext &ctx) {
  auto inputs = ctx.MultiInput<framework::Tensor>("Ids");
  auto outputs = ctx.MultiOutput<framework::Tensor>("Out");
  const auto slot_size = inputs.size();
  std::vector<const uint64_t *> all_keys(slot_size);
  // GpuPS only supports float now
  std::vector<float *> all_values(slot_size);
  std::vector<int64_t> slot_lengths(slot_size);
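  // Collect the raw id keys, per-slot lengths and output buffers so that a
  // single PullSparse call can serve all slots at once.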
  for (size_t i = 0; i < slot_size; i++) {
    const auto *slot = inputs[i];
    const uint64_t *single_slot_keys =
        reinterpret_cast<const uint64_t *>(slot->data<int64_t>());
    all_keys[i] = single_slot_keys;
    slot_lengths[i] = slot->numel();
    auto *output = outputs[i]->mutable_data<T>(ctx.GetPlace());
    // double type is not fully supported now
    all_values[i] = reinterpret_cast<float *>(output);
  }
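  // The actual lookup only happens when Paddle is built with HETERPS (GpuPS)
  // support; otherwise this function is effectively a no-op.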
#ifdef PADDLE_WITH_HETERPS
  auto gpu_ps_ptr = paddle::framework::PSGPUWrapper::GetInstance();
  gpu_ps_ptr->PullSparse(ctx.GetPlace(), 0, all_keys, all_values, slot_lengths,
                         0);
#endif
}

template <typename T>
static void PushGpuPSSparseFunctor(const framework::ExecutionContext &ctx) {
  auto inputs = ctx.MultiInput<framework::LoDTensor>("Ids");
  auto d_output =
      ctx.MultiInput<framework::Tensor>(framework::GradVarName("Out"));
  const auto slot_size = inputs.size();
  std::vector<const uint64_t *> all_keys(slot_size);
  std::vector<const float *> all_grad_values(slot_size);
  std::vector<int64_t> slot_lengths(slot_size);
  int batch_size = -1;
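  // Gather keys and output gradients per slot. The batch size is derived from
  // the LoD (number of sequences) when present, or from the first dimension
  // otherwise, and must match across all slots.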
  for (size_t i = 0; i < slot_size; i++) {
    const auto *slot = inputs[i];
    const uint64_t *single_slot_keys =
        reinterpret_cast<const uint64_t *>(slot->data<int64_t>());
    all_keys[i] = single_slot_keys;
    slot_lengths[i] = slot->numel();
    int cur_batch_size =
        slot->lod().size() ? slot->lod()[0].size() - 1 : slot->dims()[0];
    if (batch_size == -1) {
      batch_size = cur_batch_size;
    } else {
      PADDLE_ENFORCE_EQ(batch_size, cur_batch_size,
                        platform::errors::PreconditionNotMet(
                            "The batch size of all input slots should be the "
                            "same, please check."));
    }
    const float *grad_value = d_output[i]->data<float>();
    all_grad_values[i] = grad_value;
  }
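  // As in the pull path, the push is only effective when Paddle is built with
  // HETERPS support.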
#ifdef PADDLE_WITH_HETERPS
  auto gpu_ps_ptr = paddle::framework::PSGPUWrapper::GetInstance();
  gpu_ps_ptr->PushSparseGrad(ctx.GetPlace(), 0, all_keys, all_grad_values,
                             slot_lengths, 0, batch_size);
#endif
}

using LoDTensor = framework::LoDTensor;

template <typename T>
class PullGpuPSSparseCPUKernel : public framework::OpKernel<T> {
 public:
  void Compute(const framework::ExecutionContext &ctx) const override {
    PullGpuPSSparseFunctor<T>(ctx);
  }
};

template <typename T>
class PushGpuPSSparseCPUKernel : public framework::OpKernel<T> {
 public:
  void Compute(const framework::ExecutionContext &ctx) const override {
    PushGpuPSSparseFunctor<T>(ctx);
  }
};
} // namespace operators
} // namespace paddle
@@ -94,7 +94,8 @@ class DistributedAdam(DistributedOptimizerImplBase):
            ".batch_size@GRAD", ".batch_square_sum@GRAD", ".batch_sum@GRAD"
        ]
        self.supported_embedding_types = [
            "lookup_table", "pull_sparse", "pull_sparse_v2", "pull_box_sparse",
            "pull_gpups_sparse"
        ]
        self.supported_embedding_grad_types = [
            "lookup_table_grad", "push_sparse", "push_sparse_v2"
......
@@ -665,6 +665,69 @@ def _pull_sparse_v2(input,
    return outs


def _pull_gpups_sparse(input,
                       size,
                       dtype='float32',
                       is_distributed=False,
                       is_sparse=False):
    r"""
    **Pull GpuPS Sparse Layer**

    This layer is used to look up embeddings of IDs, provided by :attr:`input`,
    in the GpuPS lookup table. The result of this lookup is the embedding of
    each ID in :attr:`input`.

    Args:
        input(Variable|list of Variable): Input is a Tensor<int64> Variable,
            which contains the IDs information.
        size(int|list of int): The embedding size parameter of each input,
            which indicates the size of each embedding vector respectively.
        dtype(str): The data type of the output tensor. Only float32 is
            supported now.
        is_distributed(bool): Whether to use a distributed lookup table.
            Default is False.
        is_sparse(bool): Whether to use sparse update. Default is False.

    Returns:
        Variable|list of Variable: The tensor variable(s) storing the
            embeddings of the supplied inputs, whose sizes are indicated by
            :attr:`size` respectively.

    Examples:
        .. code-block:: python

            import paddle.fluid as fluid
            slots = []
            data_1 = fluid.layers.data(name='sequence1', shape=[1], dtype='int64', lod_level=1)
            slots.append(data_1)
            data_2 = fluid.layers.data(name='sequence2', shape=[1], dtype='int64', lod_level=1)
            slots.append(data_2)
            embs = fluid.layers.pull_gpups_sparse(input=slots, size=[11, 35])
    """
    helper = LayerHelper('pull_gpups_sparse', **locals())
    if dtype != 'float32':
        raise ValueError(
            "GpuPS only supports float type embedding now, and your type is: " +
            dtype)
    helper.input_dtype()
    inputs = helper.multiple_input()
    outs = [
        helper.create_variable_for_type_inference(dtype)
        for i in range(len(inputs))
    ]
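    # NOTE: 'W' is created here only as a small placeholder parameter (the op
    # marks it dispensable); the actual embedding table is held by GpuPS.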
    w = helper.create_parameter(
        attr=helper.param_attr, shape=[11], dtype=dtype, is_bias=False)
    helper.append_op(
        type='pull_gpups_sparse',
        inputs={'Ids': inputs,
                'W': w},
        outputs={'Out': outs},
        attrs={
            'size': size,
            'is_distributed': is_distributed,
            'is_sparse': is_sparse
        })
    if len(outs) == 1:
        return outs[0]
    return outs


def _pull_box_sparse(input,
                     size,
                     dtype='float32',
......
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import unittest
import numpy as np
import paddle
import paddle.fluid as fluid
import paddle.fluid.core as core
from paddle.fluid import Program, program_guard
from paddle.fluid.layers.nn import _pull_gpups_sparse
paddle.enable_static()


class TestPullGpupsSparse(unittest.TestCase):
    """Test PullGpupsSparse op."""

    def test_static_graph(self):
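        # Build one int64 id slot, pull an 11-dim embedding through
        # _pull_gpups_sparse, and run a single SGD step on the mean of the
        # pulled embedding.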
        startup_program = fluid.Program()
        train_program = fluid.Program()
        slots = []
        with fluid.program_guard(train_program, startup_program):
            l = fluid.layers.data(
                name='input', shape=[1], dtype="int64", lod_level=1)
            slots.append(l)
            output = _pull_gpups_sparse(
                slots, size=[11], is_distributed=True, is_sparse=True)
            cost = paddle.fluid.layers.mean(output)
            sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001)
            sgd_optimizer.minimize(cost, train_program)
            block = train_program.global_block()
            place = fluid.CPUPlace()
            if fluid.core.is_compiled_with_cuda():
                place = fluid.CUDAPlace(0)
            exe = fluid.Executor(place)
            exe.run(startup_program)
            img = np.array([1]).astype(np.int64)
            res = exe.run(train_program,
                          feed={'input': img},
                          fetch_list=[output])


if __name__ == "__main__":
    unittest.main()
@@ -728,4 +728,5 @@ STATIC_MODE_TESTING_LIST = [
    'test_fill_diagonal_tensor_op',
    'test_fill_any_op',
    'test_margin_cross_entropy_op',
    'test_pull_gpups_sparse_op',
]