Server¶
- 
class paddle::ProtoServer¶
- Implements the RPC framework, which launches one thread for each connection. A parameter server is defined here as a single TCP server bound to a single port. All connections share a single TCP ProtoServer object, and each connection handles all requests from its trainer within a single worker thread. To improve bandwidth utilization and to use multiple cores for pserver optimization (reducing pserver latency), you can open more ports on a single NIC with port=N (N>1) for small cluster jobs. - Inherits from paddle::SocketServer - Subclassed by paddle::ParameterServer2 - Public Types - 
typedef std::function<void(const google::protobuf::MessageLite& protoOut, const std::vector<iovec>& outputIovs)> ProtoResponseCallbackEx¶
 - 
typedef std::function<void(const google::protobuf::MessageLite& protoOut)> ProtoResponseCallback¶
 - Public Functions - 
ProtoServer(const std::string &addr, int port, int rdmaCpu = -1)¶
- rdmaCpu controls the CPU affinity of the RDMA server daemon, which can benefit performance. rdmaCpu = -1 means TCP is used instead of RDMA transport. 
 - template <class ProtoIn>
- 
void registerServiceFunction(const std::string &funcName, std::function<void(const ProtoIn &request, ProtoResponseCallback callback)> func)¶
- Registers a service function for this server. The signature of the service function is void(const ProtoIn& request, ProtoResponseCallback callback). The service function processes the request and calls the callback after it finishes the request. - Use the macro REGISTER_SERVICE_FUNCTION as a helper to simplify use. 
 - template <class ProtoIn>
- 
void registerServiceFunctionEx(const std::string &funcName, std::function<void(const ProtoIn &, std::unique_ptr< MsgReader > msgReader, ProtoResponseCallbackEx callback)> func)¶
- Registers a service function for this server. The signature of the service function is void(const ProtoIn&, std::unique_ptr<MsgReader> msgReader, ProtoResponseCallbackEx callback). The service function processes the request and calls the callback after it finishes the request. The extended service function can take extra input blocks from the communication channel by reading msgReader. It can also send extra blocks to the communication channel by passing outputIovs as an argument to the callback function. - Use the macro REGISTER_SERVICE_FUNCTION_EX as a helper to simplify use. 
 - template <class ProtoIn>
- 
void registerServiceFunctionEx(const std::string &funcName, std::function<void(const ProtoIn &, std::unique_ptr< MsgReader > msgReader, ProtoResponseCallbackEx callback)> func)¶
- Creates a wrapper function for a high-level parameter server function and registers the wrapper in the function mapping. 
 - template <class ProtoIn>
- 
void registerServiceFunction(const std::string &funcName, std::function<void(const ProtoIn &, ProtoResponseCallback callback)> func)¶
 - Protected Types - 
typedef std::function<void(std::unique_ptr<MsgReader> msgReader, ResponseCallback callback)> ServiceFunction¶
 - Protected Functions - 
virtual void handleRequest(std::unique_ptr<MsgReader> msgReader, ResponseCallback callback)¶
- Handles an RPC request. - Note
- It looks up the RPC function mapping table to find the function pointer, then calls that function, which reads further data from the connection.
- Parameters
- msgReader-- Message reader for reading data from connection 
- callback-- equal to channel->writeMessage 
 
 
 - 
void registerServiceFunctionImp(const std::string &funcName, ServiceFunction func)¶
- register one RPC function in function mapping - Parameters
- funcName-- function name string 
- func-- rpc function wrapped with reading and writing data 
 
 
 - Protected Attributes - 
ThreadLocal<struct timeval> handleRequestBegin_¶
- For tuning bare network overhead: the time at which receiving the request began. 
 - 
std::map<std::string, ServiceFunction> nameToFuncMap_¶
- mapping to find rpc function while handling request 
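The name-based dispatch described for handleRequest and nameToFuncMap_ can be pictured with a small stand-alone sketch. It does not use the Paddle classes: `ToyServer`, `Handler`, and the string payloads are hypothetical stand-ins for ProtoServer, ServiceFunction, and the MsgReader/protobuf messages.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <utility>

// Hypothetical stand-ins: a callback that sends a response, and a handler
// that receives a raw request payload plus that callback.
using ResponseCallback = std::function<void(const std::string&)>;
using Handler = std::function<void(const std::string&, ResponseCallback)>;

class ToyServer {
public:
  // Plays the role of registerServiceFunctionImp: store func under funcName.
  void registerFunction(const std::string& funcName, Handler func) {
    nameToFuncMap_[funcName] = std::move(func);
  }

  // Plays the role of handleRequest: look up the function by name and call it.
  // Returns false when nothing is registered under that name.
  bool handleRequest(const std::string& funcName, const std::string& payload,
                     ResponseCallback callback) const {
    auto it = nameToFuncMap_.find(funcName);
    if (it == nameToFuncMap_.end()) return false;
    it->second(payload, std::move(callback));
    return true;
  }

private:
  std::map<std::string, Handler> nameToFuncMap_;
};

// Registers an "echo" function and returns whatever it sends back.
inline std::string echoThroughServer(const std::string& payload) {
  ToyServer server;
  server.registerFunction(
      "echo", [](const std::string& in, ResponseCallback cb) { cb("echo:" + in); });
  std::string response;
  server.handleRequest("echo", payload,
                       [&response](const std::string& out) { response = out; });
  return response;
}
```

The handler never returns a value directly; it reports completion through the callback, which matches the callback-based signatures listed for this class.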
 
- 
class paddle::ParameterServer2¶
- Client interface for the parameter server - It implements several RPC APIs for use by remote parameter clients. For sync-sgd, the client needs one controller thread to build connections to all pservers; these controller connections perform barrier synchronization with the connections used for transferring data. Each data connection uses block-based, fine-grained synchronization to gain better scalability. Gradients from different trainers are merged concurrently in block units, so that some network overhead is hidden behind gradient merging. For async-sgd, the difference is that the pserver runs optimization immediately once the gradients are ready, so the pserver needs a separate buffer to store the values sent back to the trainer and keep them from being polluted. - Inherits from paddle::ProtoServer - Public Types - 
typedef void(ParameterServer2::* paddle::ParameterServer2::OperatorFunction) (const Operation &operation, OperationResult *result)
 - Public Functions - 
ParameterServer2(const std::string &addr, int port, int rdmaCpu = -1)¶
- Disables the default parameter for overloading. rdmaCpu is the id of the CPU core hosting the RDMA server (0-N); -1 means using TCP transport instead of RDMA. 
 - 
~ParameterServer2()¶
 - template <typename Dtype>
- 
void reduceAndSendData(const SendDataRequest &request, std::unique_ptr<MsgReader> &msgReader, ProtoResponseCallbackEx &callback)¶
- service functions 
 - 
void templateReduceSum(const SendDataRequest &request, std::unique_ptr<MsgReader> &msgReader, ProtoResponseCallbackEx &callback)¶
 - 
void sendParameter(const SendParameterRequest &request, std::unique_ptr<MsgReader> msgReader, ProtoResponseCallbackEx callback)¶
- Framework for sending parameters. - Note
- Different parameter data types can be sent to the pserver. In most cases, this API is used to send gradients from the trainer to the pserver. It can also be used to retrieve parameters from the pserver.
 
 - 
void sendData(const SendDataRequest &request, std::unique_ptr<MsgReader> msgReader, ProtoResponseCallbackEx callback)¶
 - 
void setConfig(const SetConfigRequest &request, ProtoResponseCallback callback)¶
- send config to pserver - Note
- it can help pserver to understand the configuration for optimization, logging control, duplicated initialization, etc.
 
 - 
void getStatus(const GetStatusRequest &request, ProtoResponseCallback callback)¶
- get status for pserver - Note
- used to check if parameters are ready at pserver
 
 - 
void setStatus(const SetStatusRequest &request, ProtoResponseCallback callback)¶
- set status for pserver - Note
- used to check if parameters are ready at pserver, since parameters at pserver are initialized by trainer
 
 - 
void doOperation(const DoOperationRequest &request, ProtoResponseCallback callback)¶
- framework for doing some operation at pserver end - Note
- If sync-sgd is used, the controller calls the op_SGD action for gradient optimization. Check the available operations in opFuncs[].
 
 - 
void createVector(const CreateVectorRequest &request, ProtoResponseCallback callback)¶
- Create a column vector. The size is the dimension of parameter. 
 - 
void releaseVector(const ReleaseVectorRequest &request, ProtoResponseCallback callback)¶
 - 
void createMatrix(const CreateMatrixRequest &request, ProtoResponseCallback callback)¶
- Create a column major matrix. The number of rows is the dimension of parameter. The number of columns is specified by num_cols. 
 - 
void releaseMatrix(const ReleaseMatrixRequest &request, ProtoResponseCallback callback)¶
 - 
void waitPassStart(const WaitPassStartRequest &request, ProtoResponseCallback callback)¶
- stateful control for indicating sync pass start - Note
- it is valuable for logging and state control, especially for sync-sgd control
 
 - 
void waitPassFinish(const WaitPassFinishRequest &request, ProtoResponseCallback callback)¶
- stateful control for indicating sync pass end - Note
- it is valuable for logging and state control, especially for sync-sgd control
 
 - 
void synchronize(const SynchronizeRequest &request, ProtoResponseCallback callback)¶
- synchronize all distributed trainers - Note
- It is a general API for synchronizing trainers and the pserver.
 
 - 
void asyncFinishPass(const SynchronizeRequest &request, ProtoResponseCallback callback)¶
- stateful control for indicating async pass is finished - Note
- it is valuable for logging control, state reset, etc.
 
 - 
void loadValueVector(const LoadValueRequest &request, ProtoResponseCallback callback)¶
 - 
void saveValueVector(const SaveValueRequest &request, ProtoResponseCallback callback)¶
 - 
bool init()¶
- initialize parameter server 
 - 
void setParameter(const SendParameterRequest &request, std::vector<Buffer> &inputBuffers, SendParameterResponse *response, std::vector<Buffer> *outputBuffers)¶
- set parameters at pserver - Note
- do parameter initialization if necessary.
 
 - 
void asyncSGD(const SendParameterRequest &request, std::vector<Buffer> &inputBuffers, SendParameterResponse *response, std::vector<Buffer> *outputBuffers)¶
- receive gradients and do optimization for async-sgd - Note
- This API asynchronously receives data from all trainers, immediately runs optimization, and returns the optimized values to the trainer. The routine updates atomically per block, which means different blocks can be based on gradients of different staleness. By default it discards some lagged gradients for better convergence.
 
 - 
void addGradient(const SendParameterRequest &request, std::vector<Buffer> &inputBuffers, SendParameterResponse *response, std::vector<Buffer> *outputBuffers)¶
- merge gradients from all trainer - Note
- This API uses block-based parallelization as its fine-grained parallelization, which reduces lock contention, hides communication latency, and uses multiple cores efficiently. It also implements the synchronization for sync-sgd.
 
 - 
void getParameter(const SendParameterRequest &request, std::vector<Buffer> &inputBuffers, SendParameterResponse *response, std::vector<Buffer> *outputBuffers)¶
- get dense parameters from pserver - Note
- Under certain conditions the trainer gets parameters from the pservers. For example, if all parameters are stored at the pserver end for big-model training, the trainer can use this API to retrieve all parameters if necessary.
 
 - 
void getParameterSparse(const SendParameterRequest &request, std::vector<Buffer> &inputBuffers, SendParameterResponse *response, std::vector<Buffer> *outputBuffers)¶
- get sparse value from parameter server - Note
- With sparse enabled, the pservers own all the latest values, while the trainer retrieves only the values it needs. For example, the trainer performs a prefetch to retrieve the latest values it needs from the pserver for sparse calculation.
 
 - 
void op_SGD(const Operation &operation, OperationResult *result)¶
 - 
void op_RESET(const Operation &operation, OperationResult *result)¶
 - 
void op_utv(const Operation &operation, OperationResult *result)¶
 - 
void op_au_bv(const Operation &operation, OperationResult *result)¶
 - 
void op_COPY(const Operation &operation, OperationResult *result)¶
 - 
void op_au(const Operation &operation, OperationResult *result)¶
 - 
void op_au_bv_cw(const Operation &operation, OperationResult *result)¶
 - 
void op_make_steepest_desc_dir(const Operation &operation, OperationResult *result)¶
 - 
void op_fix_dir_signs(const Operation &operation, OperationResult *result)¶
 - 
void op_dir_deriv(const Operation &operation, OperationResult *result)¶
 - 
void op_fix_omega_signs(const Operation &operation, OperationResult *result)¶
 - 
void op_cost(const Operation &operation, OperationResult *result)¶
 - 
void op_start_pass(const Operation &operation, OperationResult *result)¶
 - 
void op_finish_pass(const Operation &operation, OperationResult *result)¶
 - 
void op_apply(const Operation &operation, OperationResult *result)¶
 - 
void op_randomize(const Operation &operation, OperationResult *result)¶
 - 
void op_load(const Operation &operation, OperationResult *result)¶
 - 
void op_save(const Operation &operation, OperationResult *result)¶
 - 
void tuningSgdMidOutput()¶
- Outputs the log at the middle stage of training. - Note
- Flushes log history and state at the end for sgd.
 
 - 
void tuningSgdFinished()¶
- Outputs the log at the end stage of training. - Note
- Flushes log history and state at the end for sgd. It also flushes some stateful stats for the next pass.
 
 - 
void tuningAsyncsgdMidOutput()¶
- Outputs the log at the middle stage of training. - Note
- Flushes log history and state at the end for async-sgd. It also logs some performance data if lagged nodes are found.
 
 - 
void tuningAsyncsgdFinished()¶
- Outputs the log at the end stage of training. - Note
- Flushes log history and state at the end for async-sgd.
 
 - Public Static Attributes - 
const std::string kRetMsgInvalidMatrixHandle¶
 - 
const std::string kRetMsgInvalidVectorHandle¶
 - 
const std::string kRetMsgUnknownOperation¶
 - 
ParameterServer2::OperatorFunction opFuncs¶
- doOperation calls the following operations indirectly. For example, for sync-sgd control, the controller in the remote updater sends an op_SGD command to the pserver, then immediately sends a sendParameter request to the pserver. The two functions at the pserver end cooperate to achieve the sync-sgd gradient merge and optimization. Most of the following operations are specific to owlqn. All operations run in the context of the doOperation function. 
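The indirect call through a table of member-function pointers can be illustrated with a minimal sketch. Everything here is hypothetical: `ToyOpServer`, its two operations, and the integer/string arguments stand in for ParameterServer2, the op_* functions, and the Operation/OperationResult messages, and the way the real doOperation indexes opFuncs is assumed rather than taken from the source.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical miniature of a server with a table of operation handlers.
class ToyOpServer {
public:
  // Same shape as the OperatorFunction typedef: a pointer to a member function.
  typedef void (ToyOpServer::*OperatorFunction)(int operand, std::string* result);

  // Dispatch by operation index through the table.
  // Returns false for an index outside the table (an unknown operation).
  bool doOperation(std::size_t opIndex, int operand, std::string* result) {
    if (opIndex >= kNumOps) return false;
    (this->*opFuncs()[opIndex])(operand, result);
    return true;
  }

private:
  static const std::size_t kNumOps = 2;

  void op_double(int operand, std::string* result) {
    *result = std::to_string(operand * 2);
  }
  void op_negate(int operand, std::string* result) {
    *result = std::to_string(-operand);
  }

  // The table itself: index i selects the i-th operation.
  static const OperatorFunction* opFuncs() {
    static const OperatorFunction table[kNumOps] = {&ToyOpServer::op_double,
                                                    &ToyOpServer::op_negate};
    return table;
  }
};
```

A table like this keeps the request handler small: adding an operation means adding one member function and one table entry.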
 - Protected Types - 
typedef std::pair<size_t, int64_t> BlockKey¶
 - 
typedef std::unordered_map<BlockKey, int64_t, BlockKeyHash> BlockMap¶
- all parameters are stored in CpuVector with a blockMap_ data structure to index block data required by requests. 
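Because std::pair has no default std::hash specialization, an unordered map keyed by BlockKey needs a hash functor such as BlockKeyHash. The following is a minimal sketch of that idea; `ToyBlockKeyHash`, `ToyBlockMap`, and `lookupBlock` are hypothetical names, the meaning of the pair fields is an assumption, and the real BlockKeyHash may combine the fields differently.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <unordered_map>
#include <utility>

// Assumption: the pair holds (parameter id, block id).
using BlockKey = std::pair<std::size_t, int64_t>;

// Hypothetical hash functor that mixes the hashes of both fields.
struct ToyBlockKeyHash {
  std::size_t operator()(const BlockKey& key) const {
    std::size_t h1 = std::hash<std::size_t>()(key.first);
    std::size_t h2 = std::hash<int64_t>()(key.second);
    return h1 ^ (h2 + 0x9e3779b9 + (h1 << 6) + (h1 >> 2));
  }
};

using ToyBlockMap = std::unordered_map<BlockKey, int64_t, ToyBlockKeyHash>;

// Returns the value stored for key, or -1 when the block is unknown,
// following the "return -1 if block cannot be found" convention.
inline int64_t lookupBlock(const ToyBlockMap& map, const BlockKey& key) {
  auto it = map.find(key);
  return it == map.end() ? -1 : it->second;
}
```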
 - 
typedef std::vector<std::pair<int64_t, int64_t>> BlockSegments¶
 - 
typedef std::function<void(int64_t blockId, const VectorPtr vecs[])> ExecFunc¶
- Framework routine for block parallelization. For example, for optimization over all blocks at the pserver end, this routine helps parallelize the optimization of all blocks across multiple threads. 
 - Protected Functions - 
bool asyncGrdientCommitCheckAndStat(const SendParameterRequest &request)¶
- async gradient commit control 
 - 
void printAsyncGradientCommitStatAndReset()¶
 - 
void mergeSegments(BlockSegments *segments)¶
 - 
void readAllBlocks(MsgReader *msgReader, std::vector<ParameterServer2::Buffer> *buffers)¶
- read all data from connection and store it in static pre-allocated buffer 
 - 
const ParameterConfig &getParameterConfig(const ParameterBlock &block)¶
 - 
const ParameterConfig &getParameterConfig(int64_t blockId) const¶
- it implicitly checks blockOffsetMap_ while retrieving blockId 
 - template <class Response>
- 
bool isValidVectorHandle(int64_t handle, Response *response)¶
 - template <class Response>
- 
bool isValidMatrixHandle(int64_t handle, Response *response)¶
 - 
int64_t getBlockOffset(const ParameterBlock &block) const¶
- get block offset - Note
- block.begin_dim is added to the block offset. return -1 if block cannot be found
 
 - 
int64_t getBlockId(const ParameterBlock &block) const¶
- return -1 if block cannot be found 
 - 
void sendBackParameter(const ParameterBlock &block, int parameterType, SendParameterResponse *response, std::vector<Buffer> *outputBuffers)¶
- prepare data for sending back - Note
- modify response and outputBuffers for sending parameter back to client. The buffer for socket sending uses vectors_[parameterType] directly for dense with sync-sgd
 
 - 
void sendBackParameter(const ParameterBlock &block, int parameterType, SendParameterResponse *response, Buffer *buffer, std::vector<Buffer> *outputBuffers)¶
- prepare data for sending back - Note
- modify response and outputBuffers for sending parameter back to client. The buffer for socket sending uses buffer->base The parameter values are copied from vectors_[parameterType] to buffer->base. for dense with async-sgd
 
 - 
void sendBackParameterSparse(const ParameterBlock &block, int parameterType, SendParameterResponse *response, Buffer *buffer, size_t width, std::vector<Buffer> *outputBuffers)¶
- prepare data for sending back - Note
- specified for sparse
 
 - 
void blockTraverse(BlockInfo &info, const ParameterConfig &config, int64_t offset, size_t size, const VectorPtr vecs[], const ParameterOptimizer::TraverseCallback &callback)¶
 - Protected Attributes - 
std::vector<CpuVectorPtr> vectors_¶
 - 
std::vector<CpuMatrixPtr> matrices_¶
 - 
std::vector<CpuMemHandlePtr> dataMems_¶
 - 
std::unordered_map<size_t, ParameterConfig> configMap_¶
- Mapping between parameter and config. Different parameters can have different configs, such as decay_rate. For each request, the config must be read for adding gradients and for optimization. 
 - 
BlockSegments usedSegments_¶
- Because some blocks might not be fully used, we keep a record of which segments are used. 
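A record of used segments is typically kept compact by coalescing ranges that overlap or touch. The sketch below shows one common way to do that for a BlockSegments-shaped container. It is not the body of mergeSegments: the function name `mergeSegmentsSketch` is hypothetical, and treating each pair as a half-open range [begin, end) is an assumption.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

using BlockSegments = std::vector<std::pair<int64_t, int64_t>>;

// Sorts the segments by start and merges any that overlap or touch.
// Assumption: each pair is a half-open range [begin, end).
inline void mergeSegmentsSketch(BlockSegments* segments) {
  if (segments->empty()) return;
  std::sort(segments->begin(), segments->end());
  BlockSegments merged;
  merged.push_back((*segments)[0]);
  for (std::size_t i = 1; i < segments->size(); ++i) {
    std::pair<int64_t, int64_t>& last = merged.back();
    const std::pair<int64_t, int64_t>& cur = (*segments)[i];
    if (cur.first <= last.second) {
      // Overlapping or adjacent: extend the previous segment.
      last.second = std::max(last.second, cur.second);
    } else {
      merged.push_back(cur);
    }
  }
  segments->swap(merged);
}
```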
 - 
PServerStatus status_¶
- record pserver status, all status defined in ParameterService.pb 
 - 
std::atomic<int64_t> numSamplesProcessed_¶
- record all samples processed which could be used by optimizer 
 - 
double cost_¶
 - 
int mpiSize_¶
 - 
int dataSize_¶
 - 
OptimizationConfig config_¶
- configuration for current parameter optimizer 
 - 
ThreadLocal<ReadWriteBuffer<real, ALIGN_HINT>> readWriteBuffer_¶
- to buffer the data from network for further processing to reduce redundant memory allocation. 
 - 
int64_t size_¶
- size of the parameter 
 - 
ThreadBarrier gradientReadyBarrier_¶
- for synchronized training, check details in addGradient() and doOperation() 
 - 
ThreadBarrier parameterReadyBarrier_¶
 - 
ThreadBarrier passBarrier_¶
 - 
ThreadLocal<std::vector<SendParameterRequest>> requestVec_¶
 - 
ThreadLocal<std::vector<ProtoResponseCallbackEx>> callbackVec_¶
 - 
std::atomic<int> numPassFinishClients_¶
 - 
bool allClientPassFinish_¶
 - 
std::vector<std::unique_ptr<ThreadBarrier>> synchronizeBarriers_¶
 - 
std::atomic<int> serverId_¶
 - 
int64_t asyncLaggedThreshold_¶
- Controls the commit of lagged gradients in async-sgd: gradients from nodes that are too slow are discarded because they are of poor quality. Algorithm (pserver): - Initially asyncUpdateSteps_ = 0 and asyncTrainerSteps_[N] = 0. asyncUpdateSteps_ is the version of the parameter value.
- When a pull arrives, record asyncUpdateSteps_ into asyncTrainerSteps_[trainer_id].
- When a push arrives, compare asyncUpdateSteps_ with asyncTrainerSteps_[trainer_id]. If delta > threshold, discard the current gradient; otherwise commit it.
- Reset asyncUpdateSteps_ and asyncTrainerSteps_[N] when the pass finishes. Note: lagged gradients cannot be discarded strictly under some special conditions. Part of the gradients could be discarded if ConcurrentRemoteParameterUpdater is used. This algorithm is implemented in asyncSGD().
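The pull/push bookkeeping in the steps above can be sketched as a small single-threaded model. `LagTracker` and its method names are hypothetical; the real server uses atomics and runs this logic concurrently, and the rule that each committed gradient advances the version by one is an assumption made for illustration.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical single-threaded model of the staleness check.
class LagTracker {
public:
  LagTracker(int numTrainers, int64_t threshold)
      : threshold_(threshold), updateSteps_(0), trainerSteps_(numTrainers, 0) {}

  // Pull: remember which parameter version this trainer has seen.
  void onPull(int trainerId) { trainerSteps_[trainerId] = updateSteps_; }

  // Push: commit the gradient unless it is based on too old a version.
  // Returns true if committed, false if discarded.
  bool onPush(int trainerId) {
    int64_t delta = updateSteps_ - trainerSteps_[trainerId];
    if (delta > threshold_) return false;
    ++updateSteps_;  // assumption: one committed gradient = one new version
    return true;
  }

  // Pass finished: reset the version counter and all per-trainer records.
  void onPassFinished() {
    updateSteps_ = 0;
    trainerSteps_.assign(trainerSteps_.size(), 0);
  }

  int64_t updateSteps() const { return updateSteps_; }

private:
  int64_t threshold_;
  int64_t updateSteps_;
  std::vector<int64_t> trainerSteps_;
};
```

With a threshold of 1, a trainer that pulled at version 0 and pushes after two other commits has a delta of 2, so its gradient is discarded.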
 
 - 
std::atomic<int64_t> asyncUpdateSteps_¶
 - 
std::vector<int64_t> asyncTrainerSteps_¶
 - 
size_t asyncLaggedGradientsNum_¶
 - 
std::vector<size_t> asyncUpdateStat_¶
- stat all async update 
 - 
std::vector<size_t> asyncTrainerDiscardStat_¶
- stat per trainer_id 
 - 
std::vector<size_t> asyncTrainerCommitStat_¶
- stat per trainer_id 
 - 
std::unique_ptr<SyncThreadPool> syncThreadPool_¶
- only used by controller and other control cmd from trainer number 0 
 - 
bool isSparseServer_¶
- pserver for sparse remote update parameters 
 - 
std::atomic<int64_t> batchId_¶
- required for tuning barrier performance in sync-sgd 
 - 
ThreadLocal<struct timeval> addGradBegin_¶
- the beginning of addGradient without network overhead 
 - 
std::unique_ptr<StatSet> statSet_¶
- For tuning barrier performance. To better control logging for sparse and dense parameters, different log entities are used for different ParameterServer objects. It outputs many performance stats that expose the network overhead, the fluctuation of computation in forward/backward and of the network, the computation spent on optimization at the pserver end, the barrier overhead, etc. To understand the tuning data, focus on the synchronization between addGradient and doOperation, which indirectly calls the op_SGD operation controlled by the remote updater controller. 
 - 
struct BlockInfo¶
- To parallelize the multi-thread and multi-connection computation at the pserver, block units are used to reduce contention; in addition, each block has its own optimizer, for the reasons annotated below. - Public Members - 
const ParameterConfig *config¶
 - 
std::unique_ptr<std::mutex> lock¶
 - 
uint64_t offset¶
- global offset for all parameters 
 - 
std::unique_ptr<ParameterOptimizer> optimizer¶
- Async sgd in the pserver is very different from sync sgd. Each trainer follows startBatch, update*, finishBatch as in sync sgd, but these actions are executed almost simultaneously by multiple cores and threads, so async sgd optimization is in reality performed at block level, and per-block optimization is therefore necessary. In addition, per-block optimization is also preferred for multithreaded performance. 
 
 - 
struct BlockKeyHash¶
 - 
struct Buffer¶
 - template <typename T, size_t AlignBytes>
- 
class ReadWriteBuffer¶
- ReadWriteBuffer is based on std::vector, but aligned for AVX/SSE computation. It also adds helper methods for allocating memory-aligned blocks. - Parameters
- T-- type of element. 
- AlignBytes-- the memory aligned bytes for allocated blocks. 
 
 - Inherits from std::vector< T, AlignedAllocator< T, AlignBytes > > - Public Functions - 
void resizeWithAlignHints(size_t size, size_t alignBlockCount = 1)¶
- Resizes the buffer for the given number of blocks to be allocated. Each block is memory-aligned to AlignBytes. - Parameters
- size-- The element count in all blocks. 
- alignBlockCount-- The block count that will be allocated. 
 
 
 - 
void resetAlignAlloc()¶
- reset aligned allocate blocks. 
 - 
T *nextBlock(size_t blockSize)¶
- get next aligned block address. - Return
- Aligned block address.
- Parameters
- blockSize-- is the element count in each block. 
 
 
 - Public Static Attributes - 
constexpr bool IsTLargerThanAlign¶
- Compile-time constant that indicates whether type T is larger than the alignment. 
 - 
constexpr size_t AlignElementCount¶
- If AlignBytes > sizeof(T), the number of elements that can be stored in AlignBytes. 
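The alignment arithmetic behind these constants can be worked through with two small helpers. This is a sketch under assumptions, not the class's implementation: `alignElementCount` and `nextBlockOffset` are hypothetical free functions, and rounding each block's end up to a multiple of the element count is one plausible way for consecutive blocks to start on aligned addresses.

```cpp
#include <cassert>
#include <cstddef>

// Number of T elements that fit in AlignBytes (0 when T is larger).
template <typename T, std::size_t AlignBytes>
constexpr std::size_t alignElementCount() {
  return AlignBytes / sizeof(T);
}

// Element offset at which the next block would start, after a block of
// blockSize elements is placed at curOffset. The end of the block is rounded
// up to a multiple of the element count so the next block stays aligned.
template <typename T, std::size_t AlignBytes>
constexpr std::size_t nextBlockOffset(std::size_t curOffset,
                                      std::size_t blockSize) {
  return alignElementCount<T, AlignBytes>() == 0
             ? curOffset + blockSize  // T larger than alignment: no padding
             : (curOffset + blockSize + alignElementCount<T, AlignBytes>() - 1) /
                   alignElementCount<T, AlignBytes>() *
                   alignElementCount<T, AlignBytes>();
}
```

For a 4-byte float and 32-byte alignment, eight elements fill one aligned unit, so a 10-element block starting at offset 0 pushes the next block to offset 16.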
 - Private Members - 
size_t curOffset_¶
 
 