提交 32585ece 编写于 作者: Y Yang Yu

Merge branch 'develop' into feature/test_w2v_parallel.do

......@@ -26,7 +26,7 @@ nv_test(lod_tensor_gpu_test SRCS lod_tensor_test.cu DEPS lod_tensor)
cc_test(variable_test SRCS variable_test.cc)
cc_library(threadpool SRCS threadpool.cc)
cc_library(threadpool SRCS threadpool.cc DEPS enforce)
cc_test(threadpool_test SRCS threadpool_test.cc DEPS threadpool)
cc_library(scope SRCS scope.cc DEPS glog threadpool)
......@@ -98,3 +98,5 @@ if(NOT WITH_C_API AND WITH_FLUID)
install(FILES ${CMAKE_CURRENT_BINARY_DIR}/framework.pb.h DESTINATION include/paddle/framework)
install(FILES details/cow_ptr.h details/op_registry.h DESTINATION include/paddle/framework/details)
endif()
cc_test(channel_test SRCS channel_test.cc)
......@@ -13,75 +13,52 @@ See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <condition_variable>
#include <mutex>
#include <queue>
#include <stddef.h> // for size_t
namespace paddle {
namespace framework {
// Channel is the abstract class of buffered and un-buffered channels.
template <typename T>
class Channel {
public:
explicit Channel(std::size_t capacity) : capacity_(capacity) {}
void Send(T* channel_element) {
std::unique_lock<std::mutex> lock(mu_);
if (IsBounded()) {
full_cond_var_.wait(lock, [this]() {
bool capacity_valid = capacity_ > 0 ? !IsCapacityFull() : true;
return capacity_valid;
});
}
channel_.push_back(std::move(*channel_element));
lock.unlock();
empty_cond_var_.notify_one();
}
virtual void Send(T*) = 0;
virtual void Receive(T*) = 0;
virtual size_t Cap() = 0;
T* Receive() {
std::unique_lock<std::mutex> lock(mu_);
empty_cond_var_.wait(lock, [this]() { return !channel_.empty(); });
T* channel_element = std::move(channel_.front());
channel_.pop_front();
NotifyAllSenders(&lock);
return channel_element;
}
size_t Size() {
std::unique_lock<std::mutex> lock(mu_);
return channel_.size();
}
// Don't delete channels; instead, call Channel::Close.
protected:
virtual ~Channel() {}
};
void Clear() {
std::unique_lock<std::mutex> lock(mu_);
channel_.clear();
// Forward declaration of channel implementations.
namespace details {
template <typename T>
class Buffered;
template <typename T>
class UnBuffered;
} // namespace details
NotifyAllSenders(&lock);
template <typename T>
Channel<T>* MakeChannel(size_t buffer_size) {
if (buffer_size > 0) {
return new details::Buffered<T>(buffer_size);
}
return new details::UnBuffered<T>();
}
private:
std::size_t capacity_;
std::mutex mu_;
std::condition_variable empty_cond_var_;
std::condition_variable full_cond_var_;
std::deque<T> channel_;
private:
void NotifyAllSenders(std::unique_lock<std::mutex>* lock) {
if (IsBounded()) {
lock->unlock();
full_cond_var_.notify_one();
}
template <typename T>
void CloseChannel(Channel<T>* ch) {
if (ch->Cap() > 0) {
delete dynamic_cast<details::Buffered<T>*>(ch);
} else {
delete dynamic_cast<details::UnBuffered<T>*>(ch);
}
}
bool IsBounded() const { return capacity_ > 0; }
bool IsCapacityFull() const { return channel_.size() >= capacity_; }
};
} // namespace operator
} // namespace framework
} // namespace paddle
#include "paddle/framework/details/buffered_channel.h"
#include "paddle/framework/details/unbuffered_channel.h"
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/framework/channel.h"
#include "gtest/gtest.h"
TEST(Channel, MakeAndClose) {
using paddle::framework::Channel;
using paddle::framework::MakeChannel;
using paddle::framework::CloseChannel;
Channel<int>* ch = MakeChannel<int>(10);
CloseChannel(ch);
}
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <condition_variable>
#include <deque>
#include <mutex>
#include "paddle/framework/channel.h"
namespace paddle {
namespace framework {
namespace details {
template <typename T>
class Buffered : public paddle::framework::Channel<T> {
friend Channel<T>* paddle::framework::MakeChannel<T>(size_t);
friend void paddle::framework::CloseChannel<T>(Channel<T>*);
public:
virtual void Send(T*);
virtual void Receive(T*);
virtual size_t Cap() { return cap_; }
private:
size_t cap_;
std::mutex mu_;
std::condition_variable empty_cond_var_;
std::condition_variable full_cond_var_;
std::deque<T> channel_;
Buffered(size_t cap) : cap_(cap) {}
virtual ~Buffered();
void NotifyAllSenders(std::unique_lock<std::mutex>*);
};
template <typename T>
void Buffered<T>::Send(T* item) {
std::unique_lock<std::mutex> lock(mu_);
full_cond_var_.wait(lock, [this]() { return channel_.size() < cap_; });
channel_.push_back(std::move(*item));
lock.unlock();
empty_cond_var_.notify_one();
}
template <typename T>
void Buffered<T>::Receive(T* item) {
std::unique_lock<std::mutex> lock(mu_);
empty_cond_var_.wait(lock, [this]() { return !channel_.empty(); });
*item = std::move(channel_.front());
channel_.pop_front();
NotifyAllSenders(&lock);
}
template <typename T>
Buffered<T>::~Buffered() {
std::unique_lock<std::mutex> lock(mu_);
channel_.clear();
NotifyAllSenders(&lock);
}
template <typename T>
void Buffered<T>::NotifyAllSenders(std::unique_lock<std::mutex>* lock) {
lock->unlock();
full_cond_var_.notify_one();
}
} // namespace details
} // namespace framework
} // namespace paddle
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <condition_variable>
#include <deque>
#include <mutex>
#include "paddle/framework/channel.h"
namespace paddle {
namespace framework {
namespace details {
template <typename T>
class UnBuffered : public paddle::framework::Channel<T> {
friend Channel<T>* paddle::framework::MakeChannel<T>(size_t);
friend void paddle::framework::CloseChannel<T>(Channel<T>*);
public:
virtual void Send(T*);
virtual void Receive(T*);
virtual size_t Cap() { return 0; }
private:
UnBuffered() {}
virtual ~UnBuffered();
};
template <typename T>
void UnBuffered<T>::Send(T* channel_element) {}
template <typename T>
void UnBuffered<T>::Receive(T*) {}
template <typename T>
UnBuffered<T>::~UnBuffered() {}
} // namespace details
} // namespace framework
} // namespace paddle
......@@ -17,6 +17,7 @@ limitations under the License. */
#include <algorithm>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>
#include <glog/logging.h>
......@@ -102,6 +103,32 @@ void prune_impl(const proto::ProgramDesc& input, proto::ProgramDesc* output,
*op_field->Add() = input.blocks(block_id).ops(i);
}
}
// remove the VarDescs in BlockDesc that are not referenced in
// the pruned OpDescs
std::unordered_map<std::string, proto::VarDesc> var_map;
auto* var_field = output->mutable_blocks(block_id)->mutable_vars();
for (const auto& var : *var_field) {
var_map[var.name()] = var;
}
var_field->Clear();
for (const auto& op : *op_field) {
// add VarDescs of all input arguments for each OpDesc
auto& input_field = op.inputs();
for (auto& input_var : input_field) {
for (auto& arg : input_var.arguments()) {
*var_field->Add() = var_map[arg];
}
}
// add VarDescs of all output arguments for each OpDesc
auto& output_field = op.outputs();
for (auto& output_var : output_field) {
for (auto& arg : output_var.arguments()) {
*var_field->Add() = var_map[arg];
}
}
}
}
// TODO(fengjiayi): Prune() could be inplaced to avoid unnecessary copies
......
......@@ -14,6 +14,8 @@
#include "paddle/framework/threadpool.h"
#include "paddle/platform/enforce.h"
namespace paddle {
namespace framework {
......
......@@ -22,7 +22,7 @@ limitations under the License. */
#include <thread>
#include <vector>
#include "paddle/platform/enforce.h"
#include "paddle/platform/macros.h" // for DISABLE_COPY_AND_ASSIGN
namespace paddle {
namespace framework {
......
......@@ -97,6 +97,21 @@ bool RPCClient::AsyncGetVariable(const std::string& ep,
return true;
}
bool RPCClient::AsyncSendBatchBarrier(const std::string& ep, int64_t time_out) {
const auto ch = GetChannel(ep);
BatchBarrierProcessor* s = new BatchBarrierProcessor(ch);
s->Prepare(time_out);
sendrecv::VariableMessage req;
req.set_varname(BATCH_BARRIER_MESSAGE);
auto rpc = s->stub_->AsyncSendVariable(s->context_.get(), req, &cq_);
rpc->Finish(&s->reply_, &s->status_, (void*)s);
req_count_++;
return true;
}
bool RPCClient::Wait() {
if (req_count_ <= 0) {
return true;
......
......@@ -71,6 +71,15 @@ class ClientBase {
context_->set_deadline(deadline);
}
virtual void Prepare(int64_t time_out) {
context_.reset(new grpc::ClientContext());
std::chrono::system_clock::time_point deadline =
std::chrono::system_clock::now() + std::chrono::milliseconds(time_out);
context_->set_deadline(deadline);
}
virtual void Process() = 0;
std::unique_ptr<sendrecv::SendRecvService::Stub> stub_;
......@@ -117,6 +126,17 @@ class GetProcessor : public ClientBase {
RequestGetCallBack response_call_back_ = ProcGetResponse;
};
class BatchBarrierProcessor : public ClientBase {
public:
explicit BatchBarrierProcessor(std::shared_ptr<grpc::Channel> ch)
: ClientBase(ch) {}
virtual ~BatchBarrierProcessor() {}
virtual void Process() {}
sendrecv::VoidMessage reply_;
};
class RPCClient {
public:
bool AsyncSendVariable(const std::string& ep,
......@@ -130,6 +150,10 @@ class RPCClient {
const framework::Scope& scope,
const std::string& var_name,
int64_t time_out = 600 * 1000);
bool AsyncSendBatchBarrier(const std::string& ep,
int64_t time_out = 600 * 1000);
bool Wait();
private:
......
......@@ -132,6 +132,7 @@ void AsyncGRPCServer::RunSyncUpdate() {
cq_send_ = builder.AddCompletionQueue();
cq_get_ = builder.AddCompletionQueue();
server_ = builder.BuildAndStart();
LOG(INFO) << "Server listening on " << address_ << std::endl;
......@@ -141,11 +142,11 @@ void AsyncGRPCServer::RunSyncUpdate() {
std::bind(&AsyncGRPCServer::TryToRegisterNewGetOne, this);
t_send_.reset(
new std::thread(std::bind(&AsyncGRPCServer::HandleRequest, this, false,
new std::thread(std::bind(&AsyncGRPCServer::HandleRequest, this,
cq_send_.get(), "cq_send", send_register)));
t_get_.reset(
new std::thread(std::bind(&AsyncGRPCServer::HandleRequest, this, true,
new std::thread(std::bind(&AsyncGRPCServer::HandleRequest, this,
cq_get_.get(), "cq_get", get_register)));
// wait server
......@@ -174,7 +175,7 @@ void AsyncGRPCServer::TryToRegisterNewSendOne() {
}
RequestSend* send =
new RequestSend(&service_, cq_send_.get(), &var_recv_queue_);
VLOG(4) << "create RequestSend status:" << send->Status();
VLOG(4) << "Create RequestSend status:" << send->Status();
}
void AsyncGRPCServer::TryToRegisterNewGetOne() {
......@@ -184,11 +185,11 @@ void AsyncGRPCServer::TryToRegisterNewGetOne() {
}
RequestGet* get = new RequestGet(&service_, cq_get_.get(), scope_, dev_ctx_,
&var_get_queue_);
VLOG(4) << "create Requestget status:" << get->Status();
VLOG(4) << "Create RequestGet status:" << get->Status();
}
// FIXME(typhoonzero): remove wait argument and change cq_name to enum.
void AsyncGRPCServer::HandleRequest(bool wait, grpc::ServerCompletionQueue* cq,
// FIXME(typhoonzero): change cq_name to enum.
void AsyncGRPCServer::HandleRequest(grpc::ServerCompletionQueue* cq,
std::string cq_name,
std::function<void()> TryToRegisterNewOne) {
TryToRegisterNewOne();
......
......@@ -57,8 +57,7 @@ class AsyncGRPCServer final : public sendrecv::SendRecvService::Service {
void ShutDown();
protected:
void HandleRequest(bool wait, grpc::ServerCompletionQueue *cq,
std::string cq_name,
void HandleRequest(grpc::ServerCompletionQueue *cq, std::string cq_name,
std::function<void()> TryToRegisterNewOne);
void TryToRegisterNewSendOne();
void TryToRegisterNewGetOne();
......
......@@ -30,6 +30,9 @@ namespace paddle {
namespace operators {
namespace detail {
#define LISTEN_TERMINATE_MESSAGE "TERMINATE@RECV"
#define BATCH_BARRIER_MESSAGE "BATCH_BARRIER@RECV"
void SerializeToMessage(const std::string& name, const framework::Variable* var,
const platform::DeviceContext& ctx,
sendrecv::VariableMessage* msg);
......
......@@ -29,8 +29,6 @@ limitations under the License. */
#include "paddle/operators/detail/simple_block_queue.h"
#include "paddle/string/printf.h"
#define LISTEN_TERMINATE_MESSAGE "TERMINATE@RECV"
namespace paddle {
namespace operators {
......@@ -95,7 +93,6 @@ class RecvOp : public framework::OperatorBase {
auto param_list = Attr<std::vector<std::string>>("ParamList");
auto grad_list = Attr<std::vector<std::string>>("GradList");
auto fan_in = Attr<int>("Fanin");
size_t param_count = param_list.size();
auto *block = Attr<framework::BlockDesc *>(kOptimizeBlock);
auto *program = block->Program();
......@@ -103,38 +100,50 @@ class RecvOp : public framework::OperatorBase {
// TODO(typhoonzero): change this to a while_op for every cluster-batch.
bool exit_flag = false;
size_t barrier_size = param_count * fan_in;
while (!exit_flag) {
// Get from multiple trainers, we don't care about the order in which
// the gradients arrives, just add suffix 0~n and merge the gradient.
rpc_service_->SetCond(0);
for (size_t i = 0; i < barrier_size; ++i) {
size_t recv_var_cnt = 0;
int batch_barrier = 0;
while (batch_barrier != fan_in) {
const detail::MessageWithName &v = rpc_service_->Get();
auto grad_var_name = v.first;
if (grad_var_name == LISTEN_TERMINATE_MESSAGE) {
LOG(INFO) << "received terminate message and exit";
exit_flag = true;
break;
}
auto it = std::find(grad_list.begin(), grad_list.end(), grad_var_name);
std::string param_var_name;
if (it != grad_list.end()) {
param_var_name = param_list[it - grad_list.begin()];
} else if (grad_var_name == BATCH_BARRIER_MESSAGE) {
VLOG(3) << "recv batch barrier message";
batch_barrier++;
continue;
} else {
LOG(ERROR) << "grad has no paired param:" << grad_var_name;
}
VLOG(3) << "received grad: " << grad_var_name
<< " updating param: " << param_var_name;
if (fan_in > 1) {
grad_var_name = this->GetGradVarNameForTrainer(grad_var_name);
}
auto *var = recv_scope.FindVar(grad_var_name);
if (var == nullptr) {
LOG(ERROR) << "Can not find server side var: " << grad_var_name;
PADDLE_THROW("Can not find server side var");
// receive a variable
recv_var_cnt++;
auto it =
std::find(grad_list.begin(), grad_list.end(), grad_var_name);
std::string param_var_name;
if (it != grad_list.end()) {
param_var_name = param_list[it - grad_list.begin()];
} else {
LOG(ERROR) << "grad has no paired param:" << grad_var_name;
}
VLOG(3) << "received grad: " << grad_var_name
<< " updating param: " << param_var_name;
if (fan_in > 1) {
grad_var_name = this->GetGradVarNameForTrainer(grad_var_name);
}
auto *var = recv_scope.FindVar(grad_var_name);
if (var == nullptr) {
LOG(ERROR) << "Can not find server side var: " << grad_var_name;
PADDLE_THROW("Can not find server side var");
}
detail::DeserializeFromMessage(v.second, dev_ctx, var);
}
detail::DeserializeFromMessage(v.second, dev_ctx, var);
}
VLOG(3) << "recv " << recv_var_cnt << " parmeters for one barrier.";
// TODO(Yancey1989): merge SelectedRows variables here
if (exit_flag) {
break;
}
......@@ -146,7 +155,7 @@ class RecvOp : public framework::OperatorBase {
LOG(ERROR) << "run sub program error " << e.what();
}
rpc_service_->SetCond(1);
rpc_service_->WaitClientGet(barrier_size);
rpc_service_->WaitClientGet(recv_var_cnt);
grads_counter_.clear();
} // while(true)
}
......@@ -161,7 +170,6 @@ class RecvOpMaker : public framework::OpProtoAndCheckerMaker {
public:
RecvOpMaker(OpProto *proto, OpAttrChecker *op_checker)
: OpProtoAndCheckerMaker(proto, op_checker) {
AddInput("RX", "(Tensor) Input tensor to be optimized").AsDuplicable();
AddComment(R"DOC(
Recv operator
......
......@@ -13,7 +13,6 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/operators/reduce_op.h"
#include "paddle/operators/net_op.h"
namespace paddle {
namespace operators {
......@@ -38,10 +37,14 @@ class ReduceOp : public framework::OperatorWithKernel {
dim, x_rank,
"The dim should be in the range [-rank(input), rank(input)).");
bool reduce_all = ctx->Attrs().Get<bool>("reduce_all");
bool keep_dim = ctx->Attrs().Get<bool>("keep_dim");
if (reduce_all) {
ctx->SetOutputDim("Out", {1});
if (keep_dim)
ctx->SetOutputDim(
"Out", framework::make_ddim(std::vector<int64_t>(x_rank, 1)));
else
ctx->SetOutputDim("Out", {1});
} else {
bool keep_dim = ctx->Attrs().Get<bool>("keep_dim");
auto dims_vector = vectorize(x_dims);
if (keep_dim || x_rank == 1) {
dims_vector[dim] = 1;
......
......@@ -37,17 +37,25 @@ class SendOp : public framework::OperatorBase {
auto ins = Inputs("X");
auto outs = Outputs("Out");
std::vector<std::string> epmap = Attr<std::vector<std::string>>("epmap");
std::vector<std::string> endpoints =
Attr<std::vector<std::string>>("endpoints");
platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
auto& ctx = *pool.Get(place);
for (size_t i = 0; i < ins.size(); i++) {
VLOG(3) << "sending " << ins[i];
VLOG(3) << "sending " << ins[i] << " to " << epmap[i];
client_.AsyncSendVariable(epmap[i], ctx, scope, ins[i]);
}
PADDLE_ENFORCE(client_.Wait());
for (auto& ep : endpoints) {
VLOG(3) << "batch barrier, ep: " << ep;
client_.AsyncSendBatchBarrier(ep);
}
PADDLE_ENFORCE(client_.Wait());
for (size_t i = 0; i < outs.size(); i++) {
VLOG(3) << "getting " << outs[i];
VLOG(3) << "getting " << outs[i] << " from " << epmap[i];
client_.AsyncGetVariable(epmap[i], ctx, scope, outs[i]);
}
......
......@@ -30,8 +30,13 @@ class SequenceReshapeOp : public framework::OperatorWithKernel {
auto x_numel = product(x_dims);
PADDLE_ENFORCE_EQ(x_dims.size(), 2U, "Rank of Input(X) should be 2.");
int new_dim = ctx->Attrs().Get<int>("new_dim");
ctx->SetOutputDim("Out",
{x_numel / new_dim, static_cast<int64_t>(new_dim)});
if (ctx->IsRuntime()) {
ctx->SetOutputDim("Out",
{x_numel / new_dim, static_cast<int64_t>(new_dim)});
} else {
// when compiling, the batch size is undetermined, just set to -1
ctx->SetOutputDim("Out", {-1, static_cast<int64_t>(new_dim)});
}
}
};
......
......@@ -32,7 +32,7 @@ function cmake_gen() {
cat <<EOF
========================================
Configuring cmake in /paddle/build ...
-DCMAKE_BUILD_TYPE=Release
-DCMAKE_BUILD_TYPE=${BUILD_TYPE:Release}
${PYTHON_FLAGS}
-DWITH_DOC=OFF
-DWITH_GPU=${WITH_GPU:-OFF}
......@@ -54,7 +54,7 @@ EOF
# docker environment is fully controlled by this script.
# See /Paddle/CMakeLists.txt, UNITTEST_USE_VIRTUALENV option.
cmake .. \
-DCMAKE_BUILD_TYPE=Release \
-DCMAKE_BUILD_TYPE=${BUILD_TYPE:Release} \
${PYTHON_FLAGS} \
-DWITH_DOC=OFF \
-DWITH_GPU=${WITH_GPU:-OFF} \
......
......@@ -140,8 +140,13 @@ def init_config_environment(
g_submodel_stack=[],
g_add_submodel_suffix=False, ):
for k, v in locals().iteritems():
globals()[k] = copy.deepcopy(v)
# directly iterate through locals().iteritems() will change
# the size of locals() due to introducing k, v into scope
# which will break the process in some env
local_vars = copy.deepcopy(locals())
for k, v in local_vars.iteritems():
globals()[k] = v
# Because type is widely used as a variable name in this code.
......
......@@ -30,6 +30,9 @@ __all__ = [
class BaseErrorClipAttr(object):
def __str__(self):
raise NotImplementedError()
def append_clip_op(self, block, grad_name):
raise NotImplementedError()
......@@ -44,6 +47,9 @@ class ErrorClipByValue(BaseErrorClipAttr):
self.max = max
self.min = min
def __str__(self):
return "ByValue, min=%f, max=%f" % (self.min, self.max)
def append_clip_op(self, block, grad_name):
clip_op_desc = block.desc.append_op()
clip_op_desc.set_type("clip")
......@@ -71,6 +77,9 @@ def error_clip_callback(block, context):
class BaseGradientClipAttr(object):
def __str__(self):
raise NotImplementedError()
def process_context(self, context, param, grad):
raise NotImplementedError()
......@@ -79,6 +88,9 @@ class BaseGradientClipAttr(object):
class NullGradientClipAttr(BaseGradientClipAttr):
def __str__(self):
return "Null"
def process_context(self, context, param, grad):
pass
......@@ -96,6 +108,9 @@ class GradientClipByValue(BaseGradientClipAttr):
self.max = max
self.min = min
def __str__(self):
return "ByValue, min=%f, max=%f" % (self.min, self.max)
def process_context(self, context, param, grad):
pass
......@@ -108,6 +123,9 @@ class GradientClipByNorm(BaseGradientClipAttr):
def __init__(self, clip_norm):
self.clip_norm = clip_norm
def __str__(self):
return "ByNorm, clip_norm=%f" % self.clip_norm
def process_context(self, context, param, grad):
pass
......@@ -124,6 +142,10 @@ class GradientClipByGlobalNorm(BaseGradientClipAttr):
self.clip_norm = clip_norm
self.group_name = group_name
def __str__(self):
return "ByGlobalNorm, group_name=%s, clip_norm=%f" % (self.group_name,
self.clip_norm)
def process_context(self, context, param, grad):
if self.group_name not in context:
context[self.group_name] = []
......@@ -160,6 +182,17 @@ class GradientClipByGlobalNorm(BaseGradientClipAttr):
def set_gradient_clip(clip, param_list=None, program=None):
"""
To specify parameters that require gradient clip.
Args:
clip(BaseGradientClipAttr): An instance of some derived class of BaseGradientClipAttr,
which describes the type and detailed attributes of required gradient clip.
param_list(list, None by default): Parameters that require gradient clip.
It can be a list of parameter or a list of parameter's name.
When it's None, all parameters in the program will be included.
program(Program, None by default): The program where parameters are.
Will be the default main program when assigned with None.
"""
if not isinstance(clip, BaseGradientClipAttr):
raise TypeError(
"'clip' should be an instance of BaseGradientClipAttr's derived class"
......@@ -199,3 +232,5 @@ def append_gradient_clip_ops(param_grad):
ClipByValue = GradientClipByValue
ClipByNorm = GradientClipByNorm
ClipByGlobalNorm = GradientClipByGlobalNorm
......@@ -474,8 +474,7 @@ class DistributeTranspiler:
# Append the recv op
pserver_program.global_block().append_op(
type="recv",
inputs={"RX": self.param_grad_ep_mapping[endpoint]["grads"]
}, # grads to recv
inputs={},
outputs={},
attrs={
"OptimizeBlock": optimize_sub_program.global_block(),
......
......@@ -14,6 +14,7 @@
import collections
import contextlib
import re
import numpy as np
......@@ -239,20 +240,30 @@ class Variable(object):
def __str__(self):
return self.to_string(True)
def to_string(self, throw_on_error):
def to_string(self, throw_on_error, with_details=False):
"""
Get debug string.
Args:
throw_on_error(bool): True if raise an exception when self is not
intialized.
with_details(bool): more details about variables and parameters
(e.g. trainable, optimize_attr, ...) will be printed when with_details is True
Returns(str): The debug string.
"""
assert isinstance(throw_on_error, bool) and isinstance(with_details,
bool)
protostr = self.desc.serialize_to_string()
proto = framework_pb2.VarDesc.FromString(str(protostr))
return _debug_string_(proto, throw_on_error)
res_str = _debug_string_(proto, throw_on_error)
if with_details:
additional_attr = ("error_clip", "stop_gradient")
for attr_name in additional_attr:
res_str += "%s: %s\n" % (attr_name,
str(getattr(self, attr_name)))
return res_str
__repr__ = __str__
......@@ -629,10 +640,36 @@ class Block(object):
def __str__(self):
return self.to_string(True)
def to_string(self, throw_on_error):
protostr = self.desc.serialize_to_string()
proto = framework_pb2.BlockDesc.FromString(str(protostr))
return _debug_string_(proto, throw_on_error)
def to_string(self, throw_on_error, with_details=False):
"""
To debug string.
Args:
throw_on_error(bool): raise exception when self is not initialized
when throw_on_error is True
with_details(bool): more details about variables and parameters
(e.g. trainable, optimize_attr, ...) will be printed when with_details is True
Returns(str): The debug string.
"""
assert isinstance(throw_on_error, bool) and isinstance(with_details,
bool)
if with_details:
re_add_indent = re.compile(r"\n(.)")
res_str = "blocks {\n idx: %d\n parent_idx: %d" % (
self.idx, self.parent_idx)
for var in self.vars.itervalues():
res_str += "\n vars {\n %s }" % re_add_indent.sub(
r"\n \1", var.to_string(throw_on_error, with_details))
for op in self.ops:
res_str += "\n ops {\n %s }" % re_add_indent.sub(
r"\n \1", op.to_string(throw_on_error))
res_str += "\n}"
else:
protostr = self.desc.serialize_to_string()
proto = framework_pb2.BlockDesc.FromString(str(protostr))
res_str = _debug_string_(proto, throw_on_error)
return res_str
__repr__ = __str__
......@@ -796,10 +833,29 @@ class Program(object):
def __str__(self):
return self.to_string(True)
def to_string(self, throw_on_error):
protostr = self.desc.serialize_to_string()
proto = framework_pb2.ProgramDesc.FromString(str(protostr))
return _debug_string_(proto, throw_on_error)
def to_string(self, throw_on_error, with_details=False):
"""
To debug string.
Args:
throw_on_error(bool): raise exception when self is not initialized
when throw_on_error is True
with_details(bool): more details about variables and parameters
(e.g. trainable, optimize_attr, ...) will be printed when with_details is True
Returns(str): The debug string.
"""
assert isinstance(throw_on_error, bool) and isinstance(with_details,
bool)
if with_details:
res_str = ""
for block in self.blocks:
res_str += block.to_string(throw_on_error, with_details)
else:
protostr = self.desc.serialize_to_string()
proto = framework_pb2.ProgramDesc.FromString(str(protostr))
res_str = _debug_string_(proto, throw_on_error)
return res_str
def get_desc(self):
return self.desc
......@@ -950,6 +1006,36 @@ class Parameter(Variable):
self.gradient_clip_attr = kwargs.get('gradient_clip_attr', None)
def __str__(self):
return self.to_string(True)
def to_string(self, throw_on_error, with_details=False):
"""
To debug string.
Args:
throw_on_error(bool): raise exception when self is not initialized
when throw_on_error is True
with_details(bool): more details about variables and parameters
(e.g. trainable, optimize_attr, ...) will be printed when with_details is True
Returns(str): The debug string.
"""
assert isinstance(throw_on_error, bool) and isinstance(with_details,
bool)
if with_details:
res_str = Variable.to_string(self, throw_on_error, True)
additional_attr = ("trainable", "optimize_attr", "regularizer",
"gradient_clip_attr")
for attr_name in additional_attr:
res_str += "%s: %s\n" % (attr_name,
str(getattr(self, attr_name)))
else:
res_str = Variable.to_string(self, throw_on_error, False)
return res_str
__repr__ = __str__
# program is a global instance.
_main_program_ = Program()
......
......@@ -18,7 +18,7 @@ import itertools
from framework import Variable, Parameter, default_main_program, default_startup_program, \
unique_name, dtype_is_floating
from paddle.v2.fluid.initializer import Constant, Xavier
from param_attr import ParamAttr
from param_attr import ParamAttr, WeightNormParamAttr
class LayerHelper(object):
......@@ -104,6 +104,177 @@ class LayerHelper(object):
(dtype, each.dtype))
return dtype
def _create_weight_normalize(self, attr, shape, dtype):
from .layers import elementwise_mul, elementwise_div, reshape
# Remove these ops when LayerHelper and layers support indicating
# program and block.
def __norm_op(x,
out=None,
p=2,
dim=None,
keep_dim=False,
block=self.startup_program.global_block()):
if out is None:
out = block.create_var(
name=unique_name(".".join([self.name, 'weight_norm_norm'])),
dtype=dtype,
persistable=False)
abs_out = block.create_var(
name=unique_name(".".join([self.name, 'weight_norm_abs'])),
dtype=dtype,
persistable=False)
block.append_op(
type='abs', inputs={'X': x}, outputs={'Out': abs_out})
pow_out = block.create_var(
name=unique_name(".".join([self.name, 'weight_norm_pow'])),
dtype=dtype,
persistable=False)
block.append_op(
type='pow',
inputs={'X': abs_out},
outputs={'Out': pow_out},
attrs={'factor': float(p)})
sum_out = block.create_var(
name=unique_name(".".join([self.name, 'weight_norm_sum'])),
dtype=dtype,
persistable=False)
block.append_op(
type='reduce_sum',
inputs={'X': pow_out},
outputs={'Out': sum_out},
attrs={
'dim': dim,
'keep_dim': keep_dim,
'reduce_all': True if dim is None else False
})
block.append_op(
type='pow',
inputs={'X': sum_out},
outputs={'Out': out},
attrs={'factor': 1. / p})
return out
def __reshape_op(x,
shape,
out=None,
block=self.startup_program.global_block()):
if out is None:
out = block.create_var(
name=unique_name(".".join(
[self.name, 'weight_norm_reshape'])),
dtype=dtype,
persistable=False)
block.append_op(
type='reshape',
inputs={'X': x},
outputs={'Out': out},
attrs={'shape': shape})
return out
def __transpose_op(x,
axis,
out=None,
block=self.startup_program.global_block()):
if out is None:
out = block.create_var(
name=unique_name(".".join(
[self.name, 'weight_norm_transpose'])),
dtype=dtype,
persistable=False)
block.append_op(
type='transpose',
inputs={'X': x},
outputs={'Out': out},
attrs={'axis': axis})
return out
def __norm_except_dim(x,
out=None,
dim=None,
block=self.startup_program.global_block()):
"""Computes the norm over all dimensions except dim"""
if out is None:
out = block.create_var(
name=unique_name(".".join([self.name, 'weight_norm_norm'])),
dtype=dtype,
persistable=False)
if dim is None:
__norm_op(x, out, dim=dim, block=block)
elif dim == 0:
out_shape = [x.shape[0]] + [1] * (len(x.shape) - 1)
reshape = __reshape_op(x, shape=[x.shape[0], -1], block=block)
norm = __norm_op(reshape, dim=1, block=block)
__reshape_op(norm, out=out, shape=out_shape, block=block)
elif dim == len(x.shape) - 1:
out_shape = [1] * (len(x.shape) - 1) + [x.shape[-1]]
reshape = __reshape_op(x, shape=[-1, x.shape[-1]], block=block)
norm = __norm_op(reshape, dim=0, block=block)
__reshape_op(norm, out=out, shape=out_shape, block=block)
else:
perm = range(len(x.shape))
perm[0], perm[dim] = dim, 0
transpose = __transpose_op(x, perm, block=block)
norm = __norm_op(transpose, dim=0, block=block)
__transpose_op(norm, perm, out=out, block=block)
return out
def __weight_normalize(g, v, dim):
"""Calculations for weight normalization"""
norm = __norm_except_dim(
v, dim=dim, block=self.main_program.current_block())
scale = elementwise_div(
x=g, y=norm) # The shapes of g and norm are the same.
# Currently, elementwise_mul only support broadcast when the shape
# of y is a subset of the shape of x. Thus, we reshape y to squeeze
# to achive the subset.
w = elementwise_mul(
x=v,
y=scale if dim is None else reshape(
x=scale, shape=[v.shape[dim]]),
axis=-1 if dim is None else dim)
# To serialize the original parameter for inference, maybe a
# parameter rather than a variable should be returned.
return w
g_param_attr = copy.deepcopy(attr)
g_param_attr.name = attr.name + '_g'
g_param_shape = [1] * len(shape)
if attr.dim is not None:
g_param_shape[attr.dim] = shape[attr.dim]
v_param_attr = copy.deepcopy(attr)
v_param_attr.name = attr.name + '_v'
v_param_shape = shape
# Add to startup_program to initialize g and v.
# Try to reconstruct the initializer of w by initializing g and v.
# Set the initializers of g and v as below, then the distribution
# of w is the same as initializing w with the given initializer.
# For Data-Dependent Initialization, please compute the init-values
# of g and v in external and then feed the values to g and v by
# executing an extra program.
g_param = self.startup_program.global_block().create_parameter(
dtype=dtype,
shape=g_param_shape,
**g_param_attr.to_kwargs(with_initializer=False))
v_param = self.startup_program.global_block().create_parameter(
dtype=dtype,
shape=v_param_shape,
**v_param_attr.to_kwargs(with_initializer=True))
__norm_except_dim(
x=v_param,
out=g_param,
dim=attr.dim,
block=self.startup_program.global_block())
# Add weight normalization to main_program
g_param = self.main_program.global_block().create_parameter(
dtype=dtype, shape=g_param_shape, **g_param_attr.to_kwargs())
v_param = self.main_program.global_block().create_parameter(
dtype=dtype, shape=v_param_shape, **v_param_attr.to_kwargs())
w_param = __weight_normalize(g_param, v_param, dim=attr.dim)
return w_param
def create_parameter(self,
attr,
shape,
......@@ -114,16 +285,23 @@ class LayerHelper(object):
attr = copy.deepcopy(attr)
assert isinstance(attr, ParamAttr)
suffix = 'b' if is_bias else 'w'
if attr.name is None:
attr.name = unique_name(".".join([self.name, suffix]))
if default_initializer is None:
if default_initializer is None and attr.initializer is None:
if is_bias:
attr.set_default_bias_initializer()
else:
attr.set_default_param_initializer()
else:
attr.set_default_initializer(default_initializer)
if attr.name is None:
attr.name = unique_name(".".join([self.name, suffix]))
# If weight normalization is set, insert extra parameters and ops.
# Refer to https://arxiv.org/pdf/1602.07868.pdf
if isinstance(attr, WeightNormParamAttr):
param = self._create_weight_normalize(attr, shape, dtype)
WeightNormParamAttr.params_with_weight_norm.append(param)
return param
self.startup_program.global_block().create_parameter(
dtype=dtype, shape=shape, **attr.to_kwargs(with_initializer=True))
......
......@@ -14,8 +14,10 @@
from .. import core
from ..layer_helper import LayerHelper
from control_flow import BlockGuard
from ..layer_helper import LayerHelper
__all__ = ['data']
__all__ = ['data', 'BlockGuardServ', 'ListenAndServ', 'Send']
def data(name,
......@@ -74,3 +76,123 @@ def data(name,
type=type,
stop_gradient=stop_gradient,
lod_level=lod_level)
class BlockGuardServ(BlockGuard):
"""
BlockGuardServ class.
BlockGuardServ class is used to create an op with a block in a program.
"""
def __init__(self, server):
if not (isinstance(server, ListenAndServ)):
raise TypeError("BlockGuardServ takes a ListenAndServ")
super(BlockGuardServ, self).__init__(server.helper.main_program)
self.server = server
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is not None:
return False
self.server.complete_op()
return super(BlockGuardServ, self).__exit__(exc_type, exc_val, exc_tb)
class ListenAndServ(object):
"""
ListenAndServ class.
ListenAndServ class is used to wrap listen_and_serv op to create a server
which can receive variables from clients and run a block.
"""
def __init__(self, endpoint, fan_in=1, optimizer_mode=True):
self.helper = LayerHelper("recv")
self.inputs = []
self.outputs = []
self.endpoint = endpoint
self.fan_in = fan_in
# FIXME(typhoonzero): add optimizer_mode is stupid, should make it more
# general.
self.optimizer_mode = optimizer_mode
def do(self):
return BlockGuardServ(self)
def get_params_and_grads(self):
main_program = self.helper.main_program
current_block = main_program.current_block()
parent_block = self.parent_block()
# params and grads in the same order.
params = list()
grads = list()
for op in current_block.ops:
# FIXME(typhoonzero): op.inputs is None if it's cloned.
if self.optimizer_mode:
if "Grad" in op.inputs and "Param" in op.inputs:
params.append(op.inputs["Param"].name)
grads.append(op.inputs["Grad"].name)
else:
# simple recv mode, recv operators inputs.
for iname in op.input_names:
for in_var_name in op.input(iname):
params.append(parent_block.var(in_var_name))
grads.append(parent_block.var(in_var_name))
return params, grads
def parent_block(self):
prog = self.helper.main_program
parent_idx = prog.current_block().parent_idx
assert parent_idx >= 0
parent_block = prog.block(parent_idx)
return parent_block
def complete_op(self):
main_program = self.helper.main_program
current_block = main_program.current_block()
parent_block = self.parent_block()
params, grads = self.get_params_and_grads()
param_names = [p.name for p in params]
grad_names = [g.name for g in grads]
parent_block.append_op(
type='recv',
inputs={},
outputs={},
attrs={
'endpoint': self.endpoint,
'Fanin': self.fan_in,
'ParamList': param_names,
'GradList': grad_names,
'OptimizeBlock': current_block
})
def Send(endpoints, send_vars, get_vars):
"""
Send layer
Args:
endpoints: comma seperated IP:PORT pairs in the order
of send_vars to send
send_vars: vars to send
get_vars: vars to get from server after send completes.
Send variables to the server side, and get vars from server
side when server have finished running server side program.
"""
assert (type(send_vars) == list)
assert (type(get_vars) == list)
epmap = endpoints.split(",")
endpoints = list(set(epmap))
helper = LayerHelper("Send", **locals())
helper.append_op(
type="send",
inputs={"X": send_vars},
outputs={"Out": get_vars},
attrs={"endpoints": endpoints,
"epmap": epmap})
......@@ -15,7 +15,10 @@
from initializer import Initializer, Xavier, Constant
from regularizer import WeightDecayRegularizer
__all__ = ['ParamAttr']
__all__ = [
'ParamAttr',
'WeightNormParamAttr',
]
class ParamAttr(object):
......@@ -82,3 +85,20 @@ class ParamAttr(object):
if with_initializer:
kwargs['initializer'] = self.initializer
return kwargs
class WeightNormParamAttr(ParamAttr):
"""
Used for weight normalization. Any field in ParamAttr can also be set here.
Besides, an extra field dim can be set to indicate the dimension except
which to normalize.
"""
# List to record the parameters reparameterized by weight normalization.
# If these parameters are treated as Variable rather than Parameter,
# it can be used to discriminate these parameters and help to serialize
# these paramters for inference.
params_with_weight_norm = []
def __init__(self, dim=None, **kwargs):
super(WeightNormParamAttr, self).__init__(**kwargs)
self.dim = dim
......@@ -87,6 +87,11 @@ class WeightDecayRegularizer(object):
"""
raise NotImplementedError()
def __str__(self):
"""Debug string
"""
raise NotImplementedError()
class L2DecayRegularizer(WeightDecayRegularizer):
"""Implements the L2 Weight Decay Regularization
......@@ -123,6 +128,9 @@ class L2DecayRegularizer(WeightDecayRegularizer):
return decay
def __str__(self):
return "L2Decay, regularization_coeff=%f" % self._regularization_coeff
class L1DecayRegularizer(WeightDecayRegularizer):
"""Implements the L1 Weight Decay Regularization
......@@ -163,6 +171,9 @@ class L1DecayRegularizer(WeightDecayRegularizer):
return decay
def __str__(self):
return "L1Decay, regularization_coeff=%f" % self._regularization_coeff
# We short the class name, since users will use the regulaizer with the package
# name. The sample code:
......
file(GLOB TEST_OPS RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "test_*.py")
string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}")
if(NOT WITH_DISTRIBUTE)
list(REMOVE_ITEM TEST_OPS test_recv_op)
endif(NOT WITH_DISTRIBUTE)
foreach(src ${TEST_OPS})
py_test(${src} SRCS ${src}.py)
endforeach()
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserve.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import paddle.v2.fluid as fluid
import paddle.v2.fluid.layers as layers
import numpy
from multiprocessing import Process
import os, sys
class TestRecvOp(unittest.TestCase):
def test_send(self):
# Run init_serv in a thread
place = fluid.CPUPlace()
p = Process(target=self.init_serv, args=(place, ))
p.daemon = True
p.start()
self.init_client(place)
# FIXME(typhoonzero): find a way to gracefully shutdown the server.
os.system("kill -9 %d" % p.pid)
p.join()
def init_serv(self, place):
main = fluid.Program()
with fluid.program_guard(main):
x = layers.data(
shape=[32, 32],
dtype='float32',
name="X",
append_batch_size=False)
fluid.initializer.Constant(value=1.0)(x, main.global_block())
serv = layers.ListenAndServ("127.0.0.1:6174", optimizer_mode=False)
with serv.do():
o = layers.scale(x=x, scale=10.0)
main.global_block().create_var(
name=o.name, psersistable=False, dtype=o.dtype, shape=o.shape)
exe = fluid.Executor(place)
exe.run(main)
def init_client(self, place):
main = fluid.Program()
with fluid.program_guard(main):
x = layers.data(
shape=[32, 32],
dtype='float32',
name='X',
append_batch_size=False)
fluid.initializer.Constant(value=1.0)(x, main.global_block())
layers.Send("127.0.0.1:6174", [x], [x])
exe = fluid.Executor(place)
exe.run(main)
if __name__ == "__main__":
unittest.main()
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserve.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import numpy
import collections
import paddle.v2.fluid as fluid
import paddle.v2.fluid.core as core
from paddle.v2.fluid.initializer import ConstantInitializer
from paddle.v2.fluid.param_attr import WeightNormParamAttr
class TestWeightNormalization(unittest.TestCase):
batch_size = 3
hidden_size = 5
data_desc = (['x', [10], 0], )
@classmethod
def setUpClass(cls):
cls.set_program()
@classmethod
def set_program(cls):
data = fluid.layers.data(
name=cls.data_desc[0][0], shape=cls.data_desc[0][1])
out = fluid.layers.fc(input=data,
size=cls.hidden_size,
param_attr=WeightNormParamAttr(
dim=None,
name='weight_norm_param',
initializer=ConstantInitializer(1.0)),
bias_attr=False,
act=None)
loss = fluid.layers.reduce_sum(out)
fluid.backward.append_backward(loss=loss)
cls.fetch_list = [
'weight_norm_param_g', 'weight_norm_param_v',
'weight_norm_param_g@GRAD'
]
def run_program(self):
outputs = []
places = [core.CPUPlace()]
if core.is_compiled_with_cuda():
places.append(core.CUDAPlace(0))
for place in places:
self.set_inputs(place)
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
output = exe.run(fluid.default_main_program(),
feed=self.inputs,
fetch_list=self.fetch_list,
return_numpy=False)
outputs.append(output)
self.actual_outputs = outputs
def set_data(self):
self.data = collections.OrderedDict()
for desc in self.data_desc:
data_name = desc[0]
data_shape = desc[1]
data_lod_level = desc[2]
data_lod = []
for i in range(data_lod_level):
lod_level_i = numpy.random.randint(
low=1,
high=5,
size=self.batch_size if i == 0 else lod_level_i[-1])
lod_level_i = [0] + numpy.cumsum(lod_level_i).tolist()
data_lod.append(lod_level_i)
data_value = numpy.random.random(
size=[data_lod[-1][-1] if data_lod else self.batch_size
] + data_shape).astype('float32')
self.data[data_name] = (data_value, data_lod)
def set_inputs(self, place):
self.inputs = {}
for desc in self.data_desc:
tensor = fluid.Tensor()
tensor.set(self.data[desc[0]][0], place)
if self.data[desc[0]][1]:
tensor.set_lod(self.data[desc[0]][1])
self.inputs[desc[0]] = tensor
def weight_normalize(self):
v = numpy.ones((self.data[self.data_desc[0][0]][0].shape[-1],
self.hidden_size))
g = numpy.linalg.norm(v, axis=None, keepdims=True)
w = g * v / numpy.linalg.norm(v, axis=None, keepdims=True)
x = self.data[self.data_desc[0][0]][0]
out = numpy.dot(x, w)
g_grad = (numpy.dot(x.T, numpy.ones_like(out)) * (v / numpy.linalg.norm(
v, axis=None, keepdims=True))).sum(axis=None, keepdims=True)
return g, v, g_grad
def test_weight_normalization(self):
self.set_data()
self.run_program()
expect_output = self.weight_normalize()
for actual_output in self.actual_outputs:
[
self.assertTrue(
numpy.allclose(
numpy.array(actual), expect, atol=0.001))
for expect, actual in zip(expect_output, actual_output)
]
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册