Client¶
-
class
paddle::
BaseClient
¶ Manages all connections to pservers. There are two modes for managing these connections. In the first mode, each connection is served by two threads that handle sending and receiving separately. In the second mode, each connection is handled by a single thread, which performs all activity on it. The first mode relies on sendThreads_/recvThreads_ and sendJobQueue_/recvJobQueue_; the second uses a shared thread pool to manage connections. Besides the pserver, metric learning also uses the network to exchange features across machines, so this class only abstracts the basic thread and queue-buffer creation that both need.
Subclassed by paddle::ParameterClient2
Public Functions
-
BaseClient
(bool separate = false, int numPorts = FLAGS_ports_num)¶
-
~BaseClient
()¶
- template <class DataType>
-
void
putData
(int clientId, SendDataType type, DataType *datas, size_t size, DataUpdateMode mode)¶ Send data to the server. Only synchronous mode is supported.
- template <class DataType>
-
void
putOwnData
(int clientId, SendDataType type, DataType *datas, size_t size)¶
- template <class DataType>
-
void
getAllData
(int clientId, SendDataType type, DataType *datas, size_t size)¶
- template <class DataType>
-
void
reduce
(DataType *sendBuf, DataType *recvBuf, size_t size, int clientId, int rootId)¶ Reduces values on all clients. Only SUM is supported. The result is saved in recvBuf of the client whose id is rootId.
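The SUM semantics can be pictured with a small in-process model. This is only a sketch: `reduceSum` is a hypothetical helper, each inner vector stands in for one client's sendBuf, and no network transfer is modeled.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical in-process model of a SUM reduce. Each inner vector stands in
// for one client's sendBuf; the return value is what the root client's
// recvBuf would hold afterwards.
template <class DataType>
std::vector<DataType> reduceSum(
    const std::vector<std::vector<DataType>>& sendBufs) {
  assert(!sendBufs.empty());
  const std::size_t size = sendBufs.front().size();
  std::vector<DataType> recvBuf(size, DataType(0));
  for (const auto& buf : sendBufs) {
    assert(buf.size() == size);  // every client contributes `size` elements
    for (std::size_t i = 0; i < size; ++i) recvBuf[i] += buf[i];
  }
  return recvBuf;
}
```

With three clients holding {1, 2, 3}, {10, 20, 30} and {100, 200, 300}, the root ends up with the element-wise sums.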
-
virtual TransDataType
getTransDtype
(const std::type_info &info)¶ Return the transfer data type that corresponds to the input type.
Protected Types
-
typedef std::unique_ptr<std::thread>
ThreadPtr
¶
-
typedef std::vector<std::vector<iovec>>
InputIovs
¶
-
typedef std::vector<SendParameterRequest>
SendRequest
¶
-
typedef std::vector<SendDataRequest>
SendDataRequestVec
¶
Protected Functions
-
int
calcClientId
(int i, int serviceNum)¶
-
void
startThreads
()¶ start threads in sendThreads_ and recvThreads_
-
void
finishThreads
()¶ finish threads in sendThreads_ and recvThreads_
- template <class DataType>
-
void
prepareData
(int clientId, SendDataType type, DataUpdateMode updateMode, DataType *datas, size_t size, SendJob *sendJob)¶
- template <class DataType>
-
void
sendData
(int clientId, SendDataType type, DataUpdateMode updateMode, DataType *datas, size_t size)¶ send data to all data servers
- Note
- Each trainer sends all of its data to all data servers. This is for broadcast data synchronization, such as feature synchronization in metric learning.
-
void
recvData
()¶ recv data from all data servers
- Note
- synchronize all recv threads
- template <typename ProtoIn, typename ProtoOut>
-
void
multiCall
(const char *funcName, const ProtoIn &request, std::vector<ProtoOut> *responses)¶ send request, and recv responses
-
void
synchronize
(SyncObject syncObjectId = SYNC_DEFAULT)¶ synchronize all trainers and pservers
- Note
- Used to ensure that the data of all trainers has been received.
-
virtual void
send
(int threadId) = 0¶ Send data from multiple threads, each running separately.
- Note
- Each thread should read its own JobQueue to handle requests, and should call calcClientId() to find the connections it manages. send and recv are implemented in the child class.
Protected Attributes
-
bool
stopping_
¶
-
int
serviceNum_
¶ nodes * ports, that is, the number of actual pservers
-
int
threadNum_
¶ Number of threads for managing all services. Normally the number of pservers is well under a few hundred, so thread-based parallelization can benefit both traffic performance and the pserver’s SGD optimization performance.
-
std::vector<ProtoClient>
clients_
¶ the connection manager at client end
-
std::unique_ptr<ThreadBarrier>
recvSyncBarrier_
¶
-
std::vector<std::unique_ptr<SendQueue>>
sendJobQueue_
¶ The send and recv queues cooperate to overlap communication with the forwardBackward computation.
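A job queue of this kind is commonly a blocking FIFO: the training thread enqueues a job and returns to computation while a worker thread drains the queue. The sketch below is an assumption about the general shape only; `BlockingJobQueue` is a hypothetical name, and the real SendQueue type is not shown in this reference.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical blocking FIFO, standing in for a per-thread job queue.
template <class Job>
class BlockingJobQueue {
 public:
  // Called by the producer (for example the training thread) to hand over a job.
  void enqueue(Job job) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      jobs_.push_back(std::move(job));
    }
    notEmpty_.notify_one();
  }

  // Called by the worker thread; blocks until a job is available.
  Job dequeue() {
    std::unique_lock<std::mutex> lock(mutex_);
    notEmpty_.wait(lock, [this] { return !jobs_.empty(); });
    assert(!jobs_.empty());
    Job job = std::move(jobs_.front());
    jobs_.pop_front();
    return job;
  }

  std::size_t size() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return jobs_.size();
  }

 private:
  mutable std::mutex mutex_;
  std::condition_variable notEmpty_;
  std::deque<Job> jobs_;
};
```

Because enqueue() returns immediately, the producer can continue with the next forward/backward step while a sender thread blocks in dequeue() until work arrives.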
-
int
numPorts_
¶ port num for each node
-
bool
separateSendAndRecv_
¶ if set, overlapped optimization is disabled
-
std::vector<CpuMemHandlePtr>
recvDataMems_
¶
Protected Static Functions
-
static int
divup
(int a, int b)¶ for a > 0, b > 0: return the smallest x s.t. b*x >= a
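One common way to compute this ceiling division with integer arithmetic is shown below. It is a sketch that satisfies the stated contract, not necessarily the library's own code.

```cpp
#include <cassert>

// Ceiling division for positive integers: the smallest x with b * x >= a.
int divup(int a, int b) {
  assert(a > 0 && b > 0);
  return (a + b - 1) / b;
}
```

For example, splitting 7 items into blocks of 2 needs divup(7, 2) blocks, which is 4.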
-
struct
SendJob
¶ Public Members
-
SendRequest
parallelRequests
¶ store protobuf request
-
SendDataRequestVec
parallelDataRequests
¶ store data, such as features for metric learning
-
class
paddle::
ParameterClient2
¶ The client interface for the parameter server. ParameterClient2 supports two modes for managing connections to parameter servers. In the first mode, one connection is shared by two threads that are separately responsible for sending and receiving. In the second mode, one connection is owned by a single thread, and all sending and receiving on it run in that thread.
TODO(yanfei): A further idea for optimizing pserver performance is to do sync-SGD at the parameter level instead of the pserver level. Full parallelization at the parameter level can also follow the forward-backward computation layer by layer, which matters more for deeper models. First, the pserver can parallelize all computation per parameter instead of waiting until all gradients have arrived, and can start sending back a parameter's value as soon as that parameter is ready instead of waiting until all parameter values are ready. Second, the parameter client can write parameters back to the GPU as they arrive instead of waiting until all parameters have been received on the CPU host.
Inherits from paddle::BaseClient
Public Functions
-
ParameterClient2
(bool separate = false, int port = FLAGS_port, int numPorts = FLAGS_ports_num)¶ Constructor.
- Parameters
separate
: True if sending and receiving activities are separated into 2 threads, otherwise false.
port
: Port number that the parameter client runs on.
numPorts
: Number of ports the parameter client occupies; numPorts * pserver number is the total number of connections the parameter client maintains.
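The connection count can be made concrete by enumerating endpoints. The sketch below assumes each pserver node listens on numPorts consecutive ports starting at a base port; that layout, the `Endpoint` struct and `enumerateEndpoints` are illustrative assumptions, since the actual port-assignment rules are not given here.

```cpp
#include <cassert>
#include <string>
#include <vector>

struct Endpoint {
  std::string host;
  int port;
};

// Assumption: each pserver node listens on numPorts consecutive ports
// starting at basePort. The real port-assignment rules are not documented
// in this reference.
std::vector<Endpoint> enumerateEndpoints(const std::vector<std::string>& hosts,
                                         int basePort, int numPorts) {
  assert(numPorts > 0);
  std::vector<Endpoint> endpoints;
  endpoints.reserve(hosts.size() * numPorts);
  for (const auto& host : hosts) {
    for (int p = 0; p < numPorts; ++p) {
      endpoints.push_back({host, basePort + p});
    }
  }
  return endpoints;
}
```

Three pserver nodes with numPorts = 2 give 3 * 2 = 6 connections, one per (host, port) pair.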
-
~ParameterClient2
()¶
-
bool
init
(const std::vector<ParameterPtr> &parameters)¶
-
void
sendAndReceiveParameter
(ParameterUpdateMode updateMode, ParameterType parameterType, const std::vector<ParameterSegments> &segments, int64_t numSamples, real cost, bool sendBackParameter, ParameterType sendBackParameterType, ParameterType recvParameterType)¶
Sends the segments in parameter to parameter servers, then receives the response from the servers.
- Note
- Only parameterType will be sent.
- Parameters
updateMode
: Indicates how parameters should be updated on the server side.
parameterType
: Type of parameter that will be sent.
segments
: Segments in the parameter that will be sent.
numSamples
: Number of samples this update is based on.
cost
: Cost of the batch, used to calculate the global objective value.
sendBackParameter
: True if the updated parameters should be sent back, otherwise false.
sendBackParameterType
: Parameter type on the pserver to send back, PARAMETER_VALUE by default.
recvParameterType
: pserver[sendBackParameterType] will be copied to client[recvParameterType].
-
void
sendAndReceiveParameter
(ParameterUpdateMode updateMode, ParameterType parameterType, int64_t numSamples, real cost, bool sendBackParameter, ParameterType sendBackParameterType = PARAMETER_VALUE, ParameterType recvParameterType = PARAMETER_VALUE)¶ Sends all parameters to parameter servers, and receives the response from the servers.
-
void
sendParameter
(ParameterUpdateMode updateMode, ParameterType parameterType, const std::vector<ParameterSegments> &segments, int64_t numSamples, real cost, bool sendBackParameter, BatchStatus batchStatus)¶ Sends the segments in parameter to parameter servers. Each sendParameter() must be paired with a recvParameter() in the future. Only parameterType will be sent.
- Note
- This function is non-blocking, so the parameter should not change between this call and recvParameter().
- Parameters
updateMode
: Indicates how parameters should be updated on the server side.
parameterType
: Type of parameter that will be sent.
segments
: Segments in the parameter that will be sent.
numSamples
: Number of samples this update is based on.
cost
: Cost of the batch, used to calculate the global objective value.
sendBackParameter
: True if the updated parameters should be sent back, otherwise false.
batchStatus
: Status of the batch.
-
void
recvParameter
()¶
-
void
sendParameter
(ParameterUpdateMode updateMode, ParameterType parameterType, int64_t numSamples, real cost, bool sendBackParameter, BatchStatus batchStatus)¶ Sends all parameters to parameter servers; recvParameter() has to be invoked afterwards.
- Note
- This function is non-blocking, so the parameter should not change between this call and recvParameter().
-
void
getParameter
(ParameterType recvParameterType = PARAMETER_VALUE, ParameterType sendBackParameterType = PARAMETER_VALUE)¶ Get all parameters from parameter servers.
-
void
getParameterSparse
(ParameterType recvParameterType = PARAMETER_VALUE, ParameterType sendBackParameterType = PARAMETER_VALUE)¶ Get parameters by sparse row ids from parameter servers.
-
void
setParameter
()¶ Set all parameters on parameter servers using the local parameters.
-
void
setParameterZero
()¶ Set all parameters on parameter servers to zero, which means the local parameters are not sent.
-
void
waitPassStart
()¶ Wait until all gradient servers start one pass.
- Note
- This is now only used by the gradient servers for “sgd” algorithm. Calling this function means that the calling gradient server is ready to start a new pass.
-
void
waitPassFinish
()¶ Wait until all gradient servers finish one pass.
- Note
- This is now only used by the gradient servers for “sgd” algorithm. Calling this function means that the calling gradient server finishes one pass.
-
void
synchronize
(SyncObject syncObjectId = SYNC_DEFAULT)¶ Wait until all gradient servers call this function.
-
void
asyncFinishPass
(SyncObject syncObjectId = SYNC_DEFAULT)¶ Called when async-SGD finishes a pass.
-
void
asyncStartPass
(SyncObject syncObjectId = SYNC_DEFAULT)¶
-
void
doOperation
(PreparedOperations &ops, bool waitForGradient, bool sendBackParameter, bool releasePass = true)¶ Execute the prepared operations on pservers, fetch the results and aggregate results from different pservers.
- Parameters
ops
: Prepared operations that will be executed on pservers.
waitForGradient
: If true, wait for gradient to be ready before starting the operations.
sendBackParameter
: If true, send back the parameter to clients after the operations are finished.
releasePass
: If true, and if all clients call waitPassFinish, signal all clients to finish the pass.
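Aggregating results from different pservers can be pictured with a dot product. The sketch assumes each pserver holds one contiguous block of the vectors, computes a partial result, and the client sums the partials. `Shard` and `aggregateDot` are hypothetical names, and the block layout is an assumption.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// One hypothetical pserver shard: it holds a contiguous block of u and v.
struct Shard {
  std::vector<double> u;
  std::vector<double> v;

  // The operation executed server-side: a partial dot product over the block.
  double partialDot() const {
    assert(u.size() == v.size());
    double sum = 0.0;
    for (std::size_t i = 0; i < u.size(); ++i) sum += u[i] * v[i];
    return sum;
  }
};

// Client side: collect one partial result per shard and aggregate by summing.
double aggregateDot(const std::vector<Shard>& shards) {
  double total = 0.0;
  for (const auto& shard : shards) total += shard.partialDot();
  return total;
}
```

Other operations may need a different aggregation step; summation is simply what fits a dot product.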
-
void
setConfig
(const OptimizationConfig &optConfig, const std::string &saveDir = "", bool isSparseServer = false)¶ Set the configuration of pserver, including parameter config and optimization config
-
bool
inStatus
(PServerStatus status)¶ Return true if all pservers are in the given status.
-
bool
isPassFinish
()¶
-
void
setStatus
(PServerStatus status)¶ Set pserver status.
-
void
waitForStatus
(PServerStatus status)¶ Wait until all pservers are at status.
- Note
- This function is not suitable for frequent use, because it sleeps for 1 second each time it finds that the condition is not yet satisfied.
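A polling wait of this kind can be sketched as follows, assuming the loop re-checks the status after every sleep. `pollUntil` is a hypothetical helper; the interval is a parameter here so that the cost of each failed check is visible.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>

// Polls `inStatus` until it returns true, sleeping `interval` between checks.
// Returns the number of sleeps performed. Hypothetical helper, not the
// library's code.
int pollUntil(const std::function<bool()>& inStatus,
              std::chrono::milliseconds interval) {
  assert(interval.count() >= 0);
  int sleeps = 0;
  while (!inStatus()) {
    std::this_thread::sleep_for(interval);
    ++sleeps;
  }
  return sleeps;
}
```

With a one-second interval, each unsuccessful check adds a full second of latency, which is why such a wait suits rare state changes rather than per-batch use.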
-
PServerVector
createVector
()¶ Create a column vector. The size is the dimension of parameter.
-
void
releaseVector
(PServerVector handle)¶ Release the PServerVector identified by handle.
-
PServerMatrix
createMatrix
(int32_t numCols)¶ Create a column-major matrix. The number of rows is the dimension of the parameter. The number of columns is specified by numCols.
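Column-major storage means each column is a contiguous run of numRows values. The class below is a local illustration of that indexing only; `ColumnMajorMatrix` is a hypothetical name and says nothing about how a PServerMatrix is stored on the servers.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical local model of a column-major matrix with `numRows` rows
// (the parameter dimension) and `numCols` columns.
class ColumnMajorMatrix {
 public:
  ColumnMajorMatrix(std::size_t numRows, std::size_t numCols)
      : numRows_(numRows), numCols_(numCols), data_(numRows * numCols, 0.0f) {}

  // Element (row, col) lives at offset col * numRows + row, so each column
  // is a contiguous vector of length numRows.
  float& at(std::size_t row, std::size_t col) {
    assert(row < numRows_ && col < numCols_);
    return data_[col * numRows_ + row];
  }

  // Pointer to the first element of column `col`.
  const float* column(std::size_t col) const {
    assert(col < numCols_);
    return data_.data() + col * numRows_;
  }

 private:
  std::size_t numRows_;
  std::size_t numCols_;
  std::vector<float> data_;
};
```

Under this layout every column can be treated as a parameter-sized vector without copying.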
-
void
releaseMatrix
(PServerMatrix handle)¶ Release the PServerMatrix identified by handle.
-
real
vectorDotProduct
(PServerVector u, PServerVector v)¶ Calculate the dot product of u and v.
-
void
vectorScale
(PServerVector u, real a)¶ Scale u by a.
-
void
vectorCopy
(PServerVector src, PServerVector dst)¶ Copy from src to dst.
-
void
vectorAddMult
(PServerVector u, PServerVector v, real a)¶ u += v * a
-
void
vectorAddMultInto
(PServerVector u, PServerVector v, PServerVector w, real a)¶ u = v + w * a
-
void
vectorScaleInto
(PServerVector u, PServerVector v, real a)¶ u = v * a
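The three element-wise formulas above can be checked locally. In this sketch `Vec` is a plain std::vector standing in for a PServerVector handle, and the function names are hypothetical; the real operations run on the pservers.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

using Vec = std::vector<double>;  // local stand-in for a PServerVector

// u += v * a
void addMult(Vec& u, const Vec& v, double a) {
  assert(u.size() == v.size());
  for (std::size_t i = 0; i < u.size(); ++i) u[i] += v[i] * a;
}

// u = v + w * a
void addMultInto(Vec& u, const Vec& v, const Vec& w, double a) {
  assert(v.size() == w.size());
  u.resize(v.size());
  for (std::size_t i = 0; i < v.size(); ++i) u[i] = v[i] + w[i] * a;
}

// u = v * a
void scaleInto(Vec& u, const Vec& v, double a) {
  u.resize(v.size());
  for (std::size_t i = 0; i < v.size(); ++i) u[i] = v[i] * a;
}
```

The "Into" variants overwrite u, while addMult accumulates into it.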
-
PServerVector
getPServerParameterValue
()¶ Return pserver parameter value.
-
PServerVector
getPServerParameterGradient
()¶ Return pserver parameter gradient.
-
void
loadValueVector
(const std::string &dirName)¶ Tell pservers to load value vector from file.
- Parameters
dirName
: The directory that contains the value vector file.
-
void
saveValueVector
(const std::string &dirName)¶ Tell pservers to save value vector to file.
-
void
setTrainerId
(int trainerId)¶
-
void
setForwardbackwardTime
(uint64_t delta)¶
Public Static Functions
-
int
calcParameterBlockSize
(const std::vector<ParameterPtr> &parameters, size_t serviceNum)¶
Protected Functions
- template <typename ProtoIn, typename ProtoOut>
-
void
multiCall
(const char *funcName, const ProtoIn &request, std::vector<ProtoOut> *responses)¶
Protected Attributes
-
int
port_
¶ Start port number of the pserver. All ports for dense and sparse parameters are deduced from it by some rules.
-
int
trainerId_
¶ The id of the trainer using this client
-
uint64_t
forwardbackwordTime_
¶
-
std::unordered_map<size_t, ParameterPtr>
parameterMap_
¶ Maps id to parameter; used for decoding protobuf data
-
std::vector<ParameterSegments>
allSegments_
¶ Segments for all parameters that need to be synchronized
-
std::unique_ptr<SparseParameterDistribution>
sparseDistribution_
¶ Module for tracking the distribution of sparse parameters across all pservers
-
std::unique_ptr<SyncThreadPool>
syncThreadPool_
¶ thread pool for parallelizing all connections to pservers
-
bool
passFinish_
¶
Private Functions
-
void
destroy
()¶
-
void
sendParallel
(int tid, size_t numThreads, ParameterType recvParameterType)¶ Management function for parallelizing send/recv over all connections to all pservers. It is called under one SyncThreadPool and supports using N threads to control M connections. Receiving cannot start until all sends on the connections owned by the current thread have finished. Connections controlled by different threads can transfer data asynchronously.
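The N-threads-over-M-connections scheme can be sketched without real networking. The strided assignment below (thread tid owns connections tid, tid + numThreads, ...) is an assumption made for illustration, and `ownedConnections` and `runThread` are hypothetical names. The log only records the order of operations within one thread.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Assumption: thread `tid` owns connections tid, tid + numThreads,
// tid + 2 * numThreads, ... The real assignment rule is not documented here.
std::vector<std::size_t> ownedConnections(std::size_t tid,
                                          std::size_t numThreads,
                                          std::size_t numConnections) {
  assert(numThreads > 0 && tid < numThreads);
  std::vector<std::size_t> owned;
  for (std::size_t c = tid; c < numConnections; c += numThreads) {
    owned.push_back(c);
  }
  return owned;
}

// The work of one thread: issue every send first, then collect every reply.
std::vector<std::string> runThread(std::size_t tid, std::size_t numThreads,
                                   std::size_t numConnections) {
  std::vector<std::string> log;
  const auto owned = ownedConnections(tid, numThreads, numConnections);
  for (std::size_t c : owned) log.push_back("send " + std::to_string(c));
  for (std::size_t c : owned) log.push_back("recv " + std::to_string(c));
  return log;
}
```

With 2 threads and 5 connections, thread 0 sends on connections 0, 2 and 4 before receiving on any of them, while thread 1 handles connections 1 and 3 independently.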
-
void
send
(int threadId)¶ Sending thread routine for asynchronously sending data
-
void
recv
(int threadId)¶ Receiving thread routine for asynchronously receiving data
-
void
prepareSendData
(ParameterUpdateMode updateMode, ParameterType parameterType, const std::vector<ParameterSegments> &parameterSegments, int64_t numSamples, real cost, bool sendBackParameter, ParameterType sendBackParameterType, BatchStatus batchStatus, SendJob *sendJob)¶ main routine to build data for pserver
- Note
- It can prepare data for different parameter types. It can be regarded as a layer bridging real parameter data and the protobuf data used for communication. TODO(yanfei): an additional layer could be abstracted to encode and decode data to/from protobuf.
-
void
initThreads
()¶ start necessary threads for threadPool
-