Client¶
- 
class paddle::BaseClient¶
- It manages all connections to pservers. There are two modes for managing connections to all pservers. In the first, one connection is shared by two threads that separately handle sending and receiving data; this mode uses sendThreads_/recvThreads_ and sendJobQueue_/recvJobQueue_. In the second, each thread owns one connection and performs all of its activity on that connection, using a shared thread pool to manage the connections. Besides talking to pservers, metric learning also uses the network to exchange features across machines, so this class only abstracts the basic thread and queue-buffer creation for both uses. - Subclassed by paddle::ParameterClient2 - Public Functions - 
BaseClient(bool separate = false, int numPorts = FLAGS_ports_num)¶
 - 
virtual ~BaseClient()¶
 - template <class DataType>
- 
void putData(int clientId, SendDataType type, DataType *datas, size_t size, DataUpdateMode mode)¶
- Send data to the server; only synchronous sending is supported. 
 - template <class DataType>
- 
void putOwnData(int clientId, SendDataType type, DataType *datas, size_t size)¶
 - template <class DataType>
- 
void getAllData(int clientId, SendDataType type, DataType *datas, size_t size)¶
 - template <class DataType>
- 
void reduce(DataType *sendBuf, DataType *recvBuf, size_t size, int clientId, int rootId)¶
- Reduces values across all clients. Only SUM is supported. The result is saved in the recvBuf of the rootId client. 
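- Example (a minimal usage sketch, not from the source; assumes `client` is an initialized concrete subclass such as paddle::ParameterClient2, `real` is Paddle's float type, and <vector> is included):

    std::vector<real> localVals(256, 1.0f);   // this trainer's contribution
    std::vector<real> summed(256, 0.0f);      // receives the SUM on the root client only
    const int clientId = 0;                   // id of this client
    const int rootId = 0;                     // client whose recvBuf gets the result
    client.reduce(localVals.data(), summed.data(), localVals.size(), clientId, rootId);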
 - 
virtual TransDataType getTransDtype(const std::type_info &info)¶
- Return the TransDataType corresponding to the given input type info. 
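- For example, a usage sketch (assuming `client` is a concrete subclass instance and <typeinfo> is included):

    TransDataType dtype = client.getTransDtype(typeid(real));  // type tag used for transfer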
 - Protected Types - 
typedef std::unique_ptr<std::thread> ThreadPtr¶
 - 
typedef std::vector<std::vector<iovec>> InputIovs¶
 - 
typedef std::vector<SendParameterRequest> SendRequest¶
 - 
typedef std::vector<SendDataRequest> SendDataRequestVec¶
 - Protected Functions - 
int calcClientId(int i, int serviceNum)¶
 - 
void startThreads()¶
- start threads in sendThreads_ and recvThreads_ 
 - 
void finishThreads()¶
- finish threads in sendThreads_ and recvThreads_ 
 - template <class DataType>
- 
void prepareData(int clientId, SendDataType type, DataUpdateMode updateMode, DataType *datas, size_t size, SendJob *sendJob)¶
 - template <class DataType>
- 
void sendData(int clientId, SendDataType type, DataUpdateMode updateMode, DataType *datas, size_t size)¶
- Send data to all data servers. - Note
- Each trainer sends all its data to all data servers. This is used for broadcast data synchronization, such as feature synchronization in metric learning.
 
 - 
void recvData()¶
- recv data from all data servers - Note
- synchronize all recv threads
 
 - template <typename ProtoIn, typename ProtoOut>
- 
void multiCall(const char *funcName, const ProtoIn &request, std::vector<ProtoOut> *responses)¶
- send request, and recv responses 
 - 
void synchronize(SyncObject syncObjectId = SYNC_DEFAULT)¶
- synchronize all trainers and pservers - Note
- used to ensure that data of all trainers have been received
 
 - 
virtual void send(int threadId) = 0¶
- Use multiple threads to send data separately. - Note
- Each thread reads its own JobQueue to handle requests and uses calcClientId() to retrieve the connections it manages. send and recv are implemented in the child class.
 
 - Protected Attributes - 
bool stopping_¶
 - 
int serviceNum_¶
- nodes * ports, i.e. the number of real pservers 
 - 
int threadNum_¶
- Number of threads for managing all services. Normally the number of pservers is no more than a few hundred, so thread-based parallelization benefits both traffic performance and the pservers' SGD optimization performance. 
 - 
std::vector<ProtoClient> clients_¶
- the connection manager at client end 
 - 
std::unique_ptr<ThreadBarrier> recvSyncBarrier_¶
 - 
std::vector<std::unique_ptr<SendQueue>> sendJobQueue_¶
- The send/recv queues cooperate with each other to overlap communication with the forwardBackward computation. 
 - 
int numPorts_¶
- port num for each node 
 - 
bool separateSendAndRecv_¶
- if set, overlapped optimization is disabled 
 - 
std::vector<CpuMemHandlePtr> recvDataMems_¶
 - Protected Static Functions - 
static int divup(int a, int b)¶
- for a > 0, b > 0: return the smallest x s.t. b*x >= a 
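- A likely-equivalent sketch (an assumption, not taken from the source): ceiling integer division.

    static int divup(int a, int b) { return (a + b - 1) / b; }  // smallest x with b*x >= a
    // e.g. divup(10, 3) == 4, since 3 * 4 = 12 >= 10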
 - 
struct SendJob¶
- Public Members - 
SendRequest parallelRequests¶
- store protobuf request 
 - 
SendDataRequestVec parallelDataRequests¶
- store data, such as features for metric learning 
 
- 
- 
class paddle::ParameterClient2¶
- The client interface for the parameter server. ParameterClient2 supports two modes for managing connections to parameter servers: in the first mode one connection is shared by two threads that are separately responsible for sending and receiving; in the second mode one connection is owned by a single thread, and all sending and receiving activities run in that thread. TODO(yanfei): A further idea for optimizing pserver performance is to run sync-SGD at parameter level instead of pserver level. Parameter-level full parallelization for sync-SGD can also follow the forward/backward computation layer by layer in deeper models. Firstly, the pserver can fully parallelize all computation at parameter level, sending back a parameter's value as soon as that parameter is ready instead of waiting until all gradients have arrived and all parameter values are ready. Secondly, the parameter client can write parameters back to the GPU as they arrive instead of waiting until all parameters have been received on the CPU host. - Inherits from paddle::BaseClient - Public Functions - 
ParameterClient2(bool separate = false, int port = FLAGS_port, int numPorts = FLAGS_ports_num)¶
- Constructor. - Parameters
- separate-- True if sending and receiving activities are separated into two threads, otherwise false. 
- port-- Port number that parameter client runs on. 
- numPorts-- Number of ports the parameter client occupies; numPorts * number of pservers is the total number of connections the parameter client maintains. 
 
 
 - 
~ParameterClient2()¶
 - 
bool init(const std::vector<ParameterPtr> &parameters)¶
 - 
void sendAndReceiveParameter(ParameterUpdateMode updateMode, ParameterType parameterType, const std::vector<ParameterSegments> &segments, int64_t numSamples, real cost, bool sendBackParameter, ParameterType sendBackParameterType, ParameterType recvParameterType)¶
- service functions - Sends the segments of the parameter to parameter servers, then receives the response from the servers (see the sketch after the parameter list). - Note
- Only parameterType will be sent.
- Parameters
- updateMode-- Indicates how parameters should be updated on the server side. 
- parameterType-- Type of parameter that will be sent. 
- segments-- Segments in the parameter that will be sent. 
- numSamples-- Number of samples this update is based on. 
- cost-- Cost of the batch, will be used to calculate global object value. 
- sendBackParameter-- True if the updated parameters should be sent back, otherwise false. 
- sendBackParameterType-- Send back parameter type on pserver, PARAMETER_VALUE by default 
- recvParameterType-- pserver[sendBackParameterType] will be copied to client[recvParameterType] 
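- A hypothetical usage fragment (the update-mode enum value is an assumption; `segments`, `numSamples`, and `cost` are assumed to be in scope): push this batch's gradients for the given segments and copy the pservers' updated values back into the client's PARAMETER_VALUE buffers.

    client.sendAndReceiveParameter(
        PSERVER_UPDATE_MODE_ADD_GRADIENT,   // assumed ParameterUpdateMode value
        PARAMETER_GRADIENT,                 // send local gradients
        segments,                           // std::vector<ParameterSegments>
        numSamples,                         // samples in this mini-batch
        cost,                               // batch cost
        /* sendBackParameter = */ true,
        /* sendBackParameterType = */ PARAMETER_VALUE,
        /* recvParameterType = */ PARAMETER_VALUE);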
 
 
 - 
void sendAndReceiveParameter(ParameterUpdateMode updateMode, ParameterType parameterType, int64_t numSamples, real cost, bool sendBackParameter, ParameterType sendBackParameterType = PARAMETER_VALUE, ParameterType recvParameterType = PARAMETER_VALUE)¶
- Sends all parameters to parameter servers, and receives the response from the servers. 
 - 
void sendParameter(ParameterUpdateMode updateMode, ParameterType parameterType, const std::vector<ParameterSegments> &segments, int64_t numSamples, real cost, bool sendBackParameter, BatchStatus batchStatus)¶
- Sends the segments of the parameter to parameter servers. Each sendParameter() must later be paired with a recvParameter(). Only parameterType will be sent. - Note
- This function is non-blocking, which means the parameter must not change between this call and recvParameter().
- Parameters
- updateMode-- Indicates how parameters should be updated on the server side. 
- parameterType-- Type of parameter that will be sent. 
- segments-- Segments in the parameter that will be sent. 
- numSamples-- Number of samples this update is based on. 
- cost-- Cost of the batch, will be used to calculate global object value. 
- sendBackParameter-- True if the updated parameters should be sent back, otherwise false. 
- batchStatus-- Status of the batch. 
 
 
 - 
void recvParameter()¶
 - 
void sendParameter(ParameterUpdateMode updateMode, ParameterType parameterType, int64_t numSamples, real cost, bool sendBackParameter, BatchStatus batchStatus)¶
- Sends all parameters to parameter servers; recvParameter() has to be invoked afterwards. - Note
- This function is non-blocking, which means the parameters must not change between this call and recvParameter().
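- A hypothetical sketch of the non-blocking pair (the update-mode and batch-status enum values are assumptions; `numSamples` and `cost` are assumed to be in scope):

    client.sendParameter(PSERVER_UPDATE_MODE_ADD_GRADIENT,   // assumed ParameterUpdateMode value
                         PARAMETER_GRADIENT,
                         numSamples, cost,
                         /* sendBackParameter = */ true,
                         BATCH_FINISH);                       // assumed BatchStatus value
    // ... overlap other work here; the parameters must not be modified ...
    client.recvParameter();                                   // collect the pservers' responses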
 
 - 
void getParameter(ParameterType recvParameterType = PARAMETER_VALUE, ParameterType sendBackParameterType = PARAMETER_VALUE)¶
- Get all parameters from parameter servers. 
 - 
void getParameterSparse(ParameterType recvParameterType = PARAMETER_VALUE, ParameterType sendBackParameterType = PARAMETER_VALUE)¶
- Get parameters by sparse row ids from parameter servers. 
 - 
void setParameter()¶
- Set all parameters on parameter servers using the local parameters. 
 - 
void setParameterZero()¶
- Set all parameters on parameter servers to zero; local parameter values are not sent. 
 - 
void waitPassStart()¶
- Wait until all gradient servers start one pass. - Note
- This is now only used by the gradient servers for “sgd” algorithm. Calling this function means that the calling gradient server is ready to start a new pass.
 
 - 
void waitPassFinish()¶
- Wait until all gradient servers finish one pass. - Note
- This is now only used by the gradient servers for “sgd” algorithm. Calling this function means that the calling gradient server finishes one pass.
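- A hypothetical pass skeleton showing how waitPassStart()/waitPassFinish() bracket the mini-batch loop (the update-mode value is an assumption; forward/backward and the surrounding variables are omitted):

    client.waitPassStart();                        // all gradient servers start the pass
    for (int b = 0; b < numBatches; ++b) {
      // ... forward/backward to produce local gradients ...
      client.sendAndReceiveParameter(PSERVER_UPDATE_MODE_ADD_GRADIENT,  // assumed value
                                     PARAMETER_GRADIENT, numSamples, cost,
                                     /* sendBackParameter = */ true);
    }
    client.waitPassFinish();                       // all gradient servers finish the pass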
 
 - 
void synchronize(SyncObject syncObjectId = SYNC_DEFAULT)¶
- Wait until all gradient servers call this function. 
 - 
void asyncFinishPass(SyncObject syncObjectId = SYNC_DEFAULT)¶
- Called when async-sgd finish pass. 
 - 
void asyncStartPass(SyncObject syncObjectId = SYNC_DEFAULT)¶
 - 
void doOperation(PreparedOperations &ops, bool waitForGradient, bool sendBackParameter, bool releasePass = true)¶
- Execute the prepared operations on pservers, fetch the results and aggregate results from different pservers. - Parameters
- ops-- Prepared operations that will be executed on pservers. 
- waitForGradient-- If true, wait for gradient to be ready before starting the operations. 
- sendBackParameter-- If true, send back the parameter to clients after the operations are finished. 
- releasePass-- If true, and if all clients call waitPassFinish, signal all clients to finish the pass. 
 
 
 - 
void setConfig(const OptimizationConfig &optConfig, const std::string &saveDir = "", bool isSparseServer = false)¶
- Set the configuration of pserver, including parameter config and optimization config 
 - 
bool inStatus(PServerStatus status)¶
- Return true if all pservers are in the given status. 
 - 
bool isPassFinish()¶
 - 
void setStatus(PServerStatus status)¶
- Set pserver status. 
 - 
void waitForStatus(PServerStatus status)¶
- Wait until all pservers are at the given status. - Note
- This function is not suitable for frequent use, because it sleeps for 1 second in each polling round until the condition is satisfied.
 
 - 
PServerVector createVector()¶
- Create a column vector. The size is the dimension of parameter. 
 - 
void releaseVector(PServerVector handle)¶
- Release the PServerVector given handle. 
 - 
PServerMatrix createMatrix(int32_t numCols)¶
- Create a column major matrix. The number of rows is the dimension of the parameter. The number of columns is specified by numCols. 
 - 
void releaseMatrix(PServerMatrix handle)¶
- Release the PServerMatrix given handle. 
 - 
real vectorDotProduct(PServerVector u, PServerVector v)¶
- Calculate the dot product of u and v. 
 - 
void vectorScale(PServerVector u, real a)¶
- Scale u by a. 
 - 
void vectorCopy(PServerVector src, PServerVector dst)¶
- Copy from src to dst. 
 - 
void vectorAddMult(PServerVector u, PServerVector v, real a)¶
- u += v * a 
 - 
void vectorAddMultInto(PServerVector u, PServerVector v, PServerVector w, real a)¶
- u = v + w * a 
 - 
void vectorScaleInto(PServerVector u, PServerVector v, real a)¶
- u = v * a 
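- A minimal sketch combining the vector utilities above with getPServerParameterValue() documented below (an assumption, not from the source; `client` is an initialized ParameterClient2):

    PServerVector u = client.createVector();               // length = parameter dimension
    PServerVector v = client.getPServerParameterValue();   // handle to the current values
    client.vectorScaleInto(u, v, 0.5f);                    // u = v * 0.5
    real dot = client.vectorDotProduct(u, v);              // sum_i u[i] * v[i]
    client.releaseVector(u);                               // free the temporary vector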
 - 
PServerVector getPServerParameterValue()¶
- Return pserver parameter value. 
 - 
PServerVector getPServerParameterGradient()¶
- Return pserver parameter gradient. 
 - 
void loadValueVector(const std::string &dirName)¶
- Tell pservers to load value vector from file. - Parameters
- dirName-- The directory that contains the value vector file. 
 
 
 - 
void saveValueVector(const std::string &dirName)¶
- Tell pservers to save value vector to file. 
 - 
void setTrainerId(int trainerId)¶
 - 
void setForwardbackwardTime(uint64_t delta)¶
 - Public Static Functions - 
int calcParameterBlockSize(const std::vector<ParameterPtr> &parameters, size_t serviceNum)¶
 - Protected Functions - template <typename ProtoIn, typename ProtoOut>
- 
void multiCall(const char *funcName, const ProtoIn &request, std::vector<ProtoOut> *responses)¶
 - Protected Attributes - 
int port_¶
- Start port number of the pservers; all ports for dense and sparse services are deduced from it by fixed rules. 
 - 
int trainerId_¶
- The id of the trainer using this client. 
 - 
uint64_t forwardbackwordTime_¶
 - 
std::unordered_map<size_t, ParameterPtr> parameterMap_¶
- Maps parameter id to parameter; used for decoding protobuf data. 
 - 
std::vector<ParameterSegments> allSegments_¶
- Segments for all parameters that need to be synced. 
 - 
std::unique_ptr<SparseParameterDistribution> sparseDistribution_¶
- Module for tracking the distribution of sparse parameters across all pservers. 
 - 
std::unique_ptr<SyncThreadPool> syncThreadPool_¶
- thread pool for parallelizing all connections to pservers 
 - 
bool passFinish_¶
 - Private Functions - 
void destroy()¶
 - 
void sendParallel(int tid, size_t numThreads, ParameterType recvParameterType)¶
- Management function for parallelizing send/recv across all connections to all pservers. It is called under one SyncThreadPool and supports using N threads to control M connections. Receiving cannot start until all sending on the connections owned by the current thread has finished. Connections controlled by different threads can transfer data asynchronously. 
 - 
virtual void send(int threadId)¶
- Sending thread routine for asynchronously sending data. 
 - 
virtual void recv(int threadId)¶
- Receiving thread routine for asynchronously receiving data. 
 - 
void prepareSendData(ParameterUpdateMode updateMode, ParameterType parameterType, const std::vector<ParameterSegments> &parameterSegments, int64_t numSamples, real cost, bool sendBackParameter, ParameterType sendBackParameterType, BatchStatus batchStatus, SendJob *sendJob)¶
- Main routine to build the data sent to pservers. - Note
- It can prepare data for different parameter types and can be regarded as a layer bridging the real parameter data and the protobuf data used for communication. TODO(yanfei): an additional layer could be abstracted to encode and decode data to/from protobuf.
 
 - 
void initThreads()¶
- start necessary threads for threadPool 
 
-