Client

class paddle::BaseClient

it manages all connections to pservers. There are two modes for managing these connections. In the first, each connection owns two threads that separately handle sending and receiving; this mode is armed with sendThreads_/recvThreads_ and sendJobQueue_/recvJobQueue_. In the second, each thread uses one connection for all of its activity; this mode uses a shared thread pool to manage the connections. In addition to pserver communication, metric learning also uses the network to exchange features across machines, so this class abstracts the basic thread and queue-buffer creation for both.
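For illustration, a hedged sketch of selecting a mode through the constructor's separate flag (the flag comes from the signatures below; the header path is an assumption, not confirmed by this page):

    // Sketch only: header path assumed.
    #include "paddle/pserver/ParameterClient2.h"

    // separate == true  -> first mode: dedicated send/recv threads
    //                      (sendThreads_/recvThreads_ with job queues).
    // separate == false -> second mode: a shared thread pool drives
    //                      all connections.
    paddle::ParameterClient2 client(/*separate=*/true);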

Subclassed by paddle::ParameterClient2

Public Types

typedef std::shared_ptr<SendJob> SendJobPtr
typedef Queue<SendJobPtr> SendQueue

Public Functions

BaseClient(bool separate = false, int numPorts = FLAGS_ports_num)
virtual ~BaseClient()
template <class DataType>
void putData(int clientId, SendDataType type, DataType *datas, size_t size, DataUpdateMode mode)

send data to the server; only synchronous mode is supported

template <class DataType>
void putOwnData(int clientId, SendDataType type, DataType *datas, size_t size)
template <class DataType>
void getAllData(int clientId, SendDataType type, DataType *datas, size_t size)
template <class DataType>
void reduce(DataType *sendBuf, DataType *recvBuf, size_t size, int clientId, int rootId)

Reduces values across all clients. Only SUM is supported. The result is saved in the recvBuf of the rootId client.
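A hedged usage sketch; client and myClientId are placeholders, and real is the floating-point type used throughout this API:

    // Sum `dim` values element-wise across all clients; only the client
    // whose id equals rootId (here 0) receives the sums in recvBuf.
    const size_t dim = 128;               // example dimension
    std::vector<real> sendBuf(dim, 1.0);  // each client contributes ones
    std::vector<real> recvBuf(dim, 0.0);
    client.reduce(sendBuf.data(), recvBuf.data(), dim, myClientId,
                  /*rootId=*/0);
    // On client 0, recvBuf[i] now holds the number of clients.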

virtual TransDataType getTransDtype(const std::type_info &info)

return the transfer data type corresponding to the input type

Protected Types

typedef std::unique_ptr<std::thread> ThreadPtr
typedef std::vector<std::vector<iovec>> InputIovs
typedef std::vector<SendParameterRequest> SendRequest
typedef std::vector<SendDataRequest> SendDataRequestVec

Protected Functions

int calcClientId(int i, int serviceNum)
void startThreads()

start threads in sendThreads_ and recvThreads_

void finishThreads()

finish threads in sendThreads_ and recvThreads_

template <class DataType>
void prepareData(int clientId, SendDataType type, DataUpdateMode updateMode, DataType *datas, size_t size, SendJob *sendJob)
template <class DataType>
void sendData(int clientId, SendDataType type, DataUpdateMode updateMode, DataType *datas, size_t size)

send data to all data servers

Note
each trainer sends all its data to all data servers. This is used for broadcast data synchronization, such as feature synchronization in metric learning.
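A hedged sketch of that broadcast pattern using the public entry points listed above; type, the buffers, and the sizes are placeholders:

    // Every trainer contributes its own feature block, then gathers the
    // blocks contributed by all trainers from the data servers.
    client.putOwnData(myClientId, type, ownFeatures, ownSize);
    client.getAllData(myClientId, type, allFeatures, totalSize);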

void recvData()

recv data from all data servers

Note
synchronize all recv threads

template <typename ProtoIn, typename ProtoOut>
void multiCall(const char *funcName, const ProtoIn &request, std::vector<ProtoOut> *responses)

send a request to all servers, and receive their responses

void synchronize(SyncObject syncObjectId = SYNC_DEFAULT)

synchronize all trainers and pservers

Note
used to ensure that the data from all trainers has been received

virtual void send(int threadId) = 0

use multiple threads to send data in parallel

Note
each thread reads its own JobQueue to handle requests, and uses calcClientId() to retrieve the connections it manages. send() and recv() are implemented in the child class.
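A hedged sketch of the loop a subclass might run per send thread; the dequeue() call and the loop structure are assumptions for illustration, not the actual ParameterClient2 implementation:

    void MyClient::send(int threadId) {
      while (!stopping_) {
        // Each thread blocks on its own queue only.
        SendJobPtr job = sendJobQueue_[threadId]->dequeue();
        // For each connection this thread owns (the thread-to-connection
        // mapping goes through calcClientId()):
        //   write job->parallelRequests[...] and
        //   job->parallelInputIovs[...] through the matching clients_[...]
      }
    }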

virtual void recv(int threadId) = 0

use multiple threads to receive data in parallel

Note
almost the same as send()

Protected Attributes

bool stopping_
int serviceNum_

nodes * ports, which is the number of real pserver services

int threadNum_

number of threads for managing all services. Normally there are at most a few hundred pservers, so thread-based parallelization benefits both traffic performance and the pservers' sgd optimization performance.

std::vector<ProtoClient> clients_

the connection managers at the client end

std::vector<ThreadPtr> sendThreads_

send threads for parallelization

std::vector<ThreadPtr> recvThreads_

recv threads for parallelization

std::unique_ptr<ThreadBarrier> recvSyncBarrier_
std::vector<std::unique_ptr<SendQueue>> sendJobQueue_

the send and recv queues cooperate with each other to overlap communication with the forwardBackward action.

std::vector<std::unique_ptr<SendQueue>> recvJobQueue_

queue for buffering recv request

SendJob sendJob_

used specifically for the data server (dserver)

int numPorts_

port num for each node

bool separateSendAndRecv_

if set, overlapped optimization is disabled

std::vector<CpuMemHandlePtr> recvDataMems_

Protected Static Functions

static int divup(int a, int b)

for a > 0, b > 0: return the smallest x s.t. b*x >= a
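A typical implementation of this ceiling division, as a sketch consistent with the stated contract (not necessarily the exact source body):

    static int divup(int a, int b) { return (a + b - 1) / b; }

For example, divup(10, 4) == 3: 4 * 3 = 12 >= 10, while 4 * 2 = 8 < 10.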

struct SendJob

Public Members

InputIovs parallelInputIovs

store the parameter-related block data

SendRequest parallelRequests

store protobuf request

SendDataRequestVec parallelDataRequests

store data, such as features for metric learning

class paddle::ParameterClient2

The client interface for the parameter server. ParameterClient2 supports 2 modes for managing connections to parameter servers: in the 1st mode, one connection is shared by 2 threads that are separately responsible for sending and receiving; in the 2nd mode, one connection is owned by a single thread, and all sending and receiving activity runs in that thread. TODO(yanfei): A further idea for optimizing pserver performance is to do sync-sgd at the parameter level instead of the pserver level; full parallelization at the parameter level can also track forwardbackward computation layer by layer in deeper models. Firstly, the pserver could fully parallelize its computation at the parameter level and send back a parameter's value as soon as that parameter is ready, instead of waiting for all gradients to finish and for all parameter values to be ready. Secondly, the parameter client could write parameters back to the GPU as they arrive, instead of waiting until all parameters have been received on the CPU host end.

Inherits from paddle::BaseClient

Public Functions

ParameterClient2(bool separate = false, int port = FLAGS_port, int numPorts = FLAGS_ports_num)

Constructor.

Parameters
  • separate -

True if sending and receiving activities are separated into 2 threads, otherwise false.

  • port -

    Port number that parameter client runs on.

  • numPorts -

Number of ports the parameter client occupies; numPorts * the number of pservers is the total number of connections the parameter client maintains.
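For example (illustrative numbers, not defaults): with 4 pserver nodes and numPorts = 2, the client maintains 2 * 4 = 8 connections in total.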

~ParameterClient2()
bool init(const std::vector<ParameterPtr> &parameters)
void sendAndReceiveParameter(ParameterUpdateMode updateMode, ParameterType parameterType, const std::vector<ParameterSegments> &segments, int64_t numSamples, real cost, bool sendBackParameter, ParameterType sendBackParameterType, ParameterType recvParameterType)

service functions

Sends the segments in parameter to parameter servers, then receives the response from the servers.

Note
Only parameterType will be sent.
Parameters
  • updateMode -

    Indicates how parameters should be updated on the server side.

  • parameterType -

    Type of parameter that will be sent.

  • segments -

    Segments in the parameter that will be sent.

  • numSamples -

    Number of samples this update is based on.

  • cost -

    Cost of the batch, will be used to calculate global object value.

  • sendBackParameter -

    True if the updated parameters should be sent back, otherwise false.

  • sendBackParameterType -

    Send back parameter type on pserver, PARAMETER_VALUE by default

  • recvParameterType -

pserver[sendBackParameterType] will be copied to client[recvParameterType]
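A hedged usage sketch; PSERVER_UPDATE_MODE_ADD_GRADIENT and PARAMETER_GRADIENT are assumed enum values (only PARAMETER_VALUE appears on this page), and segments, batchSize, and batchCost are placeholders:

    // Push gradients for the given segments and receive the updated
    // values back into the client's PARAMETER_VALUE buffers.
    client.sendAndReceiveParameter(PSERVER_UPDATE_MODE_ADD_GRADIENT,
                                   PARAMETER_GRADIENT,
                                   segments,
                                   batchSize,
                                   batchCost,
                                   /*sendBackParameter=*/true,
                                   PARAMETER_VALUE,   // sendBackParameterType
                                   PARAMETER_VALUE);  // recvParameterType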

void sendAndReceiveParameter(ParameterUpdateMode updateMode, ParameterType parameterType, int64_t numSamples, real cost, bool sendBackParameter, ParameterType sendBackParameterType = PARAMETER_VALUE, ParameterType recvParameterType = PARAMETER_VALUE)

Sends all parameters to parameter servers, and receives the response from the servers.

void sendParameter(ParameterUpdateMode updateMode, ParameterType parameterType, const std::vector<ParameterSegments> &segments, int64_t numSamples, real cost, bool sendBackParameter, BatchStatus batchStatus)

Sends the segments in parameter to parameter servers. Each sendParameter() must be paired with a recvParameter() in the future. Only parameterType will be sent.

Note
This function is non-blocking. This means that the parameters should not change between this call and recvParameter()
Parameters
  • updateMode -

    Indicates how parameters should be updated on the server side.

  • parameterType -

    Type of parameter that will be sent.

  • segments -

    Segments in the parameter that will be sent.

  • numSamples -

    Number of samples this update is based on.

  • cost -

    Cost of the batch, will be used to calculate global object value.

  • sendBackParameter -

    True if the updated parameters should be sent back, otherwise false.

  • batchStatus -

    Status of the batch.

void recvParameter()
void sendParameter(ParameterUpdateMode updateMode, ParameterType parameterType, int64_t numSamples, real cost, bool sendBackParameter, BatchStatus batchStatus)

Sends all parameters to parameter servers; recvParameter() has to be invoked afterwards.

Note
This function is non-blocking. This means that the parameters should not change between this call and recvParameter()
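The pairing contract, sketched (updateMode, segments, batchSize, batchCost, and batchStatus are placeholders, as above):

    // Non-blocking: the request is handed off to the send threads.
    client.sendParameter(updateMode, PARAMETER_GRADIENT, segments,
                         batchSize, batchCost,
                         /*sendBackParameter=*/true, batchStatus);
    // Safe to overlap unrelated work here, but the parameters sent
    // above must not be modified until the matching receive completes.
    client.recvParameter();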

void getParameter(ParameterType recvParameterType = PARAMETER_VALUE, ParameterType sendBackParameterType = PARAMETER_VALUE)

Get all parameters from parameter servers.

void getParameterSparse(ParameterType recvParameterType = PARAMETER_VALUE, ParameterType sendBackParameterType = PARAMETER_VALUE)

Get parameters by sparse row ids from parameter servers.

void setParameter()

Set all parameters on parameter servers using the local parameters.

void setParameterZero()

Set all parameters on parameter servers to zero; local parameter values are not sent

void waitPassStart()

Wait until all gradient servers start one pass.

Note
This is now only used by the gradient servers for “sgd” algorithm. Calling this function means that the calling gradient server is ready to start a new pass.

void waitPassFinish()

Wait until all gradient servers finish one pass.

Note
This is now only used by the gradient servers for “sgd” algorithm. Calling this function means that the calling gradient server finishes one pass.
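A hedged sketch of one pass on a gradient server using the two barriers above; batches and trainOneBatch are hypothetical:

    client.waitPassStart();    // all gradient servers enter the pass
    for (auto& batch : batches) {
      trainOneBatch(batch);    // issues sendAndReceiveParameter() etc.
    }
    client.waitPassFinish();   // all gradient servers leave the pass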

void synchronize(SyncObject syncObjectId = SYNC_DEFAULT)

Wait until all gradient servers call this function.

void asyncFinishPass(SyncObject syncObjectId = SYNC_DEFAULT)

Called when async-sgd finishes a pass.

void asyncStartPass(SyncObject syncObjectId = SYNC_DEFAULT)
void doOperation(PreparedOperations &ops, bool waitForGradient, bool sendBackParameter, bool releasePass = true)

Execute the prepared operations on pservers, fetch the results and aggregate results from different pservers.

Parameters
  • ops -

    Prepared operations that will be executed on pservers.

  • waitForGradient -

    If true, wait for gradient to be ready before starting the operations.

  • sendBackParameter -

    If true, send back the parameter to clients after the operations are finished.

  • releasePass -

    If true, and if all clients have called waitPassFinish, signal all clients that the pass is finished.

void setConfig(const OptimizationConfig &optConfig, const std::string &saveDir = "", bool isSparseServer = false)

Set the configuration of the pserver, including the parameter config and the optimization config.

bool inStatus(PServerStatus status)

Return true if all pservers are in the given status.

bool isPassFinish()
void setStatus(PServerStatus status)

Set pserver status.

void waitForStatus(PServerStatus status)

Wait until all pservers are at status.

Note
This function is not suitable for frequent use, because it polls, sleeping for 1 second between checks until the condition is satisfied.

PServerVector createVector()

Create a column vector. The size is the dimension of parameter.

void releaseVector(PServerVector handle)

Release the PServerVector given handle.

PServerMatrix createMatrix(int32_t numCols)

Create a column-major matrix. The number of rows is the dimension of the parameter. The number of columns is specified by numCols.

void releaseMatrix(PServerMatrix handle)

Release the PServerMatrix given handle.

real vectorDotProduct(PServerVector u, PServerVector v)

Calculate the dot product of u and v.

void vectorScale(PServerVector u, real a)

Scale u by a.

void vectorCopy(PServerVector src, PServerVector dst)

Copy from src to dst.

void vectorAddMult(PServerVector u, PServerVector v, real a)

u += v * a

void vectorAddMultInto(PServerVector u, PServerVector v, PServerVector w, real a)

u = v + w * a

void vectorScaleInto(PServerVector u, PServerVector v, real a)

u = v * a

PServerVector getPServerParameterValue()

Return pserver parameter value.

PServerVector getPServerParameterGradient()

Return pserver parameter gradient.
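A hedged sketch combining these primitives into a momentum-style update; learningRate is a placeholder, the update rule is purely illustrative, and the vectors are assumed to start zero-initialized. All vectors live on the pservers; only scalars cross the wire here:

    real learningRate = 0.01;                            // hypothetical
    PServerVector value = client.getPServerParameterValue();
    PServerVector grad  = client.getPServerParameterGradient();
    PServerVector mom   = client.createVector();         // extra buffer
    client.vectorScale(mom, 0.9);                        // mom *= 0.9
    client.vectorAddMult(mom, grad, -learningRate);      // mom += grad * -lr
    client.vectorAddMult(value, mom, 1.0);               // value += mom
    real norm2 = client.vectorDotProduct(value, value);  // ||value||^2
    client.releaseVector(mom);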

void loadValueVector(const std::string &dirName)

Tell pservers to load value vector from file.

Parameters
  • dirName -

    The directory that contains the value vector file.

void saveValueVector(const std::string &dirName)

Tell pservers to save value vector to file.

void setTrainerId(int trainerId)
void setForwardbackwardTime(uint64_t delta)

Public Static Functions

int calcParameterBlockSize(const std::vector<ParameterPtr> &parameters, size_t serviceNum)

Protected Functions

template <typename ProtoIn, typename ProtoOut>
void multiCall(const char *funcName, const ProtoIn &request, std::vector<ProtoOut> *responses)

Protected Attributes

int port_

start port number of the pserver; all ports for dense and sparse services are deduced from it by fixed rules

int trainerId_

the id of the trainer using this client

uint64_t forwardbackwordTime_
std::unordered_map<size_t, ParameterPtr> parameterMap_

map from parameter id to parameter, used for decoding protobuf data

std::vector<ParameterSegments> allSegments_

segments for all parameters that need to be synced

std::unique_ptr<SparseParameterDistribution> sparseDistribution_

module for sensing the distribution of sparse parameters across all pservers

std::unique_ptr<SyncThreadPool> syncThreadPool_

thread pool for parallelizing all connections to pservers

bool passFinish_

Private Functions

void destroy()
void sendParallel(int tid, size_t numThreads, ParameterType recvParameterType)

management function for parallelizing send/recv across all connections to all pservers. It is called under one SyncThreadPool and supports using N threads to control M connections. Receiving cannot start until all sending actions on the connections owned by the current thread have finished. Connections controlled by different threads can transfer data asynchronously.

virtual void send(int threadId)

sending thread routine for asynchronously sending data

virtual void recv(int threadId)

receiving thread routine for asynchronously receiving data

void prepareSendData(ParameterUpdateMode updateMode, ParameterType parameterType, const std::vector<ParameterSegments> &parameterSegments, int64_t numSamples, real cost, bool sendBackParameter, ParameterType sendBackParameterType, BatchStatus batchStatus, SendJob *sendJob)

main routine to build data for pserver

Note
it can prepare data for different parameter types. It can be regarded as a layer bridging the real parameter data and the protobuf data used for communication. TODO(yanfei): an additional layer could be abstracted to encode and decode data to/from protobuf.

void initThreads()

start the necessary threads for the thread pool