Commit dbb4f0d3 authored by tangwei12

Merge branch 'develop' of github.com:PaddlePaddle/Paddle into dis_ckpt_fix

......@@ -12,7 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <stack>
#include <vector>
#include "paddle/fluid/framework/ir/graph.h"
#include "paddle/fluid/framework/ir/node.h"
......
......@@ -8,7 +8,7 @@ cc_library(analysis SRCS pass_manager.cc dot.cc node.cc data_flow_graph.cc graph
helper.cc
model_store_pass.cc
DEPS framework_proto proto_desc)
cc_test(test_node SRCS node_tester.cc DEPS analysis)
cc_test(test_node SRCS node_tester.cc DEPS analysis gflags glog gtest)
cc_test(test_dot SRCS dot_tester.cc DEPS analysis)
cc_binary(inference_analyzer SRCS analyzer_main.cc DEPS analysis)
......
......@@ -20,17 +20,6 @@ namespace paddle {
namespace inference {
namespace analysis {
template <>
std::string &NodeAttr::As<std::string>() {
if (data_.empty()) {
type_index_ = std::type_index(typeid(std::string));
}
PADDLE_ENFORCE_EQ(type_index_, std::type_index(typeid(std::string)));
return data_;
}
std::string &NodeAttr::String() { return As<std::string>(); }
std::vector<Dot::Attr> Value::dot_attrs() const {
return std::vector<Dot::Attr>({Dot::Attr("style", "filled,rounded"),
Dot::Attr("shape", "box"),
......
......@@ -29,6 +29,7 @@ limitations under the License. */
#include "paddle/fluid/inference/analysis/device.h"
#include "paddle/fluid/inference/analysis/dot.h"
#include "paddle/fluid/inference/analysis/helper.h"
#include "paddle/fluid/platform/variant.h"
namespace paddle {
namespace inference {
......@@ -38,39 +39,35 @@ class NodeMap;
// A helper class to maintain the status from Pass.
struct NodeAttr {
using any_t =
boost::variant<bool, float, int32_t, int64_t, void *, std::string>;
// NOTE T should be a primary type or a struct composed of several primary
// types.
// NOTE STL containers should not be used here.
// Some usages
// Attr attr;
// attr.Bool() = true;
bool &Bool() { return As<bool>(); }
float &Float() { return As<float>(); }
int32_t &Int32() { return As<int32_t>(); }
int64_t &Int64() { return As<int64_t>(); }
void *&Pointer() { return As<void *>(); }
std::string &String();
std::string &String() { return As<std::string>(); }
private:
template <typename T>
T &As() {
// init storage in the first usage.
if (data_.empty()) {
VLOG(4) << "resize data to " << sizeof(T);
type_index_ = std::type_index(typeid(T));
data_.resize(sizeof(T));
if (type_index_ == typeid(NodeAttr)) {
type_index_ = typeid(T);
any_data_ = T();
} else {
PADDLE_ENFORCE(type_index_ == typeid(T), "fetch error type");
}
PADDLE_ENFORCE(framework::IsType<T>(type_index_),
"type not matched, origin is %s, want %s",
DataTypeNamer::Global().repr(type_index_),
DataTypeNamer::Global().repr<T>());
PADDLE_ENFORCE_EQ(data_.size(), sizeof(T), "Node attr type recast error");
return *reinterpret_cast<T *>(&data_[0]);
return boost::get<T>(any_data_);
}
private:
std::string data_;
any_t any_data_;
std::type_index type_index_{typeid(NodeAttr)};
};
......
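The hunk above replaces NodeAttr's raw byte buffer with a boost::variant plus a std::type_index tag: the first accessor call fixes the stored type, and any later access with a different type trips PADDLE_ENFORCE. A minimal sketch of that first-use-fixes-the-type behavior, written here as standalone Python for brevity (TypedAttr and its methods are illustrative names, not Paddle API):

```python
class TypedAttr:
    """Toy analogue of NodeAttr: the first access decides the stored type."""

    def __init__(self):
        self._type = None   # fixed on first access, like type_index_
        self._value = None

    def as_type(self, ty):
        if self._type is None:
            # First use: record the type and default-initialize the storage.
            self._type = ty
            self._value = ty()
        elif self._type is not ty:
            # Mirrors PADDLE_ENFORCE(type_index_ == typeid(T), "fetch error type").
            raise TypeError("fetch error type: stored %s, asked for %s"
                            % (self._type.__name__, ty.__name__))
        return self._value

    def set(self, value):
        self.as_type(type(value))
        self._value = value


attr = TypedAttr()
attr.set(2008)
assert attr.as_type(int) == 2008
# attr.as_type(str) would raise TypeError, just as the C++ version enforces.
```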
......@@ -20,6 +20,24 @@ namespace paddle {
namespace inference {
namespace analysis {
TEST(NodeAttr, bool) {
NodeAttr x;
x.Bool() = true;
ASSERT_EQ(x.Bool(), true);
}
TEST(NodeAttr, int32) {
NodeAttr x;
x.Int32() = 32;
ASSERT_EQ(x.Int32(), 32);
}
TEST(NodeAttr, string) {
NodeAttr x;
x.String() = "Hello";
ASSERT_EQ(x.String(), "Hello");
}
TEST(Node, Attr) {
// Node is an abstract class; use Value instead, as they share the same Attr
// logic.
......@@ -27,6 +45,9 @@ TEST(Node, Attr) {
auto* node = nodes.Create(Node::Type::kValue);
node->attr("v0").Int32() = 2008;
ASSERT_EQ(node->attr("v0").Int32(), 2008);
node->attr("str").String() = "hello world";
ASSERT_EQ(node->attr("str").String(), "hello world");
}
} // namespace analysis
......
......@@ -57,6 +57,8 @@ class RecvOp : public framework::OperatorBase {
class RecvOpMaker : public framework::OpProtoAndCheckerMaker {
public:
void Make() {
AddInput("X", "(Any) Dummy inputs, used for control dependency")
.AsDuplicable();
AddOutput("Out", "(Tensor) Variables to get from server.").AsDuplicable();
AddComment(R"DOC(
Recv operator
......
......@@ -37,22 +37,19 @@ class SendBarrierOp : public framework::OperatorBase {
void RunImpl(const framework::Scope& scope,
const platform::Place& place) const override {
std::vector<std::string> eps = Attr<std::vector<std::string>>("endpoints");
bool sync_mode = Attr<bool>("sync_mode");
distributed::RPCClient* rpc_client =
distributed::RPCClient::GetInstance<RPCCLIENT_T>();
VLOG(3) << "SendBarrierOp sync_mode:" << sync_mode;
VLOG(3) << "SendBarrierOp sync";
// need to wait before sending send_barrier message
PADDLE_ENFORCE(rpc_client->Wait(), "internal error in RPCClient");
if (sync_mode) {
for (auto& ep : eps) {
VLOG(3) << "send barrier, ep: " << ep;
rpc_client->AsyncSendBatchBarrier(ep);
}
PADDLE_ENFORCE(rpc_client->Wait(), "internal error in RPCClient");
for (auto& ep : eps) {
VLOG(3) << "send barrier, ep: " << ep;
rpc_client->AsyncSendBatchBarrier(ep);
}
PADDLE_ENFORCE(rpc_client->Wait(), "internal error in RPCClient");
}
};
......@@ -70,7 +67,6 @@ the Parameter Server would know all variables have been sent.
"(string vector, default 127.0.0.1:6164)"
"Server endpoints to send variables to.")
.SetDefault({"127.0.0.1:6164"});
AddAttr<bool>("sync_mode", "work in sync_mode or not").SetDefault(true);
}
};
......
......@@ -66,6 +66,8 @@ class SendOpMaker : public framework::OpProtoAndCheckerMaker {
void Make() {
AddInput("X", "(Tensor, SelectedRows) Input variables to be sent")
.AsDuplicable();
AddOutput("Out", "(Any) Dummy outputs, used for control dependency")
.AsDuplicable();
AddComment(R"DOC(
Send operator
......
......@@ -36,7 +36,7 @@ __forceinline__ __device__ T CudaShuffleDownSync(unsigned mask, T val,
#if CUDA_VERSION < 9000
return __shfl_down(val, delta, width);
#else
return __shfl_down_sync(mask, val, delta, width);
return __shfl_down_sync(mask, val, static_cast<unsigned>(delta), width);
#endif
}
......@@ -46,9 +46,16 @@ template <>
__forceinline__ __device__ float16 CudaShuffleDownSync(unsigned mask,
float16 val, int delta,
int width) {
half tmp = static_cast<half>(val);
__shfl_down(tmp, static_cast<unsigned>(delta), width);
return float16(tmp);
return float16(
__shfl_down(static_cast<half>(val), static_cast<unsigned>(delta), width));
}
#else
template <>
__forceinline__ __device__ float16 CudaShuffleDownSync(unsigned mask,
float16 val, int delta,
int width) {
return float16(__shfl_down_sync(mask, static_cast<half>(val),
static_cast<unsigned>(delta), width));
}
#endif
......
......@@ -13,6 +13,7 @@
// limitations under the License.
#include <gtest/gtest.h>
#include <algorithm>
#include <iostream>
#include <random>
......@@ -123,7 +124,7 @@ void TestUnalign(size_t num, const int shift_bit) {
cudaMemcpy(out, d_in2, array_size, cudaMemcpyDeviceToHost);
cudaDeviceSynchronize();
for (size_t i = 0; i < num / 2; ++i) {
// NOTE(dzhwinter): the float16 add has small underflow/overflow
// NOTE(dzhwinter): the float16 add has a small truncation error,
// so we use EXPECT_NEAR to check the result.
EXPECT_NEAR(static_cast<float>(out[i]),
static_cast<float>(AddFunctor<float16>()(r_in1[i], r_in2[i])),
......@@ -151,3 +152,83 @@ TEST(CudaAtomic, float16Unalign) {
TestUnalign(static_cast<size_t>(1024), /*shift_bit*/ 3);
TestUnalign(static_cast<size_t>(1024 * 1024), /*shift_bit*/ 3);
}
// https://devblogs.nvidia.com/faster-parallel-reductions-kepler/
template <typename T>
static __forceinline__ __device__ T WarpReduceSum(T val) {
unsigned mask = 0u;
CREATE_SHFL_MASK(mask, true);
for (int offset = warpSize / 2; offset > 0; offset /= 2) {
val += paddle::platform::CudaShuffleDownSync(mask, val, offset);
}
return val;
}
template <typename T>
__forceinline__ __device__ T BlockReduce(T val) {
static __shared__ T shared[32]; // Shared mem for 32 partial sums
int lane = threadIdx.x % warpSize;
int wid = threadIdx.x / warpSize;
val = WarpReduceSum(val); // Each warp performs partial reduction
if (lane == 0) shared[wid] = val; // Write reduced value to shared memory
__syncthreads(); // Wait for all partial reductions
// read from shared memory only if that warp existed
val =
(threadIdx.x < blockDim.x / warpSize) ? shared[lane] : static_cast<T>(0);
if (wid == 0) val = WarpReduceSum(val); // Final reduce within first warp
return val;
}
template <typename T>
__global__ void DeviceReduceSum(T* in, T* out, size_t N) {
T sum(0);
for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < N;
i += blockDim.x * gridDim.x) {
sum += in[i];
}
sum = BlockReduce<T>(sum);
__syncthreads();
if (threadIdx.x == 0) out[blockIdx.x] = sum;
}
template <typename T>
void TestReduce(size_t num, float atol = 0.01) {
T* in1;
T *d_in1, *d_in2;
size_t size = sizeof(T) * num;
cudaMalloc(reinterpret_cast<void**>(&d_in1), size);
cudaMalloc(reinterpret_cast<void**>(&d_in2), sizeof(T));
in1 = reinterpret_cast<T*>(malloc(size));
std::minstd_rand engine;
std::uniform_real_distribution<double> dist(0.0, 1.0);
for (size_t i = 0; i < num; ++i) {
in1[i] = static_cast<T>(dist(engine));
}
auto out = std::accumulate(in1, in1 + num, static_cast<T>(0));
cudaMemcpy(d_in1, in1, size, cudaMemcpyHostToDevice);
cudaDeviceSynchronize();
DeviceReduceSum<T><<<1, PADDLE_CUDA_NUM_THREADS>>>(d_in1, d_in2, num);
cudaMemcpy(in1, d_in2, sizeof(T), cudaMemcpyDeviceToHost);
cudaDeviceSynchronize();
// NOTE(dzhwinter): the float16 add has small underflow/overflow
// so we use EXPECT_NEAR to check the result.
EXPECT_NEAR(static_cast<float>(in1[0]), static_cast<float>(out), atol);
free(in1);
cudaFree(d_in1);
cudaFree(d_in2);
}
TEST(CudaShuffleSync, float16) {
TestReduce<float>(10);
TestReduce<float>(1000);
// float16 will overflow or accumulate truncation error at larger sizes.
TestReduce<float16>(10);
TestReduce<float16>(100, /*atol error*/ 1.0);
}
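The looser atol for float16 above reflects how quickly half precision accumulates rounding error when summing many terms. A quick NumPy check of the effect (NumPy is used here only for illustration; it is not part of the test):

```python
import numpy as np

rng = np.random.default_rng(0)
values = rng.random(100)            # uniform [0, 1), matching the test's input scale

ref = values.sum(dtype=np.float64)  # high-precision reference

acc = np.float16(0.0)
for v in values.astype(np.float16):
    acc = np.float16(acc + v)       # every add rounds to half precision

print("float64 sum:", ref)
print("float16 sum:", float(acc), "abs error:", abs(float(acc) - ref))
# With 100 terms near 0.5 each, the running sum sits in a float16 range whose
# spacing is about 0.03, so the accumulated error is commonly a few tenths --
# hence TestReduce<float16>(100, /*atol error*/ 1.0) above.
```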
......@@ -54,7 +54,7 @@ function cpu_config() {
if [ $platform == "Linux" ]; then
ht=`lscpu |grep "per core"|awk -F':' '{print $2}'|xargs`
elif [ $platform == "Darwin" ]; then
if [`sysctl -n hw.physicalcpu` -eq `sysctl -n hw.logicalcpu`]; then
if [ `sysctl -n hw.physicalcpu` -eq `sysctl -n hw.logicalcpu` ]; then
# HT is OFF
ht=1
fi
......
......@@ -24,7 +24,6 @@ import paddle.dataset.common
import subprocess
import numpy
import platform
import six
import tempfile
from six.moves import range
__all__ = ['train', 'test', 'convert']
......
......@@ -24,7 +24,7 @@ from .layer_function_generator import templatedoc
from .. import core
from ..executor import global_scope
from ..framework import convert_np_dtype_to_dtype_, default_main_program, \
default_startup_program, program_guard, Program
default_startup_program, program_guard, Program, Variable
from ..layer_helper import LayerHelper
from ..unique_name import generate as unique_name
......@@ -209,7 +209,7 @@ class ListenAndServ(object):
})
def Send(endpoints, send_vars, sync=True):
def Send(endpoints, send_vars, dummy_output=None, sync=True):
"""
Send variables to the server side, and get vars from the server
side when the server has finished running the server side program.
......@@ -223,6 +223,13 @@ def Send(endpoints, send_vars, sync=True):
"""
assert (type(send_vars) == list)
if dummy_output is None:
dummy_output = []
elif isinstance(dummy_output, Variable):
dummy_output = [dummy_output]
assert (type(dummy_output) == list)
epmap = endpoints.split(",")
endpoints = list(set(epmap))
......@@ -232,6 +239,7 @@ def Send(endpoints, send_vars, sync=True):
helper.append_op(
type="send",
inputs={"X": send_vars},
outputs={"Out": dummy_output},
attrs={
"endpoints": endpoints,
"epmap": epmap,
......@@ -241,7 +249,7 @@ def Send(endpoints, send_vars, sync=True):
helper.append_op(type="send_barrier", attrs={"endpoints": endpoints})
def Recv(endpoints, get_vars, sync=True):
def Recv(endpoints, get_vars, dummy_input=None, sync=True):
"""
Receive variables from server side
......@@ -256,13 +264,20 @@ def Recv(endpoints, get_vars, sync=True):
"""
assert (type(get_vars) == list)
if dummy_input is None:
dummy_input = []
elif isinstance(dummy_input, Variable):
dummy_input = [dummy_input]
assert (type(dummy_input) == list)
epmap = endpoints.split(",")
endpoints = list(set(epmap))
helper = LayerHelper("Recv", **locals())
helper.append_op(
type="recv",
inputs={"X": get_vars},
inputs={"X": dummy_input},
outputs={"Out": get_vars},
attrs={"endpoints": endpoints,
"epmap": epmap})
......
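Both the C++ RecvOp/SendOp changes and the Python wrappers above thread a dummy variable from a send op's Out into the matching recv op's X purely to create a data-dependency edge, so that any executor that schedules ops by variable def-use order runs recv after send. A framework-free sketch of why that edge is enough (the op and variable names below are made up for illustration):

```python
# Each op is (name, inputs, outputs); variables are plain strings.
ops = [
    ("recv", ["send_dummy"], ["param@RECV"]),  # recv consumes the dummy var
    ("send", ["grad"], ["send_dummy"]),        # send produces it
    ("backward", ["loss"], ["grad"]),
]


def topo_sort(ops):
    """Order ops so every input is produced before it is consumed."""
    producer = {out: op for op in ops for out in op[2]}
    ordered, visiting, done = [], set(), set()

    def visit(op):
        if op[0] in done:
            return
        assert op[0] not in visiting, "dependency cycle"
        visiting.add(op[0])
        for var in op[1]:
            if var in producer:        # externally fed vars have no producer
                visit(producer[var])
        visiting.discard(op[0])
        done.add(op[0])
        ordered.append(op)

    for op in ops:
        visit(op)
    return ordered


print([op[0] for op in topo_sort(ops)])
# -> ['backward', 'send', 'recv']: the dummy output alone forces recv after send.
```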
......@@ -16,7 +16,6 @@ from __future__ import print_function
import numpy as np
import argparse
import six
import time
import math
......
......@@ -34,6 +34,7 @@ import math
import random
import numpy as np
import collections
import six
from .ps_dispatcher import RoundRobin, HashName, PSDispatcher
from .. import core, framework
......@@ -210,6 +211,9 @@ class DistributeTranspiler(object):
ps_dispatcher = self.config.split_method(self.pserver_endpoints)
self.has_distributed_lookup_table = self._has_distributed_lookup_table()
self.param_name_to_grad_name = dict()
for param_var, grad_var in self.params_grads:
self.param_name_to_grad_name[param_var.name] = grad_var.name
# add distributed attrs to program
self.origin_program._is_distributed = True
......@@ -236,34 +240,39 @@ class DistributeTranspiler(object):
random.seed(self.origin_program.random_seed)
random.shuffle(grad_var_mapping_items)
for orig_varname, splited_vars in grad_var_mapping_items:
grad_name_to_send_dummy_out = dict()
for grad_varname, splited_vars in grad_var_mapping_items:
eplist = ps_dispatcher.dispatch(splited_vars)
if not self.config.slice_var_up:
assert (len(splited_vars) == 1)
splited_grad_varname = grad_varname
if len(splited_vars) == 1:
orig_varname = splited_vars[0].name
splited_grad_varname = splited_vars[0].name
index = find_op_by_output_arg(program.global_block(),
orig_varname)
splited_grad_varname)
elif len(splited_vars) > 1:
orig_var = program.global_block().vars[orig_varname]
orig_var = program.global_block().vars[splited_grad_varname]
index = find_op_by_output_arg(program.global_block(),
orig_varname)
splited_grad_varname)
self._insert_split_op(program, orig_var, index, splited_vars)
index += 1
else:
AssertionError("Can not insert the send op by original "
"variable name :", orig_varname)
"variable name :", splited_grad_varname)
dummy_output = program.global_block().create_var()
grad_name_to_send_dummy_out[grad_varname] = dummy_output
program.global_block()._insert_op(
index=index + 1,
type="send",
inputs={"X": splited_vars},
outputs={},
outputs={"Out": dummy_output},
attrs={
"epmap": eplist,
RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE
RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE,
"sync_mode": not self.sync_mode,
})
for _, var in enumerate(splited_vars):
send_vars.append(var)
......@@ -275,7 +284,6 @@ class DistributeTranspiler(object):
outputs={},
attrs={
"endpoints": pserver_endpoints,
"sync_mode": self.sync_mode,
RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE
})
......@@ -291,19 +299,21 @@ class DistributeTranspiler(object):
self.param_grad_ep_mapping[ep]["grads"].append(send_vars[i])
# step4: Concat the parameters splits together after recv.
for varname, splited_var in six.iteritems(self.param_var_mapping):
for param_varname, splited_var in six.iteritems(self.param_var_mapping):
eps = []
for var in splited_var:
index = [v.name for v in recv_vars].index(var.name)
eps.append(eplist[index])
grad_send_dummy_out = grad_name_to_send_dummy_out[
self.param_name_to_grad_name[param_varname]]
program.global_block().append_op(
type="recv",
inputs={},
inputs={"X": [grad_send_dummy_out]},
outputs={"Out": splited_var},
attrs={
"epmap": eps,
RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE
RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE,
"sync_mode": not self.sync_mode
})
if self.sync_mode:
......@@ -316,10 +326,10 @@ class DistributeTranspiler(object):
RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE
})
for varname, splited_var in six.iteritems(self.param_var_mapping):
for param_varname, splited_var in six.iteritems(self.param_var_mapping):
if len(splited_var) <= 1:
continue
orig_param = program.global_block().vars[varname]
orig_param = program.global_block().vars[param_varname]
program.global_block().append_op(
type="concat",
inputs={"X": splited_var},
......@@ -387,7 +397,7 @@ class DistributeTranspiler(object):
op = startup_program.global_block().append_op(
type="recv",
inputs={},
inputs={"X": []},
outputs={"Out": splited_var},
attrs={
"epmap": eps,
......@@ -826,19 +836,21 @@ class DistributeTranspiler(object):
self.config.min_block_size)
assert (len(grad_blocks) == len(param_blocks))
# origin_varname -> [splited_var]
# origin_param_name -> [splited_param_vars]
self.param_var_mapping = self._create_vars_from_blocklist(
self.origin_program, param_blocks)
# origin_grad_name -> [splited_grad_vars]
self.grad_var_mapping = self._create_vars_from_blocklist(
self.origin_program,
grad_blocks,
add_trainer_suffix=self.trainer_num > 1)
# dict(grad_splited_var -> param_splited_var)
self.grad_param_mapping = collections.OrderedDict()
for g, p in zip(grad_blocks, param_blocks):
g_name, g_bid, _ = g.split(":")
p_name, p_bid, _ = p.split(":")
self.grad_param_mapping[self.grad_var_mapping[g_name][int(g_bid)]] = \
self.param_var_mapping[p_name][int(p_bid)]
self.param_var_mapping[p_name][int(p_bid)]
# create mapping of endpoint -> split var to create pserver side program
self.param_grad_ep_mapping = collections.OrderedDict()
......@@ -959,7 +971,7 @@ class DistributeTranspiler(object):
index=op_index + 2,
type="send",
inputs={'X': self.trainer_side_table_grad_list},
outputs={},
outputs={'Out': []},
attrs={
"sync_mode": True,
"epmap": pserver_endpoints,
......
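In the transpiler hunk above, every gradient's send op gets a freshly created dummy output (recorded in grad_name_to_send_dummy_out), and the recv op for the corresponding parameter is fed that dummy as input, looked up through param_name_to_grad_name. A simplified sketch of that bookkeeping, with plain dicts and lists standing in for the real Program/Block API (all names below are example values):

```python
# Parameter/gradient pairs as the transpiler would see them.
params_grads = [("fc_w", "fc_w@GRAD"), ("fc_b", "fc_b@GRAD")]
param_name_to_grad_name = {p: g for p, g in params_grads}

ops = []
grad_name_to_send_dummy_out = {}

# One send per gradient, each producing its own dummy variable.
for _, grad_name in params_grads:
    dummy = grad_name + "@SEND_DUMMY"   # placeholder name, not the real naming
    grad_name_to_send_dummy_out[grad_name] = dummy
    ops.append({"type": "send", "X": [grad_name], "Out": [dummy]})

# One recv per parameter, consuming the dummy of that parameter's gradient.
for param_name, _ in params_grads:
    grad_name = param_name_to_grad_name[param_name]
    dummy = grad_name_to_send_dummy_out[grad_name]
    ops.append({"type": "recv", "X": [dummy], "Out": [param_name]})

for op in ops:
    print(op["type"], op["X"], "->", op["Out"])
# Each recv lists exactly the dummy produced by the send of its own gradient,
# which is the dependency edge the real transpiler inserts.
```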
......@@ -13,7 +13,7 @@ ENV PATH /opt/rh/devtoolset-2/root/usr/bin:$PATH
ENV LD_LIBRARY_PATH /opt/rh/devtoolset-2/root/usr/lib64:/opt/rh/devtoolset-2/root/usr/lib:/usr/local/lib64:/usr/local/lib:${LD_LIBRARY_PATH}
ENV PKG_CONFIG_PATH=/usr/local/lib/pkgconfig
RUN yum install -y sqlite-devel zlib-devel openssl-devel pcre-devel vim tk-devel tkinter libtool xz freetype-devel libpng-devel graphviz
RUN yum install -y sqlite-devel zlib-devel openssl-devel pcre-devel vim tk-devel tkinter libtool xz graphviz
COPY build_scripts /build_scripts
RUN bash build_scripts/build.sh && \
bash build_scripts/install_nccl2.sh && rm -r build_scripts
......
......@@ -28,7 +28,7 @@ AUTOCONF_HASH=954bd69b391edc12d6a4a51a2dd1476543da5c6bbf05a95b59dc0dd6fd4c2969
PYTHON_COMPILE_DEPS="zlib-devel bzip2-devel ncurses-devel sqlite-devel readline-devel tk-devel gdbm-devel db4-devel libpcap-devel xz-devel"
# Libraries that are allowed as part of the manylinux1 profile
MANYLINUX1_DEPS="glibc-devel libstdc++-devel glib2-devel libX11-devel libXext-devel libXrender-devel mesa-libGL-devel libICE-devel libSM-devel ncurses-devel"
MANYLINUX1_DEPS="glibc-devel libstdc++-devel glib2-devel libX11-devel libXext-devel libXrender-devel mesa-libGL-devel libICE-devel libSM-devel ncurses-devel freetype-devel libpng-devel"
# Get build utilities
MY_DIR=$(dirname "${BASH_SOURCE[0]}")
......