Client¶
- 
class 
paddle::BaseClient¶ Manages all connections to the pservers. There are two modes for managing these connections. In the first mode, each connection owns two threads that separately handle sending and receiving; this mode relies on sendThreads_/recvThreads_ and sendJobQueue_/recvJobQueue_ (a standalone sketch of this mode follows below). In the second mode, each thread uses one connection for all of its activity; this mode uses a shared thread pool to manage the connections. Besides talking to pservers, metric learning also uses the network to exchange features across machines, so this class only abstracts the basic thread and queue-buffer creation that both use cases need
Subclassed by paddle::ParameterClient2
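A standalone sketch of the first mode described above: dedicated send threads, each draining its own per-connection job queue. It is an illustration only, not paddle code; Job, Connection, and JobQueue are hypothetical stand-ins for SendJob, ProtoClient, and the sendJobQueue_ entries:

    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <queue>
    #include <string>
    #include <thread>
    #include <vector>

    struct Job { std::string payload; };          // stand-in for SendJob
    struct Connection {                           // stand-in for ProtoClient
      int serverId;
      void send(const Job& job) {
        std::cout << "send to pserver " << serverId << ": " << job.payload << "\n";
      }
    };

    class JobQueue {                              // stand-in for one sendJobQueue_ entry
    public:
      void push(Job job) {
        { std::lock_guard<std::mutex> lock(mutex_); jobs_.push(std::move(job)); }
        cond_.notify_one();
      }
      bool pop(Job* job) {                        // returns false once closed and drained
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return closed_ || !jobs_.empty(); });
        if (jobs_.empty()) return false;
        *job = std::move(jobs_.front());
        jobs_.pop();
        return true;
      }
      void close() {
        { std::lock_guard<std::mutex> lock(mutex_); closed_ = true; }
        cond_.notify_all();
      }
    private:
      std::mutex mutex_;
      std::condition_variable cond_;
      std::queue<Job> jobs_;
      bool closed_ = false;
    };

    int main() {
      const int numServers = 2;
      std::vector<Connection> clients;            // role of clients_
      std::vector<JobQueue> queues(numServers);   // role of sendJobQueue_
      std::vector<std::thread> sendThreads;       // role of sendThreads_

      for (int i = 0; i < numServers; ++i) clients.push_back({i});
      for (int i = 0; i < numServers; ++i) {
        sendThreads.emplace_back([&, i] {
          Job job;
          while (queues[i].pop(&job)) clients[i].send(job);  // drain own queue only
        });
      }

      for (int i = 0; i < numServers; ++i) queues[i].push({"gradient block"});
      for (auto& q : queues) q.close();
      for (auto& t : sendThreads) t.join();
      return 0;
    }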
Public Functions
- 
BaseClient(bool separate = false, int numPorts = FLAGS_ports_num)¶ 
- 
virtual 
~BaseClient()¶ 
- template <class DataType>
 - 
void 
putData(int clientId, SendDataType type, DataType *datas, size_t size, DataUpdateMode mode)¶ send data to the servers; only synchronous sending is supported
- template <class DataType>
 - 
void 
putOwnData(int clientId, SendDataType type, DataType *datas, size_t size)¶ 
- template <class DataType>
 - 
void 
getAllData(int clientId, SendDataType type, DataType *datas, size_t size)¶ 
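putOwnData() and getAllData() support the broadcast-style data synchronization used, for example, to share features across trainers in metric learning (see the note under sendData() below). A standalone, in-process sketch of what that broadcast achieves; DataServer and the maps are hypothetical stand-ins, not paddle types:

    #include <iostream>
    #include <map>
    #include <vector>

    struct DataServer {                       // stand-in for one data server
      std::map<int, std::vector<float>> blocksByTrainer;
    };

    int main() {
      std::vector<DataServer> servers(2);
      // Features computed locally by each trainer (trainer id -> features).
      std::map<int, std::vector<float>> localFeatures = {
          {0, {0.1f, 0.2f}}, {1, {0.3f, 0.4f}}};

      // "putOwnData": each trainer pushes its block to every data server.
      for (const auto& kv : localFeatures)
        for (auto& server : servers) server.blocksByTrainer[kv.first] = kv.second;

      // "getAllData": any trainer can now read every trainer's block back.
      for (const auto& kv : servers[0].blocksByTrainer)
        std::cout << "trainer " << kv.first << " holds " << kv.second.size()
                  << " features\n";
      return 0;
    }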
- template <class DataType>
 - 
void 
reduce(DataType *sendBuf, DataType *recvBuf, size_t size, int clientId, int rootId)¶ Reduce values across all clients. Only SUM is supported. The result is stored in the recvBuf of the rootId client; see the sketch below
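A standalone illustration of the SUM-reduce semantics just described: every client contributes its sendBuf, and only the rootId client ends up with the element-wise sum in its recvBuf. The simulation below is in-process and is not the paddle implementation:

    #include <cstddef>
    #include <iostream>
    #include <vector>

    // Simulates what reduce() achieves across clients, using local buffers.
    void simulatedReduce(const std::vector<std::vector<float>>& sendBufs,
                         std::vector<float>* rootRecvBuf) {
      rootRecvBuf->assign(sendBufs[0].size(), 0.0f);
      for (const auto& buf : sendBufs) {
        for (size_t i = 0; i < buf.size(); ++i) (*rootRecvBuf)[i] += buf[i];
      }
    }

    int main() {
      // Three "clients", each holding a 4-element send buffer.
      std::vector<std::vector<float>> sendBufs = {
          {1, 2, 3, 4}, {10, 20, 30, 40}, {100, 200, 300, 400}};
      std::vector<float> recvBufOfRoot;   // only the rootId client sees this result
      simulatedReduce(sendBufs, &recvBufOfRoot);
      for (float v : recvBufOfRoot) std::cout << v << " ";  // 111 222 333 444
      std::cout << "\n";
      return 0;
    }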
- 
virtual TransDataType 
getTransDtype(const std::type_info &info)¶ return the transfer data type corresponding to the input type (see the sketch below)
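A minimal sketch of the kind of dispatch getTransDtype() performs, mapping a std::type_info to a transfer-type tag. The TransType enum and the supported types here are assumptions for illustration, not paddle's TransDataType definition:

    #include <cstdint>
    #include <iostream>
    #include <stdexcept>
    #include <typeinfo>

    enum class TransType { kInt32, kUInt32, kInt64, kUInt64, kFloat, kDouble };

    // Map a runtime type_info to the tag used when transferring the data.
    TransType transTypeOf(const std::type_info& info) {
      if (info == typeid(int32_t))  return TransType::kInt32;
      if (info == typeid(uint32_t)) return TransType::kUInt32;
      if (info == typeid(int64_t))  return TransType::kInt64;
      if (info == typeid(uint64_t)) return TransType::kUInt64;
      if (info == typeid(float))    return TransType::kFloat;
      if (info == typeid(double))   return TransType::kDouble;
      throw std::invalid_argument("unsupported data type");
    }

    int main() {
      std::cout << (transTypeOf(typeid(float)) == TransType::kFloat) << "\n";  // 1
      return 0;
    }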
Protected Types
- 
typedef std::unique_ptr<std::thread> 
ThreadPtr¶ 
- 
typedef std::vector<std::vector<iovec>> 
InputIovs¶ 
- 
typedef std::vector<SendParameterRequest> 
SendRequest¶ 
- 
typedef std::vector<SendDataRequest> 
SendDataRequestVec¶ 
Protected Functions
- 
int 
calcClientId(int i, int serviceNum)¶ 
- 
void 
startThreads()¶ start threads in sendThreads_ and recvThreads_
- 
void 
finishThreads()¶ finish threads in sendThreads_ and recvThreads_
- template <class DataType>
 - 
void 
prepareData(int clientId, SendDataType type, DataUpdateMode updateMode, DataType *datas, size_t size, SendJob *sendJob)¶ 
- template <class DataType>
 - 
void 
sendData(int clientId, SendDataType type, DataUpdateMode updateMode, DataType *datas, size_t size)¶ send data to all data servers
- Note
 - Each trainer sends all of its data to all data servers. It is used for broadcast data synchronization, such as feature synchronization in metric learning (illustrated by the sketch under getAllData() above).
 
- 
void 
recvData()¶ recv data from all data servers
- Note
 - synchronize all recv threads
 
- template <typename ProtoIn, typename ProtoOut>
 - 
void 
multiCall(const char *funcName, const ProtoIn &request, std::vector<ProtoOut> *responses)¶ send request, and recv responses
- 
void 
synchronize(SyncObject syncObjectId = SYNC_DEFAULT)¶ synchronize all trainers and pservers
- Note
 - used to ensure that data of all trainers have been received
 
- 
virtual void 
send(int threadId) = 0¶ use multiple threads to send data in parallel
- Note
 - Each thread reads its own JobQueue to handle requests and uses calcClientId() to retrieve the connections it manages. send() and recv() are implemented in the child class.
 
Protected Attributes
- 
bool 
stopping_¶ 
- 
int 
serviceNum_¶ nodes * ports, i.e. the number of real pserver services
- 
int 
threadNum_¶ number of threads for managing all services. The number of pservers is normally no more than a few hundred, so thread-based parallelization benefits both network-traffic performance and the pservers' SGD optimization performance.
- 
std::vector<ProtoClient> 
clients_¶ the connection managers at the client end
- 
std::unique_ptr<ThreadBarrier> 
recvSyncBarrier_¶ 
- 
std::vector<std::unique_ptr<SendQueue>> 
sendJobQueue_¶ the send and recv queues cooperate with each other to overlap communication with the forwardBackward computation.
- 
int 
numPorts_¶ port num for each node
- 
bool 
separateSendAndRecv_¶ if set, overlapped optimization is disabled
- 
std::vector<CpuMemHandlePtr> 
recvDataMems_¶ 
Protected Static Functions
- 
static int 
divup(int a, int b)¶ for a > 0, b > 0: return the smallest x s.t. b*x >= a
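A standalone example of the ceiling-division idiom described by divup():

    #include <cassert>

    // Smallest x such that b * x >= a, for a > 0, b > 0.
    static int divup(int a, int b) { return (a + b - 1) / b; }

    int main() {
      assert(divup(10, 3) == 4);   // 3 * 4 = 12 >= 10, while 3 * 3 = 9 < 10
      assert(divup(12, 3) == 4);   // exact division
      assert(divup(1, 5) == 1);
      return 0;
    }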
- 
struct 
SendJob¶ Public Members
- 
SendRequest 
parallelRequests¶ store protobuf request
- 
SendDataRequestVec 
parallelDataRequests¶ store data, such as features for metric learning
- 
class 
paddle::ParameterClient2¶ The client interface for the parameter server. ParameterClient2 supports two modes for managing connections to parameter servers: in the first mode, one connection is shared by two threads that are separately responsible for sending and receiving; in the second mode, one connection is owned by a single thread, and all sending and receiving runs in that thread. TODO(yanfei): A further idea for optimizing pserver performance is to run sync-SGD at the parameter level instead of the pserver level. Full parallelization at the parameter level would also let sync-SGD track the layer-by-layer forward/backward computation of deeper models. First, a pserver could parallelize all computation at the parameter level and send a parameter's value back as soon as it is ready, instead of waiting for all gradients to arrive and all parameter values to be ready. Second, the parameter client could write parameters back to the GPU as they arrive, instead of waiting until all parameters have been received on the CPU host.
Inherits from paddle::BaseClient
Public Functions
- 
ParameterClient2(bool separate = false, int port = FLAGS_port, int numPorts = FLAGS_ports_num)¶ Constructor.
- Parameters
 separate-True if sending and receiving activities are separated into 2 threads, otherwise false.
port-Port number that parameter client runs on.
numPorts-Number of ports the parameter client occupies; numPorts * pserver number is the total number of connections the parameter client maintains.
- 
~ParameterClient2()¶ 
- 
bool 
init(const std::vector<ParameterPtr> &parameters)¶ 
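A hypothetical setup sketch, assuming the pservers are already running, that the ParameterClient2.h header is on the include path, and that a vector of ParameterPtr has been created elsewhere:

    #include <vector>

    #include "ParameterClient2.h"   // assumption: header name/location

    bool setupAndFetch(const std::vector<paddle::ParameterPtr>& parameters,
                       int trainerId) {
      // separate = true selects the mode with dedicated send and recv threads.
      paddle::ParameterClient2 client(/* separate = */ true);
      // init() registers the parameters to synchronize and connects to all
      // pservers (nodes * ports connections).
      if (!client.init(parameters)) return false;
      client.setTrainerId(trainerId);
      client.getParameter();        // pull the current values from the pservers
      return true;
    }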
- 
void 
sendAndReceiveParameter(ParameterUpdateMode updateMode, ParameterType parameterType, const std::vector<ParameterSegments> &segments, int64_t numSamples, real cost, bool sendBackParameter, ParameterType sendBackParameterType, ParameterType recvParameterType)¶ service functions
Sends the segments in parameter to parameter servers, then receives the response from the servers. See the usage sketch after the parameter list below.
- Note
 - Only parameterType will be sent.
 - Parameters
 updateMode-Indicates how parameters should be updated on the server side.
parameterType-Type of parameter that will be sent.
segments-Segments in the parameter that will be sent.
numSamples-Number of samples this update is based on.
cost-Cost of the batch, will be used to calculate global object value.
sendBackParameter-True if the updated parameters should be sent back, otherwise false.
sendBackParameterType-Parameter type to send back from the pserver, PARAMETER_VALUE by default
recvParameterType-pserver[sendBackParameterType] will be copied to client[recvParameterType]
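A hypothetical usage sketch of this overload for one synchronous batch update. client, segments, numSamples, and cost are caller-supplied placeholders; the include path and the PSERVER_UPDATE_MODE_ADD_GRADIENT / PARAMETER_GRADIENT enum values are assumptions, while PARAMETER_VALUE is the documented default:

    #include <cstdint>
    #include <vector>

    #include "ParameterClient2.h"   // assumption: header name/location

    void syncOneBatch(paddle::ParameterClient2& client,
                      const std::vector<paddle::ParameterSegments>& segments,
                      int64_t numSamples, paddle::real cost) {
      client.sendAndReceiveParameter(
          paddle::PSERVER_UPDATE_MODE_ADD_GRADIENT,   // assumption: gradient-push mode
          paddle::PARAMETER_GRADIENT,                 // only this type is sent
          segments, numSamples, cost,
          /* sendBackParameter = */ true,             // receive updated values back
          /* sendBackParameterType = */ paddle::PARAMETER_VALUE,
          /* recvParameterType = */ paddle::PARAMETER_VALUE);
    }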
- 
void 
sendAndReceiveParameter(ParameterUpdateMode updateMode, ParameterType parameterType, int64_t numSamples, real cost, bool sendBackParameter, ParameterType sendBackParameterType = PARAMETER_VALUE, ParameterType recvParameterType = PARAMETER_VALUE)¶ Sends all parameters to parameter servers, and receives the response from the servers.
- 
void 
sendParameter(ParameterUpdateMode updateMode, ParameterType parameterType, const std::vector<ParameterSegments> &segments, int64_t numSamples, real cost, bool sendBackParameter, BatchStatus batchStatus)¶ Sends the segments in parameter to parameter servers. Each sendParameter() must be paired with a recvParameter() in the future. Only parameterType will be sent.
- Note
 - This function is non-blocking. This means that the parameter data must not change between this call and recvParameter(). A pairing sketch follows the parameter list below.
 - Parameters
 updateMode-Indicates how parameters should be updated on the server side.
parameterType-Type of parameter that will be sent.
segments-Segments in the parameter that will be sent.
numSamples-Number of samples this update is based on.
cost-Cost of the batch, will be used to calculate global object value.
sendBackParameter-True if the updated parameters should be sent back, otherwise false.
batchStatus-Status of the batch.
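The pairing contract in the note above (every non-blocking sendParameter() must later be matched by recvParameter(), with the sent parameter data left untouched in between) can be sketched as follows. This is a hypothetical sketch: the include path, PSERVER_UPDATE_MODE_ADD_GRADIENT, PARAMETER_GRADIENT, and BATCH_FINISH are assumptions about paddle's header and enum names, not taken from this page:

    #include <cstdint>
    #include <vector>

    #include "ParameterClient2.h"   // assumption: header name/location

    void overlappedSync(paddle::ParameterClient2& client,
                        const std::vector<paddle::ParameterSegments>& segments,
                        int64_t numSamples, paddle::real cost) {
      client.sendParameter(paddle::PSERVER_UPDATE_MODE_ADD_GRADIENT,
                           paddle::PARAMETER_GRADIENT, segments,
                           numSamples, cost,
                           /* sendBackParameter = */ true,
                           paddle::BATCH_FINISH);   // assumption: batch status value
      // ... other work that does not touch the sent parameters ...
      client.recvParameter();                       // completes the pair
    }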
- 
void 
recvParameter()¶ 
- 
void 
sendParameter(ParameterUpdateMode updateMode, ParameterType parameterType, int64_t numSamples, real cost, bool sendBackParameter, BatchStatus batchStatus)¶ Sends all parameters to parameter servers; recvParameter() has to be invoked afterwards.
- Note
 - This function is non-blocking. This means that the parameter data must not change between this call and recvParameter().
 
- 
void 
getParameter(ParameterType recvParameterType = PARAMETER_VALUE, ParameterType sendBackParameterType = PARAMETER_VALUE)¶ Get all parameters from parameter servers.
- 
void 
getParameterSparse(ParameterType recvParameterType = PARAMETER_VALUE, ParameterType sendBackParameterType = PARAMETER_VALUE)¶ Get parameters by sparse row ids from parameter servers.
- 
void 
setParameter()¶ Set all parameters on parameter servers using the local parameters.
- 
void 
setParameterZero()¶ Set all parameters on the parameter servers to zero; the local parameter values are not sent.
- 
void 
waitPassStart()¶ Wait until all gradient servers start one pass.
- Note
 - This is currently only used by the gradient servers for the “sgd” algorithm. Calling this function means that the calling gradient server is ready to start a new pass.
 
- 
void 
waitPassFinish()¶ Wait until all gradient servers finish one pass.
- Note
 - This is currently only used by the gradient servers for the “sgd” algorithm. Calling this function means that the calling gradient server has finished one pass. A pass-lifecycle sketch appears after the synchronize() entry below.
 
- 
void 
synchronize(SyncObject syncObjectId = SYNC_DEFAULT)¶ Wait until all gradient servers call this function.
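A hypothetical sketch of one synchronous-SGD pass built from waitPassStart() and waitPassFinish() described above; trainOneBatch() is a placeholder for the per-batch send/receive loop and the include path is an assumption:

    #include "ParameterClient2.h"   // assumption: header name/location

    void trainOneBatch(paddle::ParameterClient2& client);  // placeholder

    void trainOnePass(paddle::ParameterClient2& client, int numBatches) {
      client.waitPassStart();                 // all gradient servers ready to start
      for (int i = 0; i < numBatches; ++i) {
        trainOneBatch(client);                // e.g. sendAndReceiveParameter() per batch
      }
      client.waitPassFinish();                // wait until every gradient server is done
    }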
- 
void 
asyncFinishPass(SyncObject syncObjectId = SYNC_DEFAULT)¶ Called when async-SGD finishes a pass.
- 
void 
asyncStartPass(SyncObject syncObjectId = SYNC_DEFAULT)¶ 
- 
void 
doOperation(PreparedOperations &ops, bool waitForGradient, bool sendBackParameter, bool releasePass = true)¶ Execute the prepared operations on pservers, fetch the results and aggregate results from different pservers.
- Parameters
 ops-Prepared operations that will be executed on pservers.
waitForGradient-If true, wait for gradient to be ready before starting the operations.
sendBackParameter-If true, send back the parameter to clients after the operations are finished.
releasePass-If true, and if all clients call waitPassFinish, signal all clients to finish the pass.
- 
void 
setConfig(const OptimizationConfig &optConfig, const std::string &saveDir = "", bool isSparseServer = false)¶ Set the configuration of pserver, including parameter config and optimization config
- 
bool 
inStatus(PServerStatus status)¶ Return true if all pservers are in the given status.
- 
bool 
isPassFinish()¶ 
- 
void 
setStatus(PServerStatus status)¶ Set pserver status.
- 
void 
waitForStatus(PServerStatus status)¶ Wait until all pservers are at status.
- Note
 - This function is not suitable for frequent use, because it polls the pserver status, sleeping 1 second between checks until the condition is satisfied.
 
- 
PServerVector 
createVector()¶ Create a column vector. The size is the dimension of parameter.
- 
void 
releaseVector(PServerVector handle)¶ Release the PServerVector given handle.
- 
PServerMatrix 
createMatrix(int32_t numCols)¶ Create a column-major matrix. The number of rows is the dimension of the parameter. The number of columns is specified by numCols.
- 
void 
releaseMatrix(PServerMatrix handle)¶ Release the PServerMatrix given handle.
- 
real 
vectorDotProduct(PServerVector u, PServerVector v)¶ Calculate the dot product of u and v.
- 
void 
vectorScale(PServerVector u, real a)¶ Scale u by a.
- 
void 
vectorCopy(PServerVector src, PServerVector dst)¶ Copy from src to dst.
- 
void 
vectorAddMult(PServerVector u, PServerVector v, real a)¶ u += v * a
- 
void 
vectorAddMultInto(PServerVector u, PServerVector v, PServerVector w, real a)¶ u = v + w * a
- 
void 
vectorScaleInto(PServerVector u, PServerVector v, real a)¶ u = v * a
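A hypothetical sketch composing the vector operations above into a remote axpy-style update with an explicitly managed temporary vector; the function name, the backup step, and the include path are illustrative assumptions:

    #include "ParameterClient2.h"   // assumption: header name/location

    void remoteAxpyWithBackup(paddle::ParameterClient2& client, paddle::real a) {
      paddle::PServerVector value = client.getPServerParameterValue();
      paddle::PServerVector grad = client.getPServerParameterGradient();
      paddle::PServerVector backup = client.createVector();  // parameter-sized temp

      client.vectorCopy(value, backup);          // backup = value
      client.vectorAddMult(value, grad, a);      // value += grad * a
      paddle::real sqNorm = client.vectorDotProduct(value, value);
      (void)sqNorm;                              // e.g. log it for monitoring
      client.releaseVector(backup);              // free the temporary pserver vector
    }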
- 
PServerVector 
getPServerParameterValue()¶ Return pserver parameter value.
- 
PServerVector 
getPServerParameterGradient()¶ Return pserver parameter gradient.
- 
void 
loadValueVector(const std::string &dirName)¶ Tell pservers to load value vector from file.
- Parameters
 dirName-The directory that contains the value vector file.
- 
void 
saveValueVector(const std::string &dirName)¶ Tell pservers to save value vector to file.
- 
void 
setTrainerId(int trainerId)¶ 
- 
void 
setForwardbackwardTime(uint64_t delta)¶ 
Public Static Functions
- 
int 
calcParameterBlockSize(const std::vector<ParameterPtr> &parameters, size_t serviceNum)¶ 
Protected Functions
- template <typename ProtoIn, typename ProtoOut>
 - 
void 
multiCall(const char *funcName, const ProtoIn &request, std::vector<ProtoOut> *responses)¶ 
Protected Attributes
- 
int 
port_¶ start port number of the pservers; all ports for the dense and sparse services are deduced from it by fixed rules
- 
int 
trainerId_¶ id of the trainer that uses this client
- 
uint64_t 
forwardbackwordTime_¶ 
- 
std::unordered_map<size_t, ParameterPtr> 
parameterMap_¶ maps parameter id to parameter; used for decoding protobuf data
- 
std::vector<ParameterSegments> 
allSegments_¶ segments for all parameters that need to be synchronized
- 
std::unique_ptr<SparseParameterDistribution> 
sparseDistribution_¶ module for tracking the distribution of sparse parameters across all pservers
- 
std::unique_ptr<SyncThreadPool> 
syncThreadPool_¶ thread pool for parallelizing all connections to pservers
- 
bool 
passFinish_¶ 
Private Functions
- 
void 
destroy()¶ 
- 
void 
sendParallel(int tid, size_t numThreads, ParameterType recvParameterType)¶ management function for parallelizing send/recv over all connections to all pservers. It is called inside one SyncThreadPool and supports using N threads to control M connections. Receiving can only start after all sends on the connections owned by the current thread have finished. Connections controlled by different threads transfer data asynchronously.
- 
virtual void 
send(int threadId)¶ sending-thread routine for asynchronously sending data
- 
virtual void 
recv(int threadId)¶ receiving-thread routine for asynchronously receiving data
- 
void 
prepareSendData(ParameterUpdateMode updateMode, ParameterType parameterType, const std::vector<ParameterSegments> &parameterSegments, int64_t numSamples, real cost, bool sendBackParameter, ParameterType sendBackParameterType, BatchStatus batchStatus, SendJob *sendJob)¶ main routine to build data for pserver
- Note
 - It can prepare data for different parameter types and can be regarded as a layer bridging the real parameter data and the protobuf data used for communication. TODO(yanfei): an additional layer could be abstracted out to encode and decode data to/from protobuf.
 
- 
void 
initThreads()¶ start necessary threads for threadPool