提交 b2d65384 编写于 作者: L liaogang

Merge remote-tracking branch 'origin/master'

# Conflicts:
#	paddle/math/tests/test_perturbation.cpp
*.DS_Store *.DS_Store
build/ build/
*.user
...@@ -14,6 +14,7 @@ find_package(CUDA QUIET) ...@@ -14,6 +14,7 @@ find_package(CUDA QUIET)
find_package(Protobuf REQUIRED) find_package(Protobuf REQUIRED)
find_package(PythonLibs 2.7 REQUIRED) find_package(PythonLibs 2.7 REQUIRED)
find_package(PythonInterp 2.7 REQUIRED) find_package(PythonInterp 2.7 REQUIRED)
find_package(ZLIB REQUIRED)
find_package(NumPy) find_package(NumPy)
find_package(Threads REQUIRED) find_package(Threads REQUIRED)
find_package(Glog) find_package(Glog)
......
...@@ -58,8 +58,8 @@ set(COMMON_FLAGS ...@@ -58,8 +58,8 @@ set(COMMON_FLAGS
-fPIC -fPIC
-fno-omit-frame-pointer -fno-omit-frame-pointer
-Wall -Wall
# -Wextra -Wextra
# -Werror -Werror
-Wnon-virtual-dtor -Wnon-virtual-dtor
-Wdelete-non-virtual-dtor -Wdelete-non-virtual-dtor
-Wno-unused-parameter -Wno-unused-parameter
......
# MAC OS does not contain start-up and whole-archive args
if(APPLE)
set(GROUP_START "")
set(GROUP_END "")
set(ARCHIVE_START "")
set(ARCHIVE_END "")
else()
set(GROUP_START "-Wl,--start-group")
set(GROUP_END "-Wl,--end-group")
set(ARCHIVE_START "-Wl,--whole-archive")
set(ARCHIVE_END "-Wl,--no-whole-archive")
endif()
# Some common routine for paddle compile. # Some common routine for paddle compile.
# target_circle_link_libraries # target_circle_link_libraries
...@@ -23,17 +7,46 @@ endif() ...@@ -23,17 +7,46 @@ endif()
# Rest Arguments: libraries which link together. # Rest Arguments: libraries which link together.
function(target_circle_link_libraries TARGET_NAME) function(target_circle_link_libraries TARGET_NAME)
if(APPLE) if(APPLE)
set(LIBS)
set(inArchive OFF)
set(libsInArgn)
foreach(arg ${ARGN}) foreach(arg ${ARGN})
list(APPEND OSX_LIBRARIES "-Wl,-force_load" "${arg}") if(${arg} STREQUAL "ARCHIVE_START")
set(inArchive ON)
elseif(${arg} STREQUAL "ARCHIVE_END")
set(inArchive OFF)
else()
if(inArchive)
list(APPEND LIBS "-Wl,-force_load")
endif()
list(APPEND LIBS ${arg})
list(APPEND libsInArgn ${arg})
endif()
endforeach() endforeach()
list(REVERSE libsInArgn)
target_link_libraries(${TARGET_NAME} target_link_libraries(${TARGET_NAME}
${OSX_LIBRARIES} -lz) ${LIBS}
${libsInArgn})
else() # LINUX
set(LIBS)
foreach(arg ${ARGN})
if(${arg} STREQUAL "ARCHIVE_START")
list(APPEND LIBS "-Wl,--whole-archive")
elseif(${arg} STREQUAL "ARCHIVE_END")
list(APPEND LIBS "-Wl,--no-whole-archive")
else() else()
list(APPEND LIBS ${arg})
endif()
endforeach()
target_link_libraries(${TARGET_NAME} target_link_libraries(${TARGET_NAME}
${GROUP_START} "-Wl,--start-group"
${ARGN} ${LIBS}
-lz "-Wl,--end-group")
${GROUP_END})
endif() endif()
endfunction() endfunction()
...@@ -65,20 +78,20 @@ function(link_paddle_exe TARGET_NAME) ...@@ -65,20 +78,20 @@ function(link_paddle_exe TARGET_NAME)
if(PADDLE_WITH_INTERNAL) if(PADDLE_WITH_INTERNAL)
set(INTERAL_LIBS paddle_internal_gserver paddle_internal_parameter) set(INTERAL_LIBS paddle_internal_gserver paddle_internal_parameter)
target_circle_link_libraries(${TARGET_NAME} target_circle_link_libraries(${TARGET_NAME}
${ARCHIVE_START} ARCHIVE_START
paddle_internal_gserver paddle_internal_gserver
paddle_internal_owlqn paddle_internal_owlqn
${ARCHIVE_END} ARCHIVE_END
paddle_internal_parameter) paddle_internal_parameter)
else() else()
set(INTERAL_LIBS "") set(INTERAL_LIBS "")
endif() endif()
target_circle_link_libraries(${TARGET_NAME} target_circle_link_libraries(${TARGET_NAME}
${ARCHIVE_START} ARCHIVE_START
paddle_gserver paddle_gserver
${METRIC_LIBS} ${METRIC_LIBS}
${ARCHIVE_END} ARCHIVE_END
paddle_pserver paddle_pserver
paddle_trainer_lib paddle_trainer_lib
paddle_network paddle_network
...@@ -91,8 +104,10 @@ function(link_paddle_exe TARGET_NAME) ...@@ -91,8 +104,10 @@ function(link_paddle_exe TARGET_NAME)
${PROTOBUF_LIBRARY} ${PROTOBUF_LIBRARY}
${CMAKE_THREAD_LIBS_INIT} ${CMAKE_THREAD_LIBS_INIT}
${CBLAS_LIBS} ${CBLAS_LIBS}
${INTERAL_LIBS}
${ZLIB_LIBRARIES}
${CMAKE_DL_LIBS} ${CMAKE_DL_LIBS}
${INTERAL_LIBS}) )
if(WITH_PYTHON) if(WITH_PYTHON)
target_link_libraries(${TARGET_NAME} target_link_libraries(${TARGET_NAME}
......
...@@ -277,6 +277,7 @@ void NeuralNetwork::getState(MachineState& machineState) { ...@@ -277,6 +277,7 @@ void NeuralNetwork::getState(MachineState& machineState) {
} }
void NeuralNetwork::backward(const UpdateCallback& callback) { void NeuralNetwork::backward(const UpdateCallback& callback) {
gLayerStackTrace.pop(""); // tell layer trace is during backward.
FOR_EACH_R(layer, layers_) { FOR_EACH_R(layer, layers_) {
REGISTER_TIMER_INFO("BackwardTimer", (*layer)->getName().c_str()); REGISTER_TIMER_INFO("BackwardTimer", (*layer)->getName().c_str());
if ((*layer)->needGradient()) { if ((*layer)->needGradient()) {
......
...@@ -49,7 +49,7 @@ public: ...@@ -49,7 +49,7 @@ public:
*/ */
virtual void* alloc(size_t size) { virtual void* alloc(size_t size) {
void* ptr; void* ptr;
posix_memalign(&ptr, 32ul, size); CHECK_EQ(posix_memalign(&ptr, 32ul, size), 0);
CHECK(ptr) << "Fail to allocate CPU memory: size=" << size; CHECK(ptr) << "Fail to allocate CPU memory: size=" << size;
return ptr; return ptr;
} }
......
...@@ -38,7 +38,7 @@ static std::mt19937 RandomEngine(time(0)); ...@@ -38,7 +38,7 @@ static std::mt19937 RandomEngine(time(0));
inline static std::unique_ptr<float[]> NewVector(size_t len = VECTOR_LEN, inline static std::unique_ptr<float[]> NewVector(size_t len = VECTOR_LEN,
size_t align = ALIGN) { size_t align = ALIGN) {
float* ptr; float* ptr;
posix_memalign((void**)&ptr, align, len * sizeof(float)); CHECK_EQ(posix_memalign((void**)&ptr, align, len * sizeof(float)), 0);
return std::unique_ptr<float[]>(ptr); return std::unique_ptr<float[]>(ptr);
} }
......
...@@ -254,10 +254,4 @@ int main(int argc, char** argv) { ...@@ -254,10 +254,4 @@ int main(int argc, char** argv) {
return RUN_ALL_TESTS(); return RUN_ALL_TESTS();
} }
#else
int main(int argc, char const* argv[]) {
return 0;
}
#endif #endif
...@@ -125,9 +125,11 @@ TEST_F(CommonTest, sgdUpdate) { ...@@ -125,9 +125,11 @@ TEST_F(CommonTest, sgdUpdate) {
const size_t alignHeader[] = {0, 2, 3, 5, 7, 8}; const size_t alignHeader[] = {0, 2, 3, 5, 7, 8};
for (auto& size : sizeVec_) { for (auto& size : sizeVec_) {
real *gradientBuffer, *valueBuffer, *momentumBuffer; real *gradientBuffer, *valueBuffer, *momentumBuffer;
posix_memalign((void**)&gradientBuffer, 32, sizeof(real) * size); CHECK_EQ(posix_memalign((void**)&gradientBuffer, 32, sizeof(real) * size),
posix_memalign((void**)&valueBuffer, 32, sizeof(real) * size); 0);
posix_memalign((void**)&momentumBuffer, 32, sizeof(real) * size); CHECK_EQ(posix_memalign((void**)&valueBuffer, 32, sizeof(real) * size), 0);
CHECK_EQ(posix_memalign((void**)&momentumBuffer, 32, sizeof(real) * size),
0);
for (size_t i = 0; i < size; i++) { for (size_t i = 0; i < size; i++) {
gradientBuffer[i] = 1.0; gradientBuffer[i] = 1.0;
......
...@@ -2,11 +2,17 @@ ...@@ -2,11 +2,17 @@
file(GLOB UTIL_HEADERS . *.h) file(GLOB UTIL_HEADERS . *.h)
file(GLOB UTIL_SOURCES . *.cpp) file(GLOB UTIL_SOURCES . *.cpp)
if(APPLE)
file(GLOB UTIL_ARCH_SOURCES . arch/osx/*.cpp)
else()
file(GLOB UTIL_ARCH_SOURCES . arch/linux/*.cpp)
endif()
add_library(paddle_utils STATIC add_library(paddle_utils STATIC
${UTIL_SOURCES}) ${UTIL_SOURCES}
${UTIL_ARCH_SOURCES})
add_style_check_target(paddle_utils ${UTIL_HEADERS}) add_style_check_target(paddle_utils ${UTIL_HEADERS})
add_style_check_target(paddle_utils ${UTIL_SOURCES}) add_style_check_target(paddle_utils ${UTIL_SOURCES}
${UTIL_ARCH_SOURCES})
add_dependencies(paddle_utils gen_proto_cpp) add_dependencies(paddle_utils gen_proto_cpp)
if(WITH_TESTING) if(WITH_TESTING)
add_subdirectory(tests) add_subdirectory(tests)
......
...@@ -14,9 +14,44 @@ limitations under the License. */ ...@@ -14,9 +14,44 @@ limitations under the License. */
#include "CustomStackTrace.h" #include "CustomStackTrace.h"
#include "CommandLineParser.h"
#include <iostream>
P_DEFINE_bool(layer_stack_error_only_current_thread,
true,
"Dump current thread or whole process layer stack when signal error "
"occurred. true means only dump current thread layer stack");
namespace paddle { namespace paddle {
CustomStackTrace<std::string> gLayerStackTrace; CustomStackTrace<std::string> gLayerStackTrace;
static std::mutex gLayerStackTraceMtx;
void installLayerStackTracer() {
logging::installFailureWriter([](const char* data, int sz) {
std::lock_guard<std::mutex> guard(gLayerStackTraceMtx);
if (!gLayerStackTrace.empty()) {
size_t curTid = -1UL;
std::hash<std::thread::id> hasher;
gLayerStackTrace.dump([&curTid, &hasher](std::thread::id tid,
bool* isForwarding,
const std::string& layerName) {
if (curTid != hasher(tid)) {
if (curTid != -1UL) {
std::cerr << std::endl;
}
curTid = hasher(tid);
std::cerr << "Thread [" << tid << "] ";
if (isForwarding) {
std::cerr << (*isForwarding ? "Forwarding ": "Backwarding ");
}
}
std::cerr << layerName << ", ";
}, FLAGS_layer_stack_error_only_current_thread);
std::cerr << std::endl;
}
std::cerr.write(data, sz);
});
}
} // namespace paddle } // namespace paddle
...@@ -15,6 +15,9 @@ limitations under the License. */ ...@@ -15,6 +15,9 @@ limitations under the License. */
#pragma once #pragma once
#include <stack> #include <stack>
#include <thread>
#include <unordered_map>
#include <functional>
#include "ThreadLocal.h" #include "ThreadLocal.h"
...@@ -29,71 +32,160 @@ namespace paddle { ...@@ -29,71 +32,160 @@ namespace paddle {
* @code{.cpp} * @code{.cpp}
* *
* paddle::CustomStackTrace<std::string> stack; * paddle::CustomStackTrace<std::string> stack;
* PASS_TEST=0;
* for (auto& layer : layers){ * for (auto& layer : layers){
* stack.push(layer->getName()); * stack.push(layer->getName());
* layer->forward(passType); * layer->forward();
* } * }
* for (auto& layer : layers){ *
* stack.pop(""); // mark under pop stage.
*
* for (auto it = layers.rbegin(); it != layers.rend(); ++it){
* auto& layer = *it;
* layer->backward(passType); * layer->backward(passType);
* stack.pop(layer->getName()); * stack.pop(layer->getName());
* } * }
* *
* if(passType == PASS_TEST) {
* stack.clear();
* }
* else {
* stack.dump([](const std::string& layername){
* LOG(INFO) << "LayerName: " << layername;
* })
* }
*
*
* @endcode * @endcode
*/ */
template <typename T> template <typename T>
class CustomStackTrace{ class CustomStackTrace{
public: public:
/** /**
* @brief Pop out an item from the top of the stack. For safety the item * @brief Pop out an item from the top of the stack if item == top.
* will be poped should equal to ip. * Else, just set status to popping.
*/ */
void pop(const T& ip) { void pop(const T& item) {
auto& p = *logstack_; pushing() = false;
CHECK_EQ(ip, p.top()); auto& s = this->stack();
p.pop(); if (item == s.top()) {
s.pop();
}
} }
/** /**
* @brief Empty the stack by sequence from top to button. * @brief clear current thread stack.
* @param[in] callback A function deal with each item while dumping.
* It must have and only have a in parameter which is the stack item.
*/ */
template <typename Callback> void clear() {
void dump(Callback callback) { auto& s = stack();
auto& p = *logstack_; while (!s.empty()) {
while (!p.empty()) { s.pop();
callback(p.top());
p.pop();
} }
} }
/** /**
* @brief Only empty the stack. * @brief return true if all thread's stack is empty.
* @return true if empty
*/ */
void clear() { bool empty() const {
dump([](const T& ip){}); std::lock_guard<std::mutex> g(this->mtx_);
for (auto p : this->stackBuffers_) {
std::stack<T>& s = *p.second;
if (!s.empty()) {
return false;
}
}
return true;
}
/**
* @brief DumpCallback Type. It will be invoked many times by dump method.
*
* The first parameter is stack thread id.
* The second parameter is the last action of stack is push or not.
* The third parameter is the item in stack.
*/
typedef std::function<void(const std::thread::id& /*threadId*/,
bool* /*isPushing*/,
const T& /*item*/)> DumpCallback;
/**
* Dump all thread stack, and all stack will be cleared.
*/
void dump(const DumpCallback& callback, bool onlyCurrentThread = false) {
std::lock_guard<std::mutex> g(this->mtx_);
for (auto p : this->stackBuffers_) {
std::thread::id tid = p.first;
if (onlyCurrentThread && tid != std::this_thread::get_id()) {
continue;
}
std::stack<T>& s = *p.second;
bool* isPush = nullptr;
auto it = this->pushingBuffers_.find(tid);
if (it != this->pushingBuffers_.end()) {
isPush = it->second;
} }
while (!s.empty()) {
callback(tid, isPush, s.top());
s.pop();
}
}
}
/**
* @brief Push item to current thread stack.
*/
void push(const T& item) {
pushing() = true;
auto& p = this->stack();
p.push(item);
}
private:
/**
* Get thread local attribute, and save them into a map (threadId => TYPE*)
*
* @tparam TYPE thread local attribute type.
* @param threadLocal Thread Local object.
* @param buffers a map from threadId to TYPE*
*/
template <typename TYPE>
inline TYPE& getThreadLocal(
ThreadLocal<TYPE>& threadLocal,
std::unordered_map<std::thread::id, TYPE*>& buffers) {
TYPE* retv = threadLocal.get(false);
if (retv) {
return *retv;
} else {
std::lock_guard<std::mutex> guard(this->mtx_);
retv = threadLocal.get();
auto id = std::this_thread::get_id();
buffers.insert({id, retv});
return *retv;
}
}
/**
* @brief Get thread local stack reference.
*/
std::stack<T>& stack() {
return this->getThreadLocal(this->logStack_,
this->stackBuffers_);
}
/** /**
* @brief Push item ip to the top of the stack. * @brief Get thread local pushing flag.
*/ */
void push(const T& ip) { bool& pushing() {
auto& p = *logstack_; return this->getThreadLocal(this->isPushing_,
p.push(ip); this->pushingBuffers_);
} }
private: private:
ThreadLocalD<std::stack<T> > logstack_; mutable std::mutex mtx_;
std::unordered_map<std::thread::id, std::stack<T>* > stackBuffers_;
std::unordered_map<std::thread::id, bool* > pushingBuffers_;
ThreadLocal<bool> isPushing_;
ThreadLocal<std::stack<T> > logStack_;
}; };
extern CustomStackTrace<std::string> gLayerStackTrace; extern CustomStackTrace<std::string> gLayerStackTrace;
/**
* @brief Install a failure handler to print layer stack when error.
*/
extern void installLayerStackTracer();
} // namespace paddle } // namespace paddle
/* Copyright (c) 2016 Baidu, Inc. All Rights Reserve.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#ifdef __APPLE__
#include <dispatch/dispatch.h>
#endif
#ifdef __APPLE__
#ifndef PTHREAD_BARRIER_H_
#define PTHREAD_BARRIER_H_
#include <pthread.h>
#include <errno.h>
typedef int pthread_barrierattr_t;
typedef struct {
pthread_mutex_t mutex;
pthread_cond_t cond;
int count;
int tripCount;
} pthread_barrier_t;
int pthread_barrier_init(pthread_barrier_t *barrier,
const pthread_barrierattr_t *attr, unsigned int count) {
if (count == 0) {
errno = EINVAL;
return -1;
}
if (pthread_mutex_init(&barrier->mutex, 0) < 0) {
return -1;
}
if (pthread_cond_init(&barrier->cond, 0) < 0) {
pthread_mutex_destroy(&barrier->mutex);
return -1;
}
barrier->tripCount = count;
barrier->count = 0;
return 0;
}
int pthread_barrier_destroy(pthread_barrier_t *barrier) {
pthread_cond_destroy(&barrier->cond);
pthread_mutex_destroy(&barrier->mutex);
return 0;
}
int pthread_barrier_wait(pthread_barrier_t *barrier) {
pthread_mutex_lock(&barrier->mutex);
++(barrier->count);
if (barrier->count >= barrier->tripCount) {
barrier->count = 0;
pthread_cond_broadcast(&barrier->cond);
pthread_mutex_unlock(&barrier->mutex);
return 1;
} else {
pthread_cond_wait(&barrier->cond, &(barrier->mutex));
pthread_mutex_unlock(&barrier->mutex);
return 0;
}
}
#endif // PTHREAD_BARRIER_H_
typedef int pthread_spinlock_t;
int pthread_spin_init(pthread_spinlock_t *lock, int pshared) {
__asm__ __volatile__("" ::: "memory");
*lock = 0;
return 0;
}
int pthread_spin_destroy(pthread_spinlock_t *lock) {
return 0;
}
int pthread_spin_lock(pthread_spinlock_t *lock) {
while (1) {
int i;
for (i=0; i < 10000; i++) {
if (__sync_bool_compare_and_swap(lock, 0, 1)) {
return 0;
}
}
sched_yield();
}
}
int pthread_spin_unlock(pthread_spinlock_t *lock) {
__asm__ __volatile__("" ::: "memory");
*lock = 0;
return 0;
}
#endif // __APPLE__
...@@ -16,56 +16,11 @@ limitations under the License. */ ...@@ -16,56 +16,11 @@ limitations under the License. */
#pragma once #pragma once
#include <pthread.h> #include <pthread.h>
#include <semaphore.h>
#include <sys/time.h> #include <sys/time.h>
#include <unistd.h>
#include <condition_variable> #include <condition_variable>
#include <mutex> #include <mutex>
#ifdef __APPLE__ #include "DisableCopy.h"
#include <dispatch/dispatch.h>
#endif
#ifdef __APPLE__
#ifndef PTHREAD_BARRIER_H_
#define PTHREAD_BARRIER_H_
#include <pthread.h>
#include <errno.h>
typedef int pthread_barrierattr_t;
typedef struct {
pthread_mutex_t mutex;
pthread_cond_t cond;
int count;
int tripCount;
} pthread_barrier_t;
extern int pthread_barrier_init(pthread_barrier_t *barrier,
const pthread_barrierattr_t *attr,
unsigned int count);
extern int pthread_barrier_destroy(pthread_barrier_t *barrier);
extern int pthread_barrier_wait(pthread_barrier_t *barrier);
#endif // PTHREAD_BARRIER_H_
typedef int pthread_spinlock_t;
extern int pthread_spin_init(pthread_spinlock_t *lock, int pshared);
extern int pthread_spin_destroy(pthread_spinlock_t *lock);
extern int pthread_spin_lock(pthread_spinlock_t *lock);
extern int pthread_spin_unlock(pthread_spinlock_t *lock);
#endif
namespace paddle { namespace paddle {
...@@ -142,58 +97,44 @@ protected: ...@@ -142,58 +97,44 @@ protected:
* which means it will keep trying to lock until lock on successfully. * which means it will keep trying to lock until lock on successfully.
* The SpinLock disable copy. * The SpinLock disable copy.
*/ */
class SpinLockPrivate;
class SpinLock { class SpinLock {
public: public:
SpinLock() { pthread_spin_init(&lock_, 0); } DISABLE_COPY(SpinLock);
~SpinLock() { pthread_spin_destroy(&lock_); } SpinLock();
SpinLock(const SpinLock&) = delete; ~SpinLock();
SpinLock& operator=(const SpinLock&) = delete;
// std::mutext interface // std::mutext interface
void lock() { pthread_spin_lock(&lock_); } void lock();
void unlock() { pthread_spin_unlock(&lock_); } void unlock();
protected: private:
pthread_spinlock_t lock_; SpinLockPrivate* m;
char padding_[64 - sizeof(pthread_spinlock_t)];
}; };
/** /**
* A simple wapper of semaphore which can only be shared in the same process. * A simple wapper of semaphore which can only be shared in the same process.
*/ */
class SemaphorePrivate;
#ifdef __APPLE__
class Semaphore { class Semaphore {
public: public:
explicit Semaphore(int initValue = 0) { //! Disable copy & assign
sem_ = dispatch_semaphore_create(initValue); Semaphore(const Semaphore& other) = delete;
} Semaphore& operator= (const Semaphore&& other) = delete;
~Semaphore() { dispatch_release(sem_); } //! Enable move.
bool timeWait(struct timespec* ts) { Semaphore(Semaphore&& other): m(std::move(other.m)) {
dispatch_time_t m = dispatch_walltime(ts, 0);
return (0 == dispatch_semaphore_wait(sem_, m));
} }
void wait() { dispatch_semaphore_wait(sem_, DISPATCH_TIME_FOREVER); }
void post() { dispatch_semaphore_signal(sem_);}
protected:
dispatch_semaphore_t sem_;
};
#else
class Semaphore {
public: public:
/** /**
* @brief Construct Function. * @brief Construct Function.
* @param[in] initValue the initial value of the * @param[in] initValue the initial value of the
* semaphore, default 0. * semaphore, default 0.
*/ */
explicit Semaphore(int initValue = 0) { sem_init(&sem_, 0, initValue); } explicit Semaphore(int initValue = 0);
~Semaphore() { sem_destroy(&sem_); } ~Semaphore();
/** /**
* @brief The same as wait(), except if the decrement can not * @brief The same as wait(), except if the decrement can not
...@@ -203,43 +144,38 @@ public: ...@@ -203,43 +144,38 @@ public:
* @return ture if the decrement proceeds before ts, * @return ture if the decrement proceeds before ts,
* else return false. * else return false.
*/ */
bool timeWait(struct timespec* ts) { return (0 == sem_timedwait(&sem_, ts)); } bool timeWait(struct timespec* ts);
/** /**
* @brief decrement the semaphore. If the semaphore's value is 0, then call blocks. * @brief decrement the semaphore. If the semaphore's value is 0, then call blocks.
*/ */
void wait() { sem_wait(&sem_); } void wait();
/** /**
* @brief increment the semaphore. If the semaphore's value * @brief increment the semaphore. If the semaphore's value
* greater than 0, wake up a thread blocked in wait(). * greater than 0, wake up a thread blocked in wait().
*/ */
void post() { sem_post(&sem_); } void post();
protected: private:
sem_t sem_; SemaphorePrivate* m;
}; };
#endif
static_assert(sizeof(SpinLock) == 64, "Wrong padding");
/** /**
* A simple wrapper of thread barrier. * A simple wrapper of thread barrier.
* The ThreadBarrier disable copy. * The ThreadBarrier disable copy.
*/ */
class ThreadBarrierPrivate;
class ThreadBarrier { class ThreadBarrier {
public: public:
DISABLE_COPY(ThreadBarrier);
/** /**
* @brief Construct Function. Initialize the barrier should * @brief Construct Function. Initialize the barrier should
* wait for count threads in wait(). * wait for count threads in wait().
*/ */
explicit ThreadBarrier(int count) { explicit ThreadBarrier(int count);
pthread_barrier_init(&barrier_, NULL, count); ~ThreadBarrier();
}
~ThreadBarrier() { pthread_barrier_destroy(&barrier_); }
ThreadBarrier(const ThreadBarrier&) = delete;
ThreadBarrier& operator=(const ThreadBarrier&) = delete;
/** /**
* @brief . * @brief .
...@@ -247,10 +183,10 @@ public: ...@@ -247,10 +183,10 @@ public:
* then wake up all the count - 1 threads and continue run together. * then wake up all the count - 1 threads and continue run together.
* Else block the thread until waked by other thread . * Else block the thread until waked by other thread .
*/ */
void wait() { pthread_barrier_wait(&barrier_); } void wait();
protected: private:
pthread_barrier_t barrier_; ThreadBarrierPrivate* m;
}; };
/** /**
......
...@@ -25,7 +25,7 @@ namespace paddle { ...@@ -25,7 +25,7 @@ namespace paddle {
pid_t getTID() { pid_t getTID() {
#if defined(__APPLE__) || defined(__OSX__) #if defined(__APPLE__) || defined(__OSX__)
pid_t tid = syscall(SYS_thread_selfid); pid_t tid = syscall(SYS_thread_selfid);
#elif defined(__LINUX__) #else
#ifndef __NR_gettid #ifndef __NR_gettid
#define __NR_gettid 224 #define __NR_gettid 224
#endif #endif
......
...@@ -129,13 +129,7 @@ void runInitFunctions() { ...@@ -129,13 +129,7 @@ void runInitFunctions() {
void initMain(int argc, char** argv) { void initMain(int argc, char** argv) {
initializeLogging(argc, argv); initializeLogging(argc, argv);
logging::installFailureWriter([](const char* data, int sz) { installLayerStackTracer();
std::cerr << "Current Layer forward/backward stack is " << std::endl;
gLayerStackTrace.dump([](const std::string& layername){
std::cerr << "LayerName: " << layername << std::endl;
});
std::cerr.write(data, sz);
});
std::string line; std::string line;
for (int i = 0; i < argc; ++i) { for (int i = 0; i < argc; ++i) {
line += argv[i]; line += argv[i];
......
/* Copyright (c) 2016 Baidu, Inc. All Rights Reserve.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/utils/Locks.h"
#include <semaphore.h>
#include <unistd.h>
namespace paddle {
class SemaphorePrivate {
public:
sem_t sem;
};
Semaphore::Semaphore(int initValue): m(new SemaphorePrivate()) {
sem_init(&m->sem, 0, initValue);
}
Semaphore::~Semaphore() {
sem_destroy(&m->sem);
}
bool Semaphore::timeWait(struct timespec* ts) {
return (0 == sem_timedwait(&m->sem, ts));
}
void Semaphore::wait() {
sem_wait(&m->sem);
}
void Semaphore::post() {
sem_post(&m->sem);
}
class SpinLockPrivate {
public:
inline SpinLockPrivate() { pthread_spin_init(&lock_, 0); }
inline ~SpinLockPrivate() { pthread_spin_destroy(&lock_); }
pthread_spinlock_t lock_;
char padding_[64 - sizeof(pthread_spinlock_t)];
};
SpinLock::SpinLock():m(new SpinLockPrivate()) {}
SpinLock::~SpinLock() { delete m; }
void SpinLock::lock() {
pthread_spin_lock(&m->lock_);
}
void SpinLock::unlock() {
pthread_spin_unlock(&m->lock_);
}
class ThreadBarrierPrivate {
public:
pthread_barrier_t barrier_;
};
ThreadBarrier::ThreadBarrier(int count): m(new ThreadBarrierPrivate()) {
pthread_barrier_init(&m->barrier_, nullptr, count);
}
ThreadBarrier::~ThreadBarrier() {
pthread_barrier_destroy(&m->barrier_);
delete m;
}
void ThreadBarrier::wait() {
pthread_barrier_wait(&m->barrier_);
}
} // namespace paddle
/* Copyright (c) 2016 Baidu, Inc. All Rights Reserve.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/utils/Locks.h"
#include "paddle/utils/Logging.h"
#include <dispatch/dispatch.h>
#include <libkern/OSAtomic.h>
namespace paddle {
class SemaphorePrivate {
public:
~SemaphorePrivate() {
dispatch_release(sem);
}
dispatch_semaphore_t sem;
};
Semaphore::Semaphore(int initValue): m(new SemaphorePrivate()) {
m->sem = dispatch_semaphore_create(initValue);
}
Semaphore::~Semaphore() {
delete m;
}
bool Semaphore::timeWait(timespec *ts) {
dispatch_time_t tm = dispatch_walltime(ts, 0);
return (0 == dispatch_semaphore_wait(m->sem, tm));
}
void Semaphore::wait() {
dispatch_semaphore_wait(m->sem, DISPATCH_TIME_FOREVER);
}
void Semaphore::post() {
dispatch_semaphore_signal(m->sem);
}
class SpinLockPrivate {
public:
SpinLockPrivate(): lock_(0) {}
OSSpinLock lock_;
char padding_[64 - sizeof(OSSpinLock)]; // Padding to cache line size
};
SpinLock::SpinLock(): m(new SpinLockPrivate()) {}
SpinLock::~SpinLock() { delete m; }
void SpinLock::lock() {
OSSpinLockLock(&m->lock_);
}
void SpinLock::unlock() {
OSSpinLockUnlock(&m->lock_);
}
class ThreadBarrierPrivate {
public:
pthread_mutex_t mutex;
pthread_cond_t cond;
int count;
int tripCount;
inline explicit ThreadBarrierPrivate(int cnt):count(0), tripCount(cnt) {
CHECK_NE(cnt, 0);
CHECK_GE(pthread_mutex_init(&mutex, 0), 0);
CHECK_GE(pthread_cond_init(&cond, 0), 0);
}
inline ~ThreadBarrierPrivate() {
pthread_cond_destroy(&cond);
pthread_mutex_destroy(&mutex);
}
/**
* @brief wait
* @return true if the last wait
*/
inline bool wait() {
pthread_mutex_lock(&mutex);
++count;
if (count > tripCount) {
count = 0;
pthread_cond_broadcast(&cond);
pthread_mutex_unlock(&mutex);
return true;
} else {
pthread_cond_wait(&cond, &mutex);
pthread_mutex_unlock(&mutex);
return false;
}
}
};
ThreadBarrier::ThreadBarrier(int count): m(new ThreadBarrierPrivate(count)) {}
ThreadBarrier::~ThreadBarrier() { delete m; }
void ThreadBarrier::wait() { m->wait(); }
} // namespace paddle
...@@ -2,3 +2,15 @@ add_simple_unittest(test_CommandLineParser) ...@@ -2,3 +2,15 @@ add_simple_unittest(test_CommandLineParser)
add_simple_unittest(test_Logging) add_simple_unittest(test_Logging)
add_simple_unittest(test_Thread) add_simple_unittest(test_Thread)
add_simple_unittest(test_StringUtils) add_simple_unittest(test_StringUtils)
add_simple_unittest(test_CustomStackTrace)
add_executable(
test_CustomStackTracePrint
test_CustomStackTracePrint.cpp
)
link_paddle_exe(test_CustomStackTracePrint)
if(NOT APPLE)
add_test(NAME test_CustomStackTracePrint
COMMAND ${PROJ_ROOT}/paddle/utils/tests/test_CustomStackTracePrint.sh
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
endif()
/* Copyright (c) 2016 Baidu, Inc. All Rights Reserve.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <gtest/gtest.h>
#include <chrono>
#include "paddle/utils/CustomStackTrace.h"
#include "paddle/utils/CommandLineParser.h"
#include "paddle/utils/Util.h"
#include "paddle/utils/Locks.h"
P_DEFINE_int32(test_thread_num, 10, "testing thread number");
void testNormalImpl(const std::function<void(
paddle::CustomStackTrace<std::string>&,
size_t, size_t,
paddle::ThreadBarrier&,
paddle::ThreadBarrier&)>& callback) {
paddle::CustomStackTrace<std::string> tracer;
paddle::ThreadBarrier doneBarrier(FLAGS_test_thread_num + 1);
paddle::ThreadBarrier startBarrier(FLAGS_test_thread_num + 1);
constexpr size_t countDown = 10;
constexpr size_t layerSize = 1000;
std::vector<std::unique_ptr<std::thread>> threads;
threads.reserve(FLAGS_test_thread_num);
for (int32_t i=0; i < FLAGS_test_thread_num; ++i) {
threads.emplace_back(new std::thread([&tracer, &countDown, &layerSize,
&startBarrier, &doneBarrier,
&callback]{
callback(tracer, countDown, layerSize, startBarrier, doneBarrier);
}));
}
size_t cntDown = countDown;
while (cntDown-- > 0) {
startBarrier.wait();
doneBarrier.wait();
ASSERT_TRUE(tracer.empty());
}
for (auto& thread : threads) {
thread->join();
}
}
TEST(CustomStackTrace, normalTrain) {
testNormalImpl([](paddle::CustomStackTrace<std::string>& tracer,
size_t countDown, size_t layerSize,
paddle::ThreadBarrier& start, paddle::ThreadBarrier& finish){
while (countDown-- > 0) {
start.wait();
for (size_t i=0; i < layerSize; ++i) {
tracer.push("layer_" + std::to_string(i));
}
tracer.pop("");
for (size_t i=0; i < layerSize; ++i) {
tracer.pop("layer_" + std::to_string(layerSize - 1 - i));
}
finish.wait();
}
});
}
TEST(CustomStackTrace, normalTest) {
testNormalImpl([] (paddle::CustomStackTrace<std::string>& tracer,
size_t countDown, size_t layerSize,
paddle::ThreadBarrier& start, paddle::ThreadBarrier& finish){
while (countDown-- > 0) {
start.wait();
for (size_t i=0; i < layerSize; ++i) {
tracer.push("layer_" + std::to_string(i));
}
tracer.clear(); // in forward test, tracer will clear after forward.
finish.wait();
}
});
}
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
paddle::initMain(argc, argv);
return RUN_ALL_TESTS();
}
/* Copyright (c) 2016 Baidu, Inc. All Rights Reserve.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/utils/Util.h"
#include "paddle/utils/CustomStackTrace.h"
int main(int argc, char** argv) {
paddle::initMain(argc, argv);
for (size_t i=0; i < 1000; ++i) {
paddle::gLayerStackTrace.push("layer_" + std::to_string(i));
if (i == 998) {
throw "Unhandle exception";
}
}
return 0;
}
#!/bin/bash
echo "Test Custom Stack Trace print correct result when fail"
./test_CustomStackTracePrint >customStackTraceLog 2>&1
if [ $? -eq 0 ]; then
exit 1
else
set -e
TEXT=""
for ((i=0; i<=998; i++))
do
TEXT="layer_$i, "$TEXT
done
TEXT="Forwarding "$TEXT
grep -q "$TEXT" customStackTraceLog
fi
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册