diff --git a/demo/mnist/api_train.py b/demo/mnist/api_train.py index f301da382ff8a5bc16d9c18b956f78566ed4894f..8573d8143a085b8d2e0bcf7df17b1abe177029df 100644 --- a/demo/mnist/api_train.py +++ b/demo/mnist/api_train.py @@ -13,15 +13,7 @@ import numpy as np import random from mnist_util import read_from_mnist from paddle.trainer_config_helpers import * - - -def optimizer_config(): - settings( - learning_rate=1e-4, - learning_method=AdamOptimizer(), - batch_size=1000, - model_average=ModelAverage(average_window=0.5), - regularization=L2Regularization(rate=0.5)) +import paddle.v2 def network_config(): @@ -75,19 +67,23 @@ def input_order_converter(generator): def main(): api.initPaddle("-use_gpu=false", "-trainer_count=4") # use 4 cpu cores - # get enable_types for each optimizer. - # enable_types = [value, gradient, momentum, etc] - # For each optimizer(SGD, Adam), GradientMachine should enable different - # buffers. - opt_config_proto = parse_optimizer_config(optimizer_config) - opt_config = api.OptimizationConfig.createFromProto(opt_config_proto) - _temp_optimizer_ = api.ParameterOptimizer.create(opt_config) - enable_types = _temp_optimizer_.getParameterTypes() + optimizer = paddle.v2.optimizer.Adam( + learning_rate=1e-4, + batch_size=1000, + model_average=ModelAverage(average_window=0.5), + regularization=L2Regularization(rate=0.5)) + + # Create Local Updater. Local means not run in cluster. + # For a cluster training, here we can change to createRemoteUpdater + # in future. + updater = optimizer.create_local_updater() + assert isinstance(updater, api.ParameterUpdater) # Create Simple Gradient Machine. model_config = parse_network_config(network_config) - m = api.GradientMachine.createFromConfigProto( - model_config, api.CREATE_MODE_NORMAL, enable_types) + m = api.GradientMachine.createFromConfigProto(model_config, + api.CREATE_MODE_NORMAL, + optimizer.enable_types()) # This type check is not useful. Only enable type hint in IDE. # Such as PyCharm @@ -96,12 +92,6 @@ def main(): # Initialize Parameter by numpy. init_parameter(network=m) - # Create Local Updater. Local means not run in cluster. - # For a cluster training, here we can change to createRemoteUpdater - # in future. - updater = api.ParameterUpdater.createLocalUpdater(opt_config) - assert isinstance(updater, api.ParameterUpdater) - # Initialize ParameterUpdater. updater.init(m) diff --git a/doc/howto/usage/cmd_parameter/arguments_cn.md b/doc/howto/usage/cmd_parameter/arguments_cn.md index 833e21dd19ef3c01f5ef990bd12c3fc3b41ba483..2e2a2fcc54a09f4f41e4ebbc317e1409591ddd9c 100644 --- a/doc/howto/usage/cmd_parameter/arguments_cn.md +++ b/doc/howto/usage/cmd_parameter/arguments_cn.md @@ -127,11 +127,6 @@ √√ - -allow_inefficient_sparse_update -√√ - - start_pass √√ diff --git a/doc/howto/usage/cmd_parameter/arguments_en.md b/doc/howto/usage/cmd_parameter/arguments_en.md index 013edbc9047817d7f6b82c4d5188412bd2ce41d6..e5546f0ddc78a9f8bdc306a19c2fe9a415463e5a 100644 --- a/doc/howto/usage/cmd_parameter/arguments_en.md +++ b/doc/howto/usage/cmd_parameter/arguments_en.md @@ -127,11 +127,6 @@ It looks like there are a lot of arguments. 
However, most of them are for develo
√√
-
-allow_inefficient_sparse_update
-√√
-
-
start_pass
√√

diff --git a/doc/howto/usage/cmd_parameter/detail_introduction_cn.md b/doc/howto/usage/cmd_parameter/detail_introduction_cn.md
index dbf7c6f00b8ba5c62d86fb2143221a27330b9506..3b573a324d541b024600a254d5266e517db229c5 100644
--- a/doc/howto/usage/cmd_parameter/detail_introduction_cn.md
+++ b/doc/howto/usage/cmd_parameter/detail_introduction_cn.md
@@ -306,10 +306,6 @@
   - 指示是否显示参数服务器上的稀疏参数分布的日志细节.
   - 类型: bool (默认: 0).

-* `--allow_inefficient_sparse_update`
-  - 指示是否允许低效率的稀疏更新.
-  - 类型: bool (默认: 0).
-
* `--check_sparse_distribution_batches`
  - 每运行多少个批次执行一次稀疏参数分布的检查.
  - 类型: int32 (默认: 100).

diff --git a/doc/howto/usage/cmd_parameter/detail_introduction_en.md b/doc/howto/usage/cmd_parameter/detail_introduction_en.md
index aa69a3bd5423c4f3223242bdafda251271925f2d..33b7ec0d51a96ee126197e7aa819fdae0d3dc353 100644
--- a/doc/howto/usage/cmd_parameter/detail_introduction_en.md
+++ b/doc/howto/usage/cmd_parameter/detail_introduction_en.md
@@ -310,10 +310,6 @@
  - show log details for sparse parameter distribution in pserver.
  - type: bool (default: 0).

-* `--allow_inefficient_sparse_update`
-  - Whether to allow inefficient sparse update.
-  - type: bool (default: 0).
-
* `--check_sparse_distribution_batches`
  - Running sparse parameter distribution check every so many batches.
  - type: int32 (default: 100).

diff --git a/doc/howto/usage/k8s/k8s_aws_en.md b/doc/howto/usage/k8s/k8s_aws_en.md
index a6422b9be00e210a6a305260585520acd72fb2f1..ce5ccbca5ee33b334872d9bdde42ac2ede5d041c 100644
--- a/doc/howto/usage/k8s/k8s_aws_en.md
+++ b/doc/howto/usage/k8s/k8s_aws_en.md
@@ -1,6 +1,56 @@
-# Kubernetes on AWS

-## Create AWS Account and IAM Account
+# Distributed PaddlePaddle Training on AWS with Kubernetes
+
+We will show you, step by step, how to run distributed PaddlePaddle training on an AWS cluster with Kubernetes. Let's start with the core concepts.
+
+## Distributed PaddlePaddle Training Core Concepts
+
+### Distributed Training Job
+
+A distributed training job is represented by a [Kubernetes job](https://kubernetes.io/docs/user-guide/jobs/#what-is-a-job).
+
+Each Kubernetes job is described by a job config file, which specifies information like the number of [pods](https://kubernetes.io/docs/user-guide/pods/#what-is-a-pod) in the job and environment variables.
+
+In a distributed training job, we would:
+
+1. prepare partitioned training data and a configuration file on a distributed file system (in this tutorial we use Amazon Elastic File System), and
+1. create and submit the Kubernetes job config to the Kubernetes cluster to start the training job.
+
+### Parameter Servers and Trainers
+
+There are two roles in a PaddlePaddle cluster: *parameter server (pserver)* and *trainer*. Each parameter server process maintains a shard of the global model. Each trainer has its local copy of the model, and uses its local data to update the model. During the training process, trainers send model updates to the parameter servers; the parameter servers are responsible for aggregating these updates so that trainers can synchronize their local copy with the global model.
+
![Model is partitioned into two shards, managed by two parameter servers respectively.](src/pserver_and_trainer.png)
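+A conceptual sketch of this split is below (plain Python, not the PaddlePaddle API; the hash-based shard assignment and all names are made up for illustration):
+
+```python
+# Each pserver owns one shard of the global model; a trainer sends each
+# parameter's update to the pserver that owns that parameter.
+NUM_PSERVERS = 2
+
+def owner(param_name):
+    # Made-up assignment rule: hash the parameter name into a shard index.
+    return hash(param_name) % NUM_PSERVERS
+
+# pserver side: each shard aggregates the updates it receives from trainers.
+shards = [dict() for _ in range(NUM_PSERVERS)]
+
+def apply_update(param_name, grad, lr=0.01):
+    shard = shards[owner(param_name)]
+    shard[param_name] = shard.get(param_name, 0.0) - lr * grad
+```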
+
+In order to communicate with the pserver, a trainer needs to know the IP address of each pserver. In Kubernetes it's better to use a service discovery mechanism (e.g., DNS hostname) rather than a static IP address, since any pserver pod may be killed and a new pod could be scheduled onto another node with a different IP address. For now, however, we use static IPs; this will be improved.
+
+The parameter server and trainer are packaged into the same Docker image. They run once a pod is scheduled by the Kubernetes job.
+
+### Trainer ID
+
+Each trainer process requires a trainer ID, a zero-based index value, passed in as a command-line parameter. The trainer process thus reads the data partition indexed by this ID.
+
+### Training
+
+The entry-point of a container is a shell script. It can see some environment variables pre-defined by Kubernetes. This includes one that gives the job's identity, which can be used in a remote call to the Kubernetes apiserver that lists all pods in the job.
+
+We rank each pod by sorting them by their IPs. The rank of each pod could be the "pod ID". Because we run one trainer and one parameter server in each pod, we can use this "pod ID" as the trainer ID. A detailed workflow of the entry-point script is as follows:
+
+1. Query the apiserver to get pod information, and assign the `trainer_id` by sorting the IPs.
+1. Copy the training data from the EFS persistent volume into the container.
+1. Parse the `paddle pserver` and `paddle trainer` startup parameters from environment variables, and then start up the processes.
+1. The trainer with `trainer_id` 0 will automatically write results onto the EFS volume.
+
+
+## PaddlePaddle on AWS with Kubernetes
+
+### Choose AWS Service Region
+This tutorial requires that several AWS services work in the same region. Before we create anything in AWS, please check the following link:
+https://aws.amazon.com/about-aws/global-infrastructure/regional-product-services/
+Choose a region that has the following services available: EC2, EFS, CloudFormation, KMS, VPC, S3.
+In this tutorial, we use "Oregon (us-west-2)" as an example.
+
+### Create AWS Account and IAM Account

Under each AWS account, we can create multiple [IAM](http://docs.aws.amazon.com/IAM/latest/UserGuide/introduction.html) users. This allows us to grant some privileges to each IAM user and to create/operate AWS clusters as an IAM user.

@@ -25,17 +75,13 @@ Please be aware that this tutorial needs the following privileges for the user i
 - AWSKeyManagementServicePowerUser


-## PaddlePaddle on AWS
-
-Here we will show you step by step on how to run PaddlePaddle training on AWS cluster.
-
-
### Download kube-aws and kubectl

#### kube-aws

[kube-aws](https://github.com/coreos/kube-aws) is a CLI tool to automate cluster deployment to AWS.
-
+##### Verify kube-aws integrity
+Note: if you are using a non-official release of kube-aws (e.g., an RC release), you can skip this step.
Import the CoreOS Application Signing Public Key:

```
@@ -60,7 +106,7 @@ PLATFORM=darwin-amd64
gpg2 --verify kube-aws-${PLATFORM}.tar.gz.sig kube-aws-${PLATFORM}.tar.gz
```
-
+##### Install kube-aws
Extract the binary:

```
@@ -103,7 +149,6 @@ And then configure your AWS account information:

```
aws configure
-
```

@@ -113,7 +158,7 @@ Fill in the required fields:

```
AWS Access Key ID: YOUR_ACCESS_KEY_ID
AWS Secrete Access Key: YOUR_SECRETE_ACCESS_KEY
-Default region name: us-west-1
+Default region name: us-west-2
Default output format: json
```

@@ -131,25 +176,28 @@ aws ec2 describe-instances

The keypair that will authenticate SSH access to your EC2 instances. The public half of this key pair will be configured on each CoreOS node.

-Follow [EC2 Keypair docs](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html) to create a EC2 key pair
+Follow the [EC2 Keypair User Guide](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html) to create an EC2 key pair.

After creating a key pair, you will use the key pair name to configure the cluster.

-Key pairs are only available to EC2 instances in the same region. We are using us-west-1 in our tutorial, so make sure to creat key pairs in that region (N. California).
+Key pairs are only available to EC2 instances in the same region. We are using us-west-2 in our tutorial, so make sure to create key pairs in that region (Oregon).
+
+Your browser will download a `key-name.pem` file, which is the key to access the EC2 instances. We will use it later.
+

#### KMS key

Amazon KMS keys are used to encrypt and decrypt cluster TLS assets. If you already have a KMS Key that you would like to use, you can skip creating a new key and provide the Arn string for your existing key.

-You can create a KMS key in the AWS console, or with the aws command line tool:
+You can create a KMS key with the aws command line tool:

```
-aws kms --region=us-west-1 create-key --description="kube-aws assets"
+aws kms --region=us-west-2 create-key --description="kube-aws assets"
{
    "KeyMetadata": {
        "CreationDate": 1458235139.724,
        "KeyState": "Enabled",
-        "Arn": "arn:aws:kms:us-west-1:aaaaaaaaaaaaa:key/xxxxxxxxxxxxxxxxxxx",
+        "Arn": "arn:aws:kms:us-west-2:aaaaaaaaaaaaa:key/xxxxxxxxxxxxxxxxxxx",
        "AWSAccountId": "xxxxxxxxxxxxx",
        "Enabled": true,
        "KeyUsage": "ENCRYPT_DECRYPT",
@@ -161,14 +209,14 @@ aws kms --region=us-west-1 create-key --description="kube-aws assets"

We will need to use the value of `Arn` later.

-And then you need to add several inline policies in your user permission.
+And then let's add several inline policies to your IAM user's permissions.

-Go to IAM user page, click on `Add inline policy` button, and then select `Custom Policy`
+Go to the [IAM Console](https://console.aws.amazon.com/iam/home?region=us-west-2#/home). Click `Users`, select the user we just created, then click the `Add inline policy` button and select `Custom Policy`.

-paste into following inline policies:
+Paste in the following inline policy:

```
-{
+{
    "Version": "2012-10-17",
    "Statement": [
        {
@@ -195,39 +243,40 @@ paste into following inline policies:
                "cloudformation:DescribeStackEvents"
            ],
            "Resource": [
-                "arn:aws:cloudformation:us-west-1:AWS_ACCOUNT_ID:stack/MY_CLUSTER_NAME/*"
+                "arn:aws:cloudformation:us-west-2:AWS_ACCOUNT_ID:stack/MY_CLUSTER_NAME/*"
            ]
        }
    ]
}
```
-
+`Version`: its value has to be exactly "2012-10-17".
`AWS_ACCOUNT_ID`: You can get it from the following command:

```
aws sts get-caller-identity --output text --query Account
```

-`MY_CLUSTER_NAME`: Pick a MY_CLUSTER_NAME that you like, you will use it later as well.
+`MY_CLUSTER_NAME`: Pick a MY_CLUSTER_NAME that you like; you will use it later as well.
+Please note, the stack name must match the regular expression pattern [a-zA-Z][-a-zA-Z0-9]*, which means characters such as "_" or "." are not allowed in the stack name, or kube-aws will throw an error in later steps.

#### External DNS name

When the cluster is created, the controller will expose the TLS-secured API on a DNS name.

-The A record of that DNS name needs to be point to the cluster ip address.
+The DNS name should have a CNAME record pointing to the cluster DNS name, or an A record pointing to the cluster IP address.

-We will need to use DNS name later in tutorial. If you don't already own one, you can choose any DNS name (e.g., `paddle`) and modify `/etc/hosts` to associate cluster ip with that DNS name.
+We will need to use the DNS name later in the tutorial. If you don't already own one, you can choose any DNS name (e.g., `paddle`) and modify `/etc/hosts` on your local machine to associate the cluster IP with that DNS name, and add a Route 53 name service record in AWS to associate the IP with `paddle` for the cluster. We will find the cluster IP in later steps.

#### S3 bucket

You need to create an S3 bucket before starting up the Kubernetes cluster.

-There are some bugs in aws cli in creating S3 bucket, so let's use the [Web console](https://console.aws.amazon.com/s3/home?region=us-west-1).
+There are some bugs in the aws CLI around creating S3 buckets, so let's use the [S3 Console](https://console.aws.amazon.com/s3/home?region=us-west-2).

-Click on `Create Bucket`, fill in a unique BUCKET_NAME, and make sure region is us-west-1 (Northern California).
+Click on `Create Bucket`, fill in a unique BUCKET_NAME, and make sure the region is us-west-2 (Oregon).


-#### Initialize an asset directory
+#### Initialize Assets

Create a directory on your local machine to hold the generated assets:

@@ -242,10 +291,10 @@ Initialize the cluster CloudFormation stack with the KMS Arn, key pair name, and

kube-aws init \
--cluster-name=MY_CLUSTER_NAME \
--external-dns-name=MY_EXTERNAL_DNS_NAME \
---region=us-west-1 \
---availability-zone=us-west-1a \
+--region=us-west-2 \
+--availability-zone=us-west-2a \
--key-name=KEY_PAIR_NAME \
---kms-key-arn="arn:aws:kms:us-west-1:xxxxxxxxxx:key/xxxxxxxxxxxxxxxxxxx"
+--kms-key-arn="arn:aws:kms:us-west-2:xxxxxxxxxx:key/xxxxxxxxxxxxxxxxxxx"
```

`MY_CLUSTER_NAME`: the one you picked in [KMS key](#kms-key)

@@ -256,14 +305,15 @@ kube-aws init \

`--kms-key-arn`: the "Arn" in [KMS key](#kms-key)

-Here `us-west-1a` is used for parameter `--availability-zone`, but supported availability zone varies among AWS accounts.
+Here `us-west-2a` is used for the `--availability-zone` parameter, but supported availability zones vary among AWS accounts.

-Please check if `us-west-1a` is supported by `aws ec2 --region us-west-1 describe-availability-zones`, if not switch to other supported availability zone. (e.g., `us-west-1a`, or `us-west-1b`)
+Please check whether `us-west-2a` is supported by `aws ec2 --region us-west-2 describe-availability-zones`; if not, switch to another supported availability zone (e.g., `us-west-2b` or `us-west-2c`).

-Note: please don't use `us-west-1c`. Subnets can currently only be created in the following availability zones: us-west-1b, us-west-1a.

There will now be a cluster.yaml file in the asset directory. This is the main configuration file for your cluster.
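+A few of the `cluster.yaml` fields look roughly like this (a hypothetical excerpt of the generated defaults; exact fields vary by kube-aws version):
+```
+clusterName: MY_CLUSTER_NAME
+externalDNSName: MY_EXTERNAL_DNS_NAME
+keyName: KEY_PAIR_NAME
+region: us-west-2
+workerCount: 1
+```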
+By default `kube-aws` will only create one worker node. Let's edit `cluster.yaml` and change `workerCount` from 1 to 3.
+

#### Render contents of the asset directory

@@ -278,41 +328,14 @@ The next command generates the default set of cluster assets in your asset directory.

```
kube-aws render stack
```
-
-Here's what the directory structure looks like:
-
-```
-$ tree
-.
-├── cluster.yaml
-├── credentials
-│ ├── admin-key.pem
-│ ├── admin.pem
-│ ├── apiserver-key.pem
-│ ├── apiserver.pem
-│ ├── ca-key.pem
-│ ├── ca.pem
-│ ├── worker-key.pem
-│ └── worker.pem
-│ ├── etcd-key.pem
-│ └── etcd.pem
-│ ├── etcd-client-key.pem
-│ └── etcd-client.pem
-├── kubeconfig
-├── stack-template.json
-└── userdata
-    ├── cloud-config-controller
-    └── cloud-config-worker
-```
-
-These assets (templates and credentials) are used to create, update and interact with your Kubernetes cluster.
+Assets (templates and credentials) that are used to create, update and interact with your Kubernetes cluster will be created under your current folder.


### Kubernetes Cluster Start Up

#### Create the instances defined in the CloudFormation template

-Now let's create your cluster (choose any PREFIX for the command below):
+Now let's create your cluster (choose any `PREFIX` for the command below):

```
kube-aws up --s3-uri s3://BUCKET_NAME/PREFIX
```

@@ -328,239 +351,178 @@ You can invoke `kube-aws status` to get the cluster API endpoint after cluster creation.

```
$ kube-aws status
Cluster Name: paddle-cluster
-Controller DNS Name: paddle-cl-ElbAPISe-EEOI3EZPR86C-531251350.us-west-1.elb.amazonaws.com
+Controller DNS Name: paddle-cl-ElbAPISe-EEOI3EZPR86C-531251350.us-west-2.elb.amazonaws.com
```

+If you own a DNS name, set the A record to any of the IPs found below, __or__ set up a CNAME record pointing to the `Controller DNS Name` (`paddle-cl-ElbAPISe-EEOI3EZPR86C-531251350.us-west-2.elb.amazonaws.com`).
+
+##### Find IP address
+
Use the `dig` command on the load balancer hostname to get the IP addresses.

```
-$ dig paddle-cl-ElbAPISe-EEOI3EZPR86C-531251350.us-west-1.elb.amazonaws.com
+$ dig paddle-cl-ElbAPISe-EEOI3EZPR86C-531251350.us-west-2.elb.amazonaws.com

;; QUESTION SECTION:
-;paddle-cl-ElbAPISe-EEOI3EZPR86C-531251350.us-west-1.elb.amazonaws.com. IN A
+;paddle-cl-ElbAPISe-EEOI3EZPR86C-531251350.us-west-2.elb.amazonaws.com. IN A

;; ANSWER SECTION:
-paddle-cl-ElbAPISe-EEOI3EZPR86C-531251350.us-west-1.elb.amazonaws.com. 59 IN A 54.241.164.52
-paddle-cl-ElbAPISe-EEOI3EZPR86C-531251350.us-west-1.elb.amazonaws.com. 59 IN A 54.67.102.112
+paddle-cl-ElbAPISe-EEOI3EZPR86C-531251350.us-west-2.elb.amazonaws.com. 59 IN A 54.241.164.52
+paddle-cl-ElbAPISe-EEOI3EZPR86C-531251350.us-west-2.elb.amazonaws.com. 59 IN A 54.67.102.112
```

In the above output, both IPs `54.241.164.52` and `54.67.102.112` will work.

-If you own a DNS name, set the A record to any of the above ip. Otherwise you can edit `/etc/hosts` to associate ip with the DNS name.
+*If you own a DNS name*, set the A record to any of the above IPs. Then you can skip to the step "Access the cluster".
+
+*If you do not own a DNS name*:
+##### Update local DNS association
+Edit `/etc/hosts` to associate the above IP with the DNS name.
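+For example, assuming the DNS name `paddle` and one of the IPs from the `dig` output above, the `/etc/hosts` entry would look like:
+```
+54.241.164.52 paddle
+```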
+##### Add Route53 private name service in VPC
+ - Open the [Route53 Console](https://console.aws.amazon.com/route53/home)
+ - Create a hosted zone with the following config
+   - Domain name: "paddle"
+   - Type: "Private hosted zone for amazon VPC"
+   - VPC ID:
+ - Add an A record
+   - Click on the zone "paddle" just created
+   - Click the button "Create record set"
+     - Name: leave blank
+     - Type: "A"
+     - Value:
+ - Verify the name service
+   - Connect to any instance created by kube-aws via ssh
+   - Run the command "host paddle" and check that the IP returned is the private IP of the kube-controller

#### Access the cluster

Once the API server is running, you should see:

```
-$ kubectl --kubeconfig=kubeconfig get nodes
-NAME STATUS AGE
-ip-10-0-0-xxx.us-west-1.compute.internal Ready 5m
-ip-10-0-0-xxx.us-west-1.compute.internal Ready 5m
-ip-10-0-0-xx.us-west-1.compute.internal Ready,SchedulingDisabled 5m
+$ kubectl --kubeconfig=kubeconfig get nodes
+NAME STATUS AGE
+ip-10-0-0-134.us-west-2.compute.internal Ready 6m
+ip-10-0-0-238.us-west-2.compute.internal Ready 6m
+ip-10-0-0-50.us-west-2.compute.internal Ready 6m
+ip-10-0-0-55.us-west-2.compute.internal Ready 6m
```


### Setup Elastic File System for Cluster

-Training data is usually served on a distributed filesystem, we use Elastic File System (EFS) on AWS. Ceph might be a better solution, but it requires high version of Linux kernel that might not be stable enough at this moment. We haven't automated the EFS setup at this moment, so please do the following steps:
+Training data is usually served on a distributed filesystem; we use Elastic File System (EFS) on AWS.

+1. Create a security group for EFS in the [security group console](https://us-west-2.console.aws.amazon.com/ec2/v2/home?region=us-west-2#SecurityGroups:sort=groupId)
+  1. Look up the security group id of `paddle-cluster-sg-worker` (`sg-055ee37d` in the image below)
+
![](src/worker_security_group.png)
+  2. Add a security group `paddle-efs` with an `ALL TCP` inbound rule whose custom source is the group id of `paddle-cluster-sg-worker`, in the VPC `paddle-cluster-vpc`. Make sure the availability zone is the same as the one you used in [Initialize Assets](#initialize-assets).
+
![](src/add_security_group.png)
-
-1. Make sure you added AmazonElasticFileSystemFullAccess policy in your group.
-
-1. Create the Elastic File System in AWS console, and attach the new VPC with it.
+2. Create the Elastic File System in the [EFS console](https://us-west-2.console.aws.amazon.com/efs/home?region=us-west-2#/wizard/1) with the `paddle-cluster-vpc` VPC. Make sure the subnet is `paddle-cluster-Subnet0` and the security group is `paddle-efs`.
![](src/create_efs.png)
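+If you prefer the command line, the EFS file system and its mount target can also be created with the aws CLI; a sketch with placeholder IDs (look up the subnet and security group IDs in the VPC console):
+```
+aws efs create-file-system --creation-token paddle-efs --region us-west-2
+aws efs create-mount-target --file-system-id FILE_SYSTEM_ID \
+  --subnet-id SUBNET_ID --security-groups SECURITY_GROUP_ID --region us-west-2
+```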
-1. Modify the Kubernetes security group under ec2/Security Groups, add additional inbound policy "All TCP TCP 0 - 65535 0.0.0.0/0" for Kubernetes default VPC security group. -
![](src/add_security_group.png)
- - -1. Follow the EC2 mount instruction to mount the disk onto all the Kubernetes nodes, we recommend to mount EFS disk onto ~/efs. -
![](src/efs_mount.png)
- - -We will place user config and divided training data onto EFS. Training task will cache related files by copying them from EFS into container. It will also write the training results back onto EFS. We will show you how to place the data later in this article. - - - -### Core Concepts of PaddlePaddle Training on AWS - -Now we've already setup a 3 nodes distributed Kubernetes cluster, and on each node we've attached the EFS volume. In this training demo, we will create three Kubernetes pods and schedule them on three nodes. Each pod contains a PaddlePaddle container. When container gets created, it will start parameter server (pserver) and trainer process, load the training data from EFS volume and start the distributed training task. - -#### Distributed Training Job - -A distributed training job is represented by a [kubernetes job](https://kubernetes.io/docs/user-guide/jobs/#what-is-a-job). - -Each Kuberentes job is described by a job config file, which specifies the information like the number of pods in the job and environment variables. - -In a distributed training job, we would: - -1. upload the partitioned training data and configuration file onto EFS volume, and -1. create and submit the Kubernetes job config to the Kubernetes cluster to start the training job. - -#### Parameter Servers and Trainers - -There are two roles in a PaddlePaddle cluster: `parameter server` and `trainer`. Each parameter server process maintains a shard of the global model. Each trainer has its local copy of the model, and uses its local data to update the model. During the training process, trainers send model updates to parameter servers, parameter servers are responsible for aggregating these updates, so that trainers can synchronize their local copy with the global model. - -
![Model is partitioned into two shards. Managed by two parameter servers respectively.](src/pserver_and_trainer.png)
-
-In order to communicate with pserver, trainer needs to know the ip address of each pserver. In kubernetes it's better to use a service discovery mechanism (e.g., DNS hostname) rather than static ip address, since any pserver's pod may be killed and a new pod could be schduled onto another node of different ip address. We will improve paddlepaddle's service discovery ability. For now we will use static ip.
-
-Parameter server and trainer are packaged into a same docker image. They will run once pod is scheduled by kubernetes job.
-
-#### Trainer ID
-
-Each trainer process requires a trainer ID, a zero-based index value, passed in as a command-line parameter. The trainer process thus reads the data partition indexed by this ID.
-
-#### Training
-
-The entry-point of a container is a Python script. As it runs in a pod, it can see some environment variables pre-defined by Kubernetes. This includes one that gives the job's identity, which can be used in a remote call to the Kubernetes apiserver that lists all pods in the job.
-
-We rank each pod by sorting them by their ips. The rank of each pod could be the "pod ID". Because we run one trainer and one parameter server in each pod, we can use this "pod ID" as the trainer ID. A detailed workflow of the entry-point script is as follows:
-
-1. Query the api server to get pod information, and assign the `trainer_id` by sorting the ip.
-1. Copy the training data from EFS sharing volume into container.
-1. Parse the `paddle pserver` and `paddle trainer` startup parameters from environment variables, and then start up the processes.
-1. Trainer with `train_id` 0 will automatically write results onto EFS volume.
-
-
### Start PaddlePaddle Training Demo on AWS

-Now we'll start a PaddlePaddle training demo on AWS, steps are as follows:
-
-1. Build PaddlePaddle Docker image.
-1. Divide the training data file and upload it onto the EFS sharing volume.
-1. Create the training job config file, and start up the job.
-1. Check the result after training.
-
-#### Build PaddlePaddle Docker Image
-
-PaddlePaddle docker image need to provide the runtime environment for `pserver` and `trainer`, so the container use this image should have two main function:
+#### Configure Kubernetes Volume that Points to EFS

-1. Copy the training data into container.
-1. Generate the startup parameter for `pserver` and `trainer` process, and startup the training.
-
-
-We need to create a new image since official `paddledev/paddle:cpu-latest` only have PaddlePaddle binary, but lack of the above functionalities.
-
-Dockerfile for creating the new image is as follows:
+First we need to create a [PersistentVolume](https://kubernetes.io/docs/user-guide/persistent-volumes/) to provision the EFS volume.
+Save the following snippet as `pv.yaml`:

```
-FROM paddledev/paddle:cpu-latest
-
-MAINTAINER zjsxzong89@gmail.com
-
-COPY start.sh /root/
-COPY start_paddle.py /root/
-CMD ["bash"," -c","/root/start.sh"]
+apiVersion: v1
+kind: PersistentVolume
+metadata:
+  name: efsvol
+spec:
+  capacity:
+    storage: 100Gi
+  accessModes:
+    - ReadWriteMany
+  nfs:
+    server: EFS_DNS_NAME
+    path: "/"
```

-At this point, we will copy our `start.sh` and `start_paddle.py` file into container, and then exec `start_paddle.py` script to start up the training, all the steps like assigning trainer_id, getting other nodes' ip are implemented in `start_paddle.py`.
-
-`start_paddle.py` will start parsing the parameters.
+`EFS_DNS_NAME`: the DNS name shown in the description of the `paddle-efs` file system we created.
It looks similar to `fs-2cbf7385.efs.us-west-2.amazonaws.com`.

+Run the following command to create the persistent volume:
```
-parser = argparse.ArgumentParser(prog="start_paddle.py",
-                                 description='simple tool for k8s')
-    args, train_args_list = parser.parse_known_args()
-    train_args = refine_unknown_args(train_args_list)
-    train_args_dict = dict(zip(train_args[:-1:2], train_args[1::2]))
-    podlist = getPodList()
+kubectl --kubeconfig=kubeconfig create -f pv.yaml
```

-And then using function `getPodList()` to query all the pod information from the job name through Kubernetes api server. When all the pods are in the running status, using `getIdMap(podlist)` to get the trainer_id.
+Next let's create a [PersistentVolumeClaim](https://kubernetes.io/docs/user-guide/persistent-volumes/) to claim the persistent volume.
+Save the following snippet as `pvc.yaml`:

```
-    podlist = getPodList()
-    # need to wait until all pods are running
-    while not isPodAllRunning(podlist):
-        time.sleep(10)
-        podlist = getPodList()
-    idMap = getIdMap(podlist)
+kind: PersistentVolumeClaim
+apiVersion: v1
+metadata:
+  name: efsvol
+spec:
+  accessModes:
+    - ReadWriteMany
+  resources:
+    requests:
+      storage: 50Gi
```

-In function `getIdMap(podlist)`, we use podlist to get the ip address for each pod and sort them, use the index as the trainer_id.
+Run the following command to create the persistent volume claim:

```
-def getIdMap(podlist):
-    '''
-    generate tainer_id by ip
-    '''
-    ips = []
-    for pod in podlist["items"]:
-        ips.append(pod["status"]["podIP"])
-    ips.sort()
-    idMap = {}
-    for i in range(len(ips)):
-        idMap[ips[i]] = i
-    return idMap
+kubectl --kubeconfig=kubeconfig create -f pvc.yaml
```

-After getting `idMap`, we use function `startPaddle(idMap, train_args_dict)` to generate `paddle pserver` and `paddle train` start up parameters and then start up the processes.
-
-In function `startPaddle`, the most important work is to generate `paddle pserver` and `paddle train` start up parameters. For example, `paddle train` parameter parsing, we will get parameters like `PADDLE_NIC`, `PADDLE_PORT`, `PADDLE_PORTS_NUM`, and get the `trainer_id` from `idMap`.
+#### Prepare Training Data

```
-    program = 'paddle train'
-    args = " --nics=" + PADDLE_NIC
-    args += " --port=" + str(PADDLE_PORT)
-    args += " --ports_num=" + str(PADDLE_PORTS_NUM)
-    args += " --comment=" + "paddle_process_by_paddle"
-    ip_string = ""
-    for ip in idMap.keys():
-        ip_string += (ip + ",")
-    ip_string = ip_string.rstrip(",")
-    args += " --pservers=" + ip_string
-    args_ext = ""
-    for key, value in train_args_dict.items():
-        args_ext += (' --' + key + '=' + value)
-    localIP = socket.gethostbyname(socket.gethostname())
-    trainerId = idMap[localIP]
-    args += " " + args_ext + " --trainer_id=" + \
-        str(trainerId) + " --save_dir=" + JOB_PATH_OUTPUT
-```
+We will now launch a Kubernetes job that downloads, saves, and evenly splits the training data into 3 shards on the persistent volume that we just created.

-Use `docker build` to build toe Docker Image:
+Save the following snippet as `paddle-data-job.yaml`:

```
-docker build -t your_repo/paddle:mypaddle .
+apiVersion: batch/v1
+kind: Job
+metadata:
+  name: paddle-data
+spec:
+  template:
+    metadata:
+      name: pi
+    spec:
+      containers:
+      - name: paddle-data
+        image: paddledev/paddle-tutorial:k8s_data
+        imagePullPolicy: Always
+        volumeMounts:
+        - mountPath: "/efs"
+          name: efs
+        env:
+        - name: OUT_DIR
+          value: /efs/paddle-cluster-job
+        - name: SPLIT_COUNT
+          value: "3"
+      volumes:
+        - name: efs
+          persistentVolumeClaim:
+            claimName: efsvol
+      restartPolicy: Never
```

-And then push the built image onto docker registry.
-
+Run the following command to launch the job:
```
-docker push your_repo/paddle:mypaddle
+kubectl --kubeconfig=kubeconfig create -f paddle-data-job.yaml
```

-#### Upload Training Data File
-
-Here we will use PaddlePaddle's official recommendation demo as the content for this training, we put the training data file into a directory named by job name, which located in EFS sharing volume, the tree structure for the directory looks like:
-
+The job may take about 7 minutes to finish; use the following command to check the job status. Do not proceed until `SUCCESSFUL` for the `paddle-data` job is `1`:
```
-efs
-└── paddle-cluster-job
-    ├── data
-    │   ├── 0
-    │   │
-    │   ├── 1
-    │   │
-    │   └── 2
-    ├── output
-    └── recommendation
+$ kubectl --kubeconfig=kubeconfig get jobs
+NAME          DESIRED   SUCCESSFUL   AGE
+paddle-data   1         1            6m
```

-The `paddle-cluster-job` directory is the job name for this training, this training includes 3 PaddlePaddle node, we store the partitioned data under `paddle-cluster-job/data` directory, directory 0, 1, 2 each represent 3 nodes' trainer_id. the training data in in recommendation directory, the training results and logs will be in the output directory.
-
-
-#### Create Kubernetes Job
-
-Kubernetes use yaml file to describe job details, and then use command line tool to create the job in Kubernetes cluster.
-
-In yaml file, we describe the Docker image we use for this training, the node number we need to startup, the volume mounting information and all the necessary parameters we need for `paddle pserver` and `paddle train` processes.
+Data preparation is done by the Docker image `paddledev/paddle-tutorial:k8s_data`; see [here](src/k8s_data/README.md) for how to build this Docker image and for its source code.

-The yaml file content is as follows:
+#### Start Training

+Now we are ready to start the PaddlePaddle training job.
Save the following snippet as `paddle-cluster-job.yaml`:
```
apiVersion: batch/v1
kind: Job
@@ -574,12 +536,12 @@ spec:
      name: paddle-cluster-job
    spec:
      volumes:
-      - name: jobpath
-        hostPath:
-          path: /home/admin/efs
+      - name: efs
+        persistentVolumeClaim:
+          claimName: efsvol
      containers:
      - name: trainer
-        image: drinkcode/paddle:k8s-job
+        image: paddledev/paddle-tutorial:k8s_train
        command: ["bin/bash", "-c", "/root/start.sh"]
        env:
        - name: JOB_NAME
@@ -589,7 +551,7 @@ spec:
        - name: JOB_NAMESPACE
          value: default
        - name: TRAIN_CONFIG_DIR
-          value: recommendation
+          value: quick_start
        - name: CONF_PADDLE_NIC
          value: eth0
        - name: CONF_PADDLE_PORT
@@ -600,106 +562,124 @@ spec:
          value: "2"
        - name: CONF_PADDLE_GRADIENT_NUM
          value: "3"
+        - name: TRAINER_COUNT
+          value: "3"
        volumeMounts:
-        - name: jobpath
-          mountPath: /home/jobpath
+        - mountPath: "/home/jobpath"
+          name: efs
        ports:
-        - name: jobport
-          hostPort: 30001
-          containerPort: 30001
+        - name: jobport0
+          hostPort: 7164
+          containerPort: 7164
+        - name: jobport1
+          hostPort: 7165
+          containerPort: 7165
+        - name: jobport2
+          hostPort: 7166
+          containerPort: 7166
+        - name: jobport3
+          hostPort: 7167
+          containerPort: 7167
      restartPolicy: Never
-
```

-In yaml file, the metadata's name is the job's name. `parallelism, completions` means this job will simultaneously start up 3 PaddlePaddle nodes, and this job will be finished when there are 3 finished pods. For the data store volume, we declare the path jobpath, it mount the /home/admin/efs on host machine into the container with path /home/jobpath. So in container, the /home/jobpath actually stores the data onto EFS sharing volume.
-
-`env` field represents container's environment variables, we pass the PaddlePaddle parameters into containers by using the `env` field.
+`parallelism: 3, completions: 3` means this job will simultaneously start 3 PaddlePaddle pods, and the job will be finished when there are 3 finished pods.

-`JOB_PATH` represents the sharing volume path, `JOB_NAME` represents job name, `TRAIN_CONFIG_DIR` represents the training data file directory, we can these three parameters to get the file path for this training.
+The `env` field holds the container's environment variables; we specify the PaddlePaddle parameters through them.

-`CONF_PADDLE_NIC` represents `paddle pserver` process's `--nics` parameters, the NIC name.
+`ports` indicates that TCP ports 7164 - 7167 are exposed for communication between `pserver` and trainer. Ports run continuously from `CONF_PADDLE_PORT` (7164) to `CONF_PADDLE_PORT + CONF_PADDLE_PORTS_NUM + CONF_PADDLE_PORTS_NUM_SPARSE - 1` (7167). We use multiple ports for dense and sparse parameter updates to improve latency.

-`CONF_PADDLE_PORT` represents `paddle pserver` process's `--port` parameters, `CONF_PADDLE_PORTS_NUM` represents `--port_num` parameter.
-
-`CONF_PADDLE_PORTS_NUM_SPARSE` represents the sparse updated port number, `--ports_num_for_sparse` parameter.
+Run the following command to launch the job.
+```
+kubectl --kubeconfig=kubeconfig create -f paddle-cluster-job.yaml
+```

-`CONF_PADDLE_GRADIENT_NUM` represents the training node number, `--num_gradient_servers` parameter.
+Inspect individual pods

-After we create the yaml file, we can use Kubernetes command line tool to create the job onto the cluster.
+```
+$ kubectl --kubeconfig=kubeconfig get pods
+NAME                       READY     STATUS    RESTARTS   AGE
+paddle-cluster-job-cm469   1/1       Running   0          9m
+paddle-cluster-job-fnt03   1/1       Running   0          9m
+paddle-cluster-job-jx4xr   1/1       Running   0          9m
+```

+Inspect individual console output
```
-kubectl create -f job.yaml
+kubectl --kubeconfig=kubeconfig log -f POD_NAME
```

-After we execute the above command, Kubernetes will create 3 pods and then pull the PaddlePaddle image, then start up the containers for training.
+`POD_NAME`: name of any pod (e.g., `paddle-cluster-job-cm469`).

+Run `kubectl --kubeconfig=kubeconfig describe job paddle-cluster-job` to check the training job status. It will complete in around 20 minutes.

+The details of starting `pserver` and `trainer` are hidden inside the Docker image `paddledev/paddle-tutorial:k8s_train`; see [here](src/k8s_train/README.md) for how to build the Docker image and for its source code.

-#### Check Training Results
+#### Inspect Training Output

-During the training, we can see the logs and models on EFS sharing volume, the output directory contains the training results. (Caution: node_0, node_1, node_2 directories represents PaddlePaddle node and train_id, not the Kubernetes node)
+Training output (model snapshots and logs) will be saved in EFS. We can SSH into a worker EC2 instance, mount EFS, and check the training output.

+1. SSH into a worker EC2 instance
```
-[root@paddle-kubernetes-node0 output]# tree -d
-.
-├── node_0
-│   ├── server.log
-│   └── train.log
-├── node_1
-│   ├── server.log
-│   └── train.log
-├── node_2
-......
-├── pass-00002
-│   ├── done
-│   ├── ___embedding_0__.w0
-│   ├── ___embedding_1__.w0
-......
+chmod 400 key-name.pem
+ssh -i key-name.pem core@INSTANCE_IP
```

-We can always check the container training status through logs, for example:
```
-[root@paddle-kubernetes-node0 node_0]# cat train.log
-I1116 09:10:17.123121    50 Util.cpp:155] commandline:
-    /usr/local/bin/../opt/paddle/bin/paddle_trainer
-    --nics=eth0 --port=7164
-    --ports_num=2 --comment=paddle_process_by_paddle
-    --pservers=192.168.129.66,192.168.223.143,192.168.129.71
-    --ports_num_for_sparse=2 --config=./trainer_config.py
-    --trainer_count=4 --num_passes=10 --use_gpu=0
-    --log_period=50 --dot_period=10 --saving_period=1
-    --local=0 --trainer_id=0
-    --save_dir=/home/jobpath/paddle-cluster-job/output
-I1116 09:10:17.123440    50 Util.cpp:130] Calling runInitFunctions
-I1116 09:10:17.123764    50 Util.cpp:143] Call runInitFunctions done.
-[WARNING 2016-11-16 09:10:17,227 default_decorators.py:40] please use keyword arguments in paddle config.
-[INFO 2016-11-16 09:10:17,239 networks.py:1282] The input order is [movie_id, title, genres, user_id, gender, age, occupation, rating]
-[INFO 2016-11-16 09:10:17,239 networks.py:1289] The output order is [__regression_cost_0__]
-I1116 09:10:17.392917    50 Trainer.cpp:170] trainer mode: Normal
-I1116 09:10:17.613910    50 PyDataProvider2.cpp:257] loading dataprovider dataprovider::process
-I1116 09:10:17.680917    50 PyDataProvider2.cpp:257] loading dataprovider dataprovider::process
-I1116 09:10:17.681543    50 GradientMachine.cpp:134] Initing parameters..
-I1116 09:10:18.012390    50 GradientMachine.cpp:141] Init parameters done.
-I1116 09:10:18.018641    50 ParameterClient2.cpp:122] pserver 0 192.168.129.66:7164
-I1116 09:10:18.018950    50 ParameterClient2.cpp:122] pserver 1 192.168.129.66:7165
-I1116 09:10:18.019069    50 ParameterClient2.cpp:122] pserver 2 192.168.223.143:7164
-I1116 09:10:18.019492    50 ParameterClient2.cpp:122] pserver 3 192.168.223.143:7165
-I1116 09:10:18.019716    50 ParameterClient2.cpp:122] pserver 4 192.168.129.71:7164
-I1116 09:10:18.019836    50 ParameterClient2.cpp:122] pserver 5 192.168.129.71:7165
```

-It'll take around 8 hours to finish this PaddlePaddle recommendation training demo on three 2 core 8 GB EC2 machine (m3.large).
+`INSTANCE_IP`: the public IP address of an EC2 Kubernetes worker node. Go to the [EC2 console](https://us-west-2.console.aws.amazon.com/ec2/v2/home?region=us-west-2#Instances:sort=instanceId) and check the `public IP` of any `paddle-cluster-kube-aws-worker` instance.

+2. Mount EFS
```
+mkdir efs
+sudo mount -t nfs4 -o nfsvers=4.1,rsize=1048576,wsize=1048576,hard,timeo=600,retrans=2 EFS_DNS_NAME:/ efs
```

+`EFS_DNS_NAME`: the DNS name shown in the description of the `paddle-efs` file system we created. It looks similar to `fs-2cbf7385.efs.us-west-2.amazonaws.com`.

+Now the folder `efs` will have a structure similar to:
```
+-- paddle-cluster-job
+    |-- ...
+    |-- output
+    |   |-- node_0
+    |   |   |-- server.log
+    |   |   `-- train.log
+    |   |-- node_1
+    |   |   |-- server.log
+    |   |   `-- train.log
+    |   |-- node_2
+    |   |   |-- server.log
+    |   |   `-- train.log
+    |   |-- pass-00000
+    |   |   |-- ___fc_layer_0__.w0
+    |   |   |-- ___fc_layer_0__.wbias
+    |   |   |-- done
+    |   |   |-- path.txt
+    |   |   `-- trainer_config.lr.py
+    |   |-- pass-00001...
```
+`server.log` contains the log for `pserver`. `train.log` contains the log for `trainer`. Model descriptions and snapshots are stored in `pass-0000*`. (Caution: the node_0, node_1, node_2 directories represent PaddlePaddle nodes and their trainer_id, not Kubernetes nodes.)

### Kubernetes Cluster Tear Down

+#### Delete EFS
+
+Go to the [EFS Console](https://us-west-2.console.aws.amazon.com/efs/home?region=us-west-2) and delete the EFS volume that we created.
+
+#### Delete security group
+
+Go to the [Security Group Console](https://us-west-2.console.aws.amazon.com/ec2/v2/home?region=us-west-2#SecurityGroups:sort=groupId) and delete the security group `paddle-efs`.

-If you want to tear down the whole Kubernetes cluster, make sure to *delete* the EFS volume first (otherwise, you will get stucked on following steps), and then use the following command:
+
+#### Delete S3 Bucket
+
+Go to the [S3 Console](https://console.aws.amazon.com/s3/home?region=us-west-2#) and delete the S3 bucket that we created.
+
+#### Destroy Cluster

```
kube-aws destroy
```

-It's an async call, it might take 5 min to tear down the whole cluster.
-If you created any Kubernetes Services of type LoadBalancer, you must delete these first, as the CloudFormation cannot be fully destroyed if any externally-managed resources still exist.
+The command will return immediately, but it might take 5 min to tear down the whole cluster.
+
+You can go to the [CloudFormation Console](https://us-west-2.console.aws.amazon.com/cloudformation/home?region=us-west-2#/stacks?filter=active) to check the destroy process.
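+You can also check the teardown progress from the command line, for example with the stack name you picked earlier:
+```
+aws cloudformation describe-stacks --region us-west-2 --stack-name MY_CLUSTER_NAME
+```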
diff --git a/doc/howto/usage/k8s/src/add_security_group.png b/doc/howto/usage/k8s/src/add_security_group.png index 50eed4c6573a18d6ae0f9df9bd6a3cae05493e3c..bd34f46c9b0ada7027fd53e553e7d033255d25fc 100644 Binary files a/doc/howto/usage/k8s/src/add_security_group.png and b/doc/howto/usage/k8s/src/add_security_group.png differ diff --git a/doc/howto/usage/k8s/src/create_efs.png b/doc/howto/usage/k8s/src/create_efs.png index f4d448d1518e11a11d535efb9c3a78b56cc13149..e5f1526033d1daf401700989af1d25919bcb7675 100644 Binary files a/doc/howto/usage/k8s/src/create_efs.png and b/doc/howto/usage/k8s/src/create_efs.png differ diff --git a/doc/howto/usage/k8s/src/job.yaml b/doc/howto/usage/k8s/src/job.yaml deleted file mode 100644 index 488aad0bede4f940b25c7be04259f209c3de9f52..0000000000000000000000000000000000000000 --- a/doc/howto/usage/k8s/src/job.yaml +++ /dev/null @@ -1,43 +0,0 @@ -apiVersion: batch/v1 -kind: Job -metadata: - name: paddle-cluster-job -spec: - parallelism: 3 - completions: 3 - template: - metadata: - name: paddle-cluster-job - spec: - volumes: - - name: jobpath - hostPath: - path: /home/work/paddle_output - containers: - - name: trainer - image: registry.baidu.com/public/paddle:mypaddle - command: ["bin/bash", "-c", "/root/start.sh"] - env: - - name: JOB_NAME - value: paddle-cluster-job - - name: JOB_PATH - value: /home/jobpath - - name: JOB_NAMESPACE - value: default - - name: TRAIN_CONFIG_DIR - value: recommendation - - name: CONF_PADDLE_NIC - value: eth0 - - name: CONF_PADDLE_PORT - value: "7164" - - name: CONF_PADDLE_PORTS_NUM - value: "2" - - name: CONF_PADDLE_PORTS_NUM_SPARSE - value: "2" - - name: CONF_PADDLE_GRADIENT_NUM - value: "3" - volumeMounts: - - name: jobpath - mountPath: /home/jobpath - restartPolicy: Never - diff --git a/doc/howto/usage/k8s/src/k8s_data/Dockerfile b/doc/howto/usage/k8s/src/k8s_data/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..6d3a12ae393aa594b8e6e9a5f726109426937284 --- /dev/null +++ b/doc/howto/usage/k8s/src/k8s_data/Dockerfile @@ -0,0 +1,7 @@ +FROM alpine + +RUN apk update && apk upgrade && apk add coreutils +ADD quick_start /quick_start +ADD get_data.sh /bin/ +RUN chmod +x /bin/get_data.sh +ENTRYPOINT ["/bin/get_data.sh"] diff --git a/doc/howto/usage/k8s/src/k8s_data/README.md b/doc/howto/usage/k8s/src/k8s_data/README.md new file mode 100644 index 0000000000000000000000000000000000000000..83cef7affd0ac4d3a1ca08ea5b046fa81e1bc630 --- /dev/null +++ b/doc/howto/usage/k8s/src/k8s_data/README.md @@ -0,0 +1,6 @@ +To build PaddlePaddle data preparation image in tutorial [Distributed PaddlePaddle Training on AWS with Kubernetes](../../k8s_aws_en.md), run following commands: + +``` +cp -r ../../../../../../demo/quick_start . +docker build . -t prepare-data-image-name +``` diff --git a/doc/howto/usage/k8s/src/k8s_data/get_data.sh b/doc/howto/usage/k8s/src/k8s_data/get_data.sh new file mode 100755 index 0000000000000000000000000000000000000000..d187ba5ac8d03f69dfdefd4f63610ed7921575be --- /dev/null +++ b/doc/howto/usage/k8s/src/k8s_data/get_data.sh @@ -0,0 +1,26 @@ +#!/bin/sh + +out_dir=$OUT_DIR +split_count=$SPLIT_COUNT + +set -e + +mkdir -p $out_dir +cp -r /quick_start $out_dir/ + +mkdir -p $out_dir/0/data +cd $out_dir/0/data +wget http://paddlepaddle.bj.bcebos.com/demo/quick_start_preprocessed_data/preprocessed_data.tar.gz +tar zxvf preprocessed_data.tar.gz +rm preprocessed_data.tar.gz + +split -d --number=l/$split_count -a 5 train.txt train. 
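+# Note: `split -d --number=l/$split_count -a 5` makes $split_count chunks on
+# line boundaries, with 5-digit numeric suffixes (train.00000, train.00001, ...).
+# GNU split (coreutils) is required for --number, hence `apk add coreutils` above.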
+mv train.00000 train.txt + +cd $out_dir +end=$(expr $split_count - 1) +for i in $(seq 1 $end); do + mkdir -p $i/data + cp -r 0/data/* $i/data + mv $i/data/train.`printf %05d $i` $i/data/train.txt +done; diff --git a/doc/howto/usage/k8s/src/k8s_train/Dockerfile b/doc/howto/usage/k8s/src/k8s_train/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..c0fca1f9a945921e6e8899fee2db8845e66136a1 --- /dev/null +++ b/doc/howto/usage/k8s/src/k8s_train/Dockerfile @@ -0,0 +1,6 @@ +FROM paddledev/paddle:cpu-latest + +COPY start.sh /root/ +COPY start_paddle.py /root/ +RUN chmod +x /root/start.sh +CMD ["bash"," -c","/root/start.sh"] diff --git a/doc/howto/usage/k8s/src/k8s_train/README.md b/doc/howto/usage/k8s/src/k8s_train/README.md new file mode 100644 index 0000000000000000000000000000000000000000..96bf65497ffa23e90c4c9350504f86367b48daf2 --- /dev/null +++ b/doc/howto/usage/k8s/src/k8s_train/README.md @@ -0,0 +1,5 @@ +To build PaddlePaddle training image in tutorial [Distributed PaddlePaddle Training on AWS with Kubernetes](../../k8s_aws_en.md), run following command: + +``` +docker build . -t train-image-name +``` diff --git a/doc/howto/usage/k8s/src/start.sh b/doc/howto/usage/k8s/src/k8s_train/start.sh similarity index 55% rename from doc/howto/usage/k8s/src/start.sh rename to doc/howto/usage/k8s/src/k8s_train/start.sh index b3a1334174a20b018d35de3b01b149fc5b10d49d..12dfe1e6386885a6989d3887f21c6922f137a9ae 100755 --- a/doc/howto/usage/k8s/src/start.sh +++ b/doc/howto/usage/k8s/src/k8s_train/start.sh @@ -1,19 +1,19 @@ #!/bin/sh + set -eu jobconfig=${JOB_PATH}"/"${JOB_NAME}"/"${TRAIN_CONFIG_DIR} cd /root -cp -rf $jobconfig . -cd $TRAIN_CONFIG_DIR - +cp -rf $jobconfig/* . python /root/start_paddle.py \ --dot_period=10 \ - --ports_num_for_sparse=$CONF_PADDLE_PORTS_NUM \ + --ports_num=$CONF_PADDLE_PORTS_NUM \ + --ports_num_for_sparse=$CONF_PADDLE_PORTS_NUM_SPARSE \ --log_period=50 \ --num_passes=10 \ - --trainer_count=4 \ + --trainer_count=$TRAINER_COUNT \ --saving_period=1 \ --local=0 \ - --config=./trainer_config.py \ + --config=trainer_config.lr.py \ --use_gpu=0 diff --git a/doc/howto/usage/k8s/src/start_paddle.py b/doc/howto/usage/k8s/src/k8s_train/start_paddle.py similarity index 84% rename from doc/howto/usage/k8s/src/start_paddle.py rename to doc/howto/usage/k8s/src/k8s_train/start_paddle.py index df00d82919faa2acecc79c28e3d773ba3de9672a..f1a770ccb54fbd7d4c3cf6bf134d00d7bf5961ca 100755 --- a/doc/howto/usage/k8s/src/start_paddle.py +++ b/doc/howto/usage/k8s/src/k8s_train/start_paddle.py @@ -23,7 +23,6 @@ import argparse API = "/api/v1/namespaces/" JOBSELECTOR = "labelSelector=job-name=" JOB_PATH = os.getenv("JOB_PATH") + "/" + os.getenv("JOB_NAME") -JOB_PATH_DATA = JOB_PATH + "/data" JOB_PATH_OUTPUT = JOB_PATH + "/output" JOBNAME = os.getenv("JOB_NAME") NAMESPACE = os.getenv("JOB_NAMESPACE") @@ -33,6 +32,8 @@ PADDLE_PORTS_NUM = os.getenv("CONF_PADDLE_PORTS_NUM") PADDLE_PORTS_NUM_SPARSE = os.getenv("CONF_PADDLE_PORTS_NUM_SPARSE") PADDLE_SERVER_NUM = os.getenv("CONF_PADDLE_GRADIENT_NUM") +tokenpath = '/var/run/secrets/kubernetes.io/serviceaccount/token' + def refine_unknown_args(cmd_args): ''' @@ -64,6 +65,7 @@ def isPodAllRunning(podlist): for pod in podlist["items"]: if pod["status"]["phase"] == "Running": running += 1 + print "waiting for pods running, require:", require, "running:", running if require == running: return True return False @@ -79,8 +81,17 @@ def getPodList(): pod = API + NAMESPACE + "/pods?" 
job = JOBNAME
-    return requests.get(apiserver + pod + JOBSELECTOR + job,
-                        verify=False).json()
+    if os.path.isfile(tokenpath):
+        tokenfile = open(tokenpath, mode='r')
+        token = tokenfile.read()
+        Bearer = "Bearer " + token
+        headers = {"Authorization": Bearer}
+        return requests.get(apiserver + pod + JOBSELECTOR + job,
+                            headers=headers,
+                            verify=False).json()
+    else:
+        return requests.get(apiserver + pod + JOBSELECTOR + job,
+                            verify=False).json()


def getIdMap(podlist):
@@ -122,8 +133,8 @@ def startPaddle(idMap={}, train_args_dict=None):
    if not os.path.exists(JOB_PATH_OUTPUT):
        os.makedirs(JOB_PATH_OUTPUT)
    os.mkdir(logDir)
-    copyCommand = 'cp -rf ' + JOB_PATH_DATA + \
-        "/" + str(trainerId) + " ./data"
+    copyCommand = 'cp -rf ' + JOB_PATH + \
+        "/" + str(trainerId) + "/data/*" + " ./data/"
    os.system(copyCommand)
    startPserver = 'nohup paddle pserver' + \
        " --port=" + str(PADDLE_PORT) + \
@@ -136,9 +147,9 @@ def startPaddle(idMap={}, train_args_dict=None):
    print startPserver
    os.system(startPserver)
    # wait until pservers completely start
-    time.sleep(10)
-    startTrainer = program + args + " > " + \
-        logDir + "/train.log 2>&1 < /dev/null"
+    time.sleep(20)
+    startTrainer = program + args + " 2>&1 | tee " + \
+        logDir + "/train.log"
    print startTrainer
    os.system(startTrainer)

@@ -152,7 +163,7 @@ if __name__ == '__main__':
    podlist = getPodList()
    # need to wait until all pods are running
    while not isPodAllRunning(podlist):
-        time.sleep(10)
+        time.sleep(20)
        podlist = getPodList()
    idMap = getIdMap(podlist)
    startPaddle(idMap, train_args_dict)

diff --git a/doc/howto/usage/k8s/src/worker_security_group.png b/doc/howto/usage/k8s/src/worker_security_group.png
new file mode 100644
index 0000000000000000000000000000000000000000..57eb0265a34ad4223b69600d2a3dd355482e0bf5
Binary files /dev/null and b/doc/howto/usage/k8s/src/worker_security_group.png differ

diff --git a/paddle/function/BufferArg.cpp b/paddle/function/BufferArg.cpp
index 5d595deb12c6c8ea419dd1f31b3c131a2f6a587a..2b70036e3ff7de9e8786bade03e220a4916db4c2 100644
--- a/paddle/function/BufferArg.cpp
+++ b/paddle/function/BufferArg.cpp
@@ -32,14 +32,20 @@ const SparseMatrixArg& BufferArg::sparse() const {

SparseMatrixArg::SparseMatrixArg(const CpuSparseMatrix& sparse, ArgType argType)
    : BufferArg(sparse, argType),
      row_(reinterpret_cast<void*>(sparse.getRows()), VALUE_TYPE_INT32),
-      col_(reinterpret_cast<void*>(sparse.getCols()), VALUE_TYPE_INT32) {
+      col_(reinterpret_cast<void*>(sparse.getCols()), VALUE_TYPE_INT32),
+      nnz_(sparse.getElementCnt()),
+      format_(static_cast<SparseDataFormat>(sparse.getFormat())),
+      type_(static_cast<SparseDataType>(sparse.getValueType())) {
  bufferType_ = TENSOR_SPARSE;
}

SparseMatrixArg::SparseMatrixArg(const GpuSparseMatrix& sparse, ArgType argType)
    : BufferArg(sparse, argType),
      row_(reinterpret_cast<void*>(sparse.getRows()), VALUE_TYPE_INT32),
-      col_(reinterpret_cast<void*>(sparse.getCols()), VALUE_TYPE_INT32) {
+      col_(reinterpret_cast<void*>(sparse.getCols()), VALUE_TYPE_INT32),
+      nnz_(sparse.getElementCnt()),
+      format_(static_cast<SparseDataFormat>(sparse.getFormat())),
+      type_(static_cast<SparseDataType>(sparse.getValueType())) {
  bufferType_ = TENSOR_SPARSE;
}

diff --git a/paddle/function/BufferArg.h b/paddle/function/BufferArg.h
index 84209265ce7634121e3e4dde609cd787093c45ec..177d20005b54b8f259f257a01e7aba7dda3e206a 100644
--- a/paddle/function/BufferArg.h
+++ b/paddle/function/BufferArg.h
@@ -30,13 +30,6 @@ enum BufferType {
  TENSOR_SPARSE = 4
};

-enum SparseDataType {
-  SPARSE_NO_VALUE = 0,  // do not need value pointer, all values are 1
-  SPARSE_FLOAT_VALUE = 1
-};
-
-enum SparseDataFormat { SPARSE_CSR_FORMAT = 0,
SPARSE_CSC_FORMAT = 1 };
-
class BufferArg;
class SequenceArg;
class SparseMatrixArg;
@@ -79,19 +72,21 @@ public:
  BufferArg(ValueType valueType,
            const TensorShape& shape,
            ArgType argType = UNSPECIFIED)
-      : buf_(nullptr),
-        valueType_(valueType),
-        shape_(shape),
-        argType_(argType) {}
+      : buf_(nullptr), valueType_(valueType), shape_(shape), argType_(argType) {
+    bufferType_ = TENSOR_NORMAL;
+  }

  BufferArg(void* buf,
            ValueType valueType,
            const TensorShape& shape,
            ArgType argType = UNSPECIFIED)
-      : buf_(buf), valueType_(valueType), shape_(shape), argType_(argType) {}
+      : buf_(buf), valueType_(valueType), shape_(shape), argType_(argType) {
+    bufferType_ = TENSOR_NORMAL;
+  }

-  BufferArg(void* buf, ValueType valueType)
-      : buf_(buf), valueType_(valueType) {}
+  BufferArg(void* buf, ValueType valueType) : buf_(buf), valueType_(valueType) {
+    bufferType_ = TENSOR_NORMAL;
+  }

  BufferArg(const Matrix& matrix, ArgType argType = UNSPECIFIED)
      : buf_(
@@ -167,8 +162,9 @@ public:
  ValueType valueType() const { return valueType_; }
  BufferType bufferType() const { return bufferType_; }
  const TensorShape& shape() const { return shape_; }
-  bool isSparse() const { return (TENSOR_SPARSE == bufferType_); }
+  bool isSparseArg() const { return TENSOR_SPARSE == bufferType_; }
  bool isSequenceArg() const { return TENSOR_SEQUENCE_DATA == bufferType_; }
+  virtual size_t numElements() const { return shape_.getElements(); }

  const SequenceArg& sequence() const;
  const SparseMatrixArg& sparse() const;
@@ -179,6 +175,7 @@ protected:
  TensorShape shape_;
  BufferType bufferType_{TENSOR_UNKNOWN};
  ArgType argType_{UNSPECIFIED};
+  // TODO(tianbing), add deviceType_
  // leading dimensions. The size is dims_.size()
  // Dims lds_;
};
@@ -191,6 +188,7 @@
class SequenceIdArg : public BufferArg {
public:
  SequenceIdArg(const TensorShape& shape, ArgType argType = UNSPECIFIED)
      : BufferArg(VALUE_TYPE_INT32, shape, argType) {
+    bufferType_ = TENSOR_SEQUENCE_ID;
    CHECK_EQ(shape_.ndims(), (size_t)1);
    CHECK_GT(shape_[0], 1);
    numSeqs_ = shape_[0] - 1;
@@ -228,7 +226,9 @@ public:
  SequenceArg(ValueType valueType,
              const TensorShape& shape,
              ArgType argType = UNSPECIFIED)
-      : BufferArg(valueType, shape, argType), startPositions_(TensorShape()) {}
+      : BufferArg(valueType, shape, argType), startPositions_(TensorShape()) {
+    bufferType_ = TENSOR_SEQUENCE_DATA;
+  }

  SequenceArg(void* buf,
              ValueType valueType,
@@ -269,31 +269,75 @@ public:
                  const BufferArg& row,
                  const BufferArg& col,
                  size_t nnz,
-                  SparseDataFormat format,
-                  SparseDataType type,
+                  SparseFormat format,
+                  SparseValueType type,
                  ArgType argType = UNSPECIFIED)
      : BufferArg(buf, valueType, shape, argType),
        row_(row),
        col_(col),
        nnz_(nnz),
-        format_(format),
-        type_(type) {
+        format_(static_cast<SparseDataFormat>(format)),
+        type_(static_cast<SparseDataType>(type)) {
    bufferType_ = TENSOR_SPARSE;
    CHECK((valueType == VALUE_TYPE_FLOAT) || (valueType == VALUE_TYPE_DOUBLE));
    CHECK_EQ(shape_.ndims(), (size_t)2);
    CHECK_EQ(row_.shape().ndims(), (size_t)1);
    CHECK_EQ(col_.shape().ndims(), (size_t)1);
-    if (format == SPARSE_CSR_FORMAT) {
+    if (format_ == T_SPARSE_CSR) {
      CHECK_EQ(nnz, col.shape()[0]);
-    } else if (format == SPARSE_CSC_FORMAT) {
+    } else if (format_ == T_SPARSE_CSC) {
      CHECK_EQ(nnz, row.shape()[0]);
    }
  }

+  SparseMatrixArg(ValueType valueType,
+                  const TensorShape& shape,
+                  size_t nnz,
+                  SparseFormat format,
+                  SparseValueType type,
+                  ArgType argType = UNSPECIFIED)
+      : BufferArg(valueType, shape, argType),
+        row_(BufferArg(nullptr, VALUE_TYPE_INT32)),
+        col_(BufferArg(nullptr, VALUE_TYPE_INT32)),
+        nnz_(nnz),
+
format_(static_cast<SparseDataFormat>(format)),
+        type_(static_cast<SparseDataType>(type)) {
+    bufferType_ = TENSOR_SPARSE;
+    CHECK((valueType == VALUE_TYPE_FLOAT) || (valueType == VALUE_TYPE_DOUBLE));
+    CHECK_EQ(shape_.ndims(), (size_t)2);
+
+    /// len of row_ : height + 1 (CSR) or nnz (CSC), buf_ == nullptr
+    row_ = (format_ == T_SPARSE_CSR
+                ? BufferArg(VALUE_TYPE_INT32, TensorShape{shape_[0] + 1})
+                : BufferArg(VALUE_TYPE_INT32, TensorShape{nnz}));
+    /// len of col_ : width + 1 (CSC) or nnz (CSR), buf_ == nullptr
+    col_ = (format_ == T_SPARSE_CSR
+                ? BufferArg(VALUE_TYPE_INT32, TensorShape{nnz})
+                : BufferArg(VALUE_TYPE_INT32, TensorShape{shape_[1] + 1}));
+  }
+
   SparseMatrixArg(const CpuSparseMatrix& sparse, ArgType argType = UNSPECIFIED);

   SparseMatrixArg(const GpuSparseMatrix& sparse, ArgType argType = UNSPECIFIED);

+  template <DeviceType DType>
+  typename Tensor<real, DType>::SparseMatrix SparseMatrix() const {
+    CHECK(buf_);
+    CHECK(valueType_ == DataType<real>::value);
+    // CHECK(deviceType_ == DType);
+    CHECK_EQ(2, shape_.ndims());
+    return typename Tensor<real, DType>::SparseMatrix(
+        reinterpret_cast<real*>(buf_),
+        reinterpret_cast<int*>(row_.data()),
+        reinterpret_cast<int*>(col_.data()),
+        shape_[0],
+        shape_[1],
+        nnz_,
+        static_cast<SparseValueType>(type_),
+        static_cast<SparseFormat>(format_),
+        false);
+  }
+
   ~SparseMatrixArg() {}

   void* getRowBuf() const { return row_.data(); }
@@ -302,6 +346,8 @@ public:

   size_t nnz() const { return nnz_; }

+  size_t numElements() const override { return nnz_; }
+
   SparseDataFormat dataFormat() const { return format_; }

   SparseDataType dataType() const { return type_; }
diff --git a/paddle/function/CMakeLists.txt b/paddle/function/CMakeLists.txt
index 6d20868072c3acaab2c5f9381bad5ea99d841d26..fae3b7b20a70b56dc44ea2df637281afe01a7e5a 100644
--- a/paddle/function/CMakeLists.txt
+++ b/paddle/function/CMakeLists.txt
@@ -26,6 +26,7 @@ if(WITH_TESTING)
     add_simple_unittest(FunctionTest)
     add_simple_unittest(ContextProjectionOpTest)
     add_simple_unittest(PadOpTest)
+    add_simple_unittest(MulOpTest)
   endif()
 endif()
diff --git a/paddle/function/CrossMapNormalOp.cpp b/paddle/function/CrossMapNormalOp.cpp
index 5c0bdd933b1e4a62e49981798c56c70907d16424..ef878bfbba961bdd3d5212e19fb83bb1e285e47f 100644
--- a/paddle/function/CrossMapNormalOp.cpp
+++ b/paddle/function/CrossMapNormalOp.cpp
@@ -162,38 +162,64 @@
 template <DeviceType Device>
 class CrossMapNormalFunc : public FunctionBase {
 public:
   void init(const FuncConfig& config) override {
+    // function arguments
     size_ = config.get<size_t>("size");
     scale_ = config.get<real>("scale");
     pow_ = config.get<real>("pow");
+
+    // number of inputs and outputs
+    numInputs_ = 1;
+    numOutputs_ = 2;
   }

   void calc(const BufferArgs& inputs, const BufferArgs& outputs) override {
-    CHECK_EQ((size_t)1, inputs.size());
-    CHECK_EQ((size_t)2, outputs.size());
-
-    CHECK_EQ(inputs[0].shape().ndims(), (size_t)4);
-    CHECK(inputs[0].shape() == outputs[0].shape());
-    CHECK(inputs[0].shape() == outputs[1].shape());
-
+    check(inputs, outputs);
+    // The ArgType check still happens here; it is an open question
+    // whether it belongs inside check().
CHECK_EQ(outputs[0].getArgType(), ASSIGN_TO); CHECK_EQ(outputs[1].getArgType(), ASSIGN_TO); - size_t samples = inputs[0].shape()[0]; - size_t channels = inputs[0].shape()[1]; - size_t height = inputs[0].shape()[2]; - size_t width = inputs[0].shape()[3]; + size_t batchSize = inputs[0].shape()[0]; + size_t maps = inputs[0].shape()[1]; + size_t rows = inputs[0].shape()[2]; + size_t columns = inputs[0].shape()[3]; CrossMapNormal(outputs[0].data(), outputs[1].data(), inputs[0].data(), - samples, - channels, - height, - width, + batchSize, + maps, + rows, + columns, size_, scale_, pow_); } + void check(const BufferArgs& inputs, const BufferArgs& outputs) override { + CHECK_EQ(numInputs_, inputs.size()); + CHECK_EQ(numOutputs_, outputs.size()); + + CHECK_EQ(inputs[0].shape().ndims(), (size_t)4); + CHECK(inputs[0].shape() == outputs[0].shape()); + CHECK(inputs[0].shape() == outputs[1].shape()); + } + + // Only need the shape of the input, can calculate the + // floating-point operation. + size_t ops(const BufferArgs& inputs, const BufferArgs& outputs) override { + CHECK_EQ((size_t)numInputs_, inputs.size()); + size_t batchSize = inputs[0].shape()[0]; + size_t maps = inputs[0].shape()[1]; + size_t rows = inputs[0].shape()[2]; + size_t columns = inputs[0].shape()[3]; + + // number of floating-point operations + // an approximate value + size_t ops = batchSize * maps * rows * columns * (size_ * 2 + 3); + + return ops; + } + private: size_t size_; real scale_; @@ -236,21 +262,18 @@ template class CrossMapNormalGradFunc : public FunctionBase { public: void init(const FuncConfig& config) override { + // function arguments size_ = config.get("size"); scale_ = config.get("scale"); pow_ = config.get("pow"); + + // number of inputs and outputs + numInputs_ = 4; + numOutputs_ = 1; } void calc(const BufferArgs& inputs, const BufferArgs& outputs) override { - CHECK_EQ((size_t)4, inputs.size()); - CHECK_EQ((size_t)1, outputs.size()); - - CHECK_EQ(inputs[0].shape().ndims(), (size_t)4); - CHECK(inputs[0].shape() == inputs[1].shape()); - CHECK(inputs[0].shape() == inputs[2].shape()); - CHECK(inputs[0].shape() == inputs[3].shape()); - CHECK(inputs[0].shape() == outputs[0].shape()); - + check(inputs, outputs); if (outputs[0].getArgType() != ADD_TO) { // Currently, some algorithm implementations are ASSIGN_TO mode, // if need to support the ADD_TO calculation, need to clear the output. @@ -259,25 +282,52 @@ public: tmp.zero(); } - size_t samples = inputs[0].shape()[0]; - size_t channels = inputs[0].shape()[1]; - size_t height = inputs[0].shape()[2]; - size_t width = inputs[0].shape()[3]; + size_t batchSize = inputs[0].shape()[0]; + size_t maps = inputs[0].shape()[1]; + size_t rows = inputs[0].shape()[2]; + size_t columns = inputs[0].shape()[3]; CrossMapNormalGrad(outputs[0].data(), inputs[0].data(), inputs[1].data(), inputs[2].data(), inputs[3].data(), - samples, - channels, - height, - width, + batchSize, + maps, + rows, + columns, size_, scale_, pow_); } + void check(const BufferArgs& inputs, const BufferArgs& outputs) override { + CHECK_EQ(numInputs_, inputs.size()); + CHECK_EQ(numOutputs_, outputs.size()); + + CHECK_EQ(inputs[0].shape().ndims(), (size_t)4); + CHECK(inputs[0].shape() == inputs[1].shape()); + CHECK(inputs[0].shape() == inputs[2].shape()); + CHECK(inputs[0].shape() == inputs[3].shape()); + CHECK(inputs[0].shape() == outputs[0].shape()); + } + + // Only need the shape of one input, can calculate the + // floating-point operation. 
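+  // e.g. a hypothetical 2x16x32x32 input with size_ = 5 costs about
+  // 2 * 16 * 32 * 32 * (5 * 4 + 2) = 720,896 (~0.72M) operations.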
+  size_t ops(const BufferArgs& inputs, const BufferArgs& outputs) override {
+    CHECK_LT((size_t)1, inputs.size());
+    size_t batchSize = inputs[0].shape()[0];
+    size_t maps = inputs[0].shape()[1];
+    size_t rows = inputs[0].shape()[2];
+    size_t columns = inputs[0].shape()[3];
+
+    // number of floating-point operations
+    // an approximate value
+    size_t ops = batchSize * maps * rows * columns * (size_ * 4 + 2);
+
+    return ops;
+  }
+
 private:
   size_t size_;
   real scale_;
diff --git a/paddle/function/Function.h b/paddle/function/Function.h
index 9215c137eb8e85a9a03575104d7f89bbce441eba..3bbeb6e525f85bdde9a54c8d60146eaa30a1bb4d 100644
--- a/paddle/function/Function.h
+++ b/paddle/function/Function.h
@@ -153,7 +153,36 @@ public:
   virtual void calc(const BufferArgs& inputs, const BufferArgs& outputs) {}

+  // This member function checks whether the BufferType and shape of the
+  // inputs and outputs arguments of the Function are correct.
+  // The general calc function calls this check to validate its arguments;
+  // callers may also run their own checks before calc is invoked.
+  virtual void check(const BufferArgs& inputs, const BufferArgs& outputs) {}
+
+  // Calculate the number of floating-point operations of this Function.
+  // The inputs and outputs arguments need to contain only the shape, not
+  // the actual data. Some Functions have identical input and output shapes,
+  // so not every argument has to be supplied; passing the full set of
+  // arguments is always correct for this interface.
+  virtual size_t ops(const BufferArgs& inputs, const BufferArgs& outputs) {
+    return 0;
+  }
+
+  int getNumInputs() const { return numInputs_; }
+
+  int getNumOutputs() const { return numOutputs_; }
+
   static ClassRegistrar<FunctionBase> funcRegistrar_;
+
+protected:
+  // numInputs_ and numOutputs_ represent the maximum numbers of inputs and
+  // outputs this Function supports. Some Functions take fewer arguments in
+  // optimized cases, so when argument counts are compared,
+  // inputs.size() <= numInputs_ or outputs.size() <= numOutputs_ may hold.
+  size_t numInputs_;
+  size_t numOutputs_;
 };

 #define FUNC_NAME(typeName, deviceName) #typeName "-" #deviceName
diff --git a/paddle/function/FunctionTest.h b/paddle/function/FunctionTest.h
index 24e7a36a43cfa8941535cb778aa1557ec5a0a6f4..00f59f97d4c8c1076abe00866b786615a9801a5d 100644
--- a/paddle/function/FunctionTest.h
+++ b/paddle/function/FunctionTest.h
@@ -13,7 +13,8 @@ See the License for the specific language governing permissions and
 limitations under the License. */

 #include "Function.h"
-#include "paddle/math/Vector.h"
+#include "paddle/math/Matrix.h"
+#include "paddle/math/SparseMatrix.h"
 #include "paddle/math/tests/TensorCheck.h"
 #include "paddle/testing/TestUtil.h"
@@ -69,7 +70,7 @@ public:
   }

   // outputs need only contain the shape, not the data.
- void addOutputs(const BufferArg& output) { + void addOutputs(const BufferArg& output, ArgType argType = ASSIGN_TO) { size_t size = output.shape().getElements() * sizeOfValuType(output.valueType()); cpuMemory_.emplace_back(std::make_shared(size)); @@ -79,12 +80,40 @@ public: std::make_shared(cpuMemory_.back()->getBuf(), output.valueType(), output.shape(), - ASSIGN_TO)); + argType)); gpuOutputs_.emplace_back( std::make_shared(gpuMemory_.back()->getBuf(), output.valueType(), output.shape(), - ASSIGN_TO)); + argType)); + } + + /// add and init output sparse matrix + void addOutputs(const SparseMatrixArg& output, ArgType argType = ASSIGN_TO) { + cpuSparse_ = std::make_shared( + output.shape()[0], + output.shape()[1], + output.nnz(), + static_cast(output.dataType()), + static_cast(output.dataFormat())); + + gpuSparse_ = std::make_shared( + output.shape()[0], + output.shape()[1], + output.nnz(), + static_cast(output.dataType()), + static_cast(output.dataFormat())); + + /// init sparse matrix + hl_stream_t stream(HPPL_STREAM_1); + cpuSparse_->randomizeUniform(); + gpuSparse_->copyFrom(*cpuSparse_, stream); + hl_stream_synchronize(stream); + + cpuOutputs_.emplace_back( + std::make_shared(*cpuSparse_, argType)); + gpuOutputs_.emplace_back( + std::make_shared(*gpuSparse_, argType)); } void addInputs(const SequenceArg& input) { @@ -107,10 +136,36 @@ public: // TODO: need be implemented. } + void addInputs(const SparseMatrixArg& input) { + cpuSparse_ = std::make_shared( + input.shape()[0], + input.shape()[1], + input.nnz(), + static_cast(input.dataType()), + static_cast(input.dataFormat())); + + gpuSparse_ = std::make_shared( + input.shape()[0], + input.shape()[1], + input.nnz(), + static_cast(input.dataType()), + static_cast(input.dataFormat())); + + /// init sparse matrix + hl_stream_t stream(HPPL_STREAM_1); + cpuSparse_->randomizeUniform(); + gpuSparse_->copyFrom(*cpuSparse_, stream); + hl_stream_synchronize(stream); + + cpuInputs_.emplace_back(std::make_shared(*cpuSparse_)); + gpuInputs_.emplace_back(std::make_shared(*gpuSparse_)); + } + void run() { // prepare cpu/gpu arguments initInputs(); + initOutputs(); // function calculate auto callFunction = [](FunctionBase* function, std::vector& inputs, @@ -129,7 +184,7 @@ public: callFunction(cpuFunc_.get(), cpuInputs_, cpuOutputs_); callFunction(gpuFunc_.get(), gpuInputs_, gpuOutputs_); - // check outputs and inouts + // check outputs compareOutputs(); } @@ -140,6 +195,10 @@ public: protected: void initInputs() { for (size_t i = 0; i < cpuInputs_.size(); i++) { + if (cpuInputs_[i]->isSparseArg()) { + continue; /// sparse matrix already init + } + initArg(*cpuInputs_[i]); // TODO: Need a BufferCopy used to copy from one BufferArg to another. @@ -152,14 +211,32 @@ protected: } } + void initOutputs() { + for (size_t i = 0; i < cpuOutputs_.size(); i++) { + if (cpuOutputs_[i]->isSparseArg()) { + continue; /// sparse matrix already init + } + + initArg(*cpuOutputs_[i]); + + // TODO: Need a BufferCopy used to copy from one BufferArg to another. + CpuVector cpuVector(cpuOutputs_[i]->shape().getElements(), + (real*)cpuOutputs_[i]->data()); + GpuVector gpuVector(gpuOutputs_[i]->shape().getElements(), + (real*)gpuOutputs_[i]->data()); + + gpuVector.copyFrom(cpuVector); + } + } + void compareOutputs() { for (size_t i = 0; i < cpuOutputs_.size(); i++) { // TODO, Need a BufferCheck used to compare the two buffers. 
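+      // numElements() returns the shape element count for a dense argument
+      // and nnz for a sparse one, so dense and sparse outputs can share
+      // this comparison path.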
- auto cpu = cpuOutputs_[i]; - auto gpu = gpuOutputs_[i]; - CpuVector cpuVector(cpu->shape().getElements(), (real*)cpu->data()); - GpuVector gpuVector(cpu->shape().getElements(), (real*)gpu->data()); - + const auto cpu = cpuOutputs_[i]; + const auto gpu = gpuOutputs_[i]; + CHECK_EQ(cpu->numElements(), gpu->numElements()); + CpuVector cpuVector(cpu->numElements(), (real*)cpu->data()); + GpuVector gpuVector(gpu->numElements(), (real*)gpu->data()); autotest::TensorCheckErr(cpuVector, gpuVector); } } @@ -195,6 +272,8 @@ protected: std::vector cpuOutputs_; std::vector gpuInputs_; std::vector gpuOutputs_; + std::shared_ptr cpuSparse_; + std::shared_ptr gpuSparse_; }; } // namespace paddle diff --git a/paddle/function/MulOp.cpp b/paddle/function/MulOp.cpp new file mode 100644 index 0000000000000000000000000000000000000000..91b4b8ed91b6055babcfbab8f7adb2c55e2747d0 --- /dev/null +++ b/paddle/function/MulOp.cpp @@ -0,0 +1,354 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include "MulOp.h" +/// todo(tianbing), delete it +#include +#include "paddle/math/MathFunctions.h" +#include "paddle/math/SIMDFunctions.h" +#include "paddle/utils/ThreadLocal.h" + +#ifndef PADDLE_TYPE_DOUBLE +#define GEMM paddle::gemm +#else +#define GEMM paddle::gemm +#endif + +namespace { +inline void vecAddTo(real* a, const real* b, real scaleB, size_t len) { + for (unsigned int i = 0; i < len; ++i) { + a[i] += (1.0 == scaleB) ? b[i] : scaleB * b[i]; + } +} + +inline void colVecAddTo( + real* a, real* b, real c, size_t len, size_t aWidth, size_t bWidth) { + for (unsigned int i = 0; i < len; ++i) { + a[i * aWidth] += (1.0 == c) ? b[i * bWidth] : b[i * bWidth] * c; + } +} +} // namespace + +namespace paddle { +/// sparse matrix (+)= dense matrix * dense matrix +template <> +void MulOp(CpuSparseMatrix& out, + const CpuMatrix& a, + const CpuMatrix& b, + real scaleAB, + real scaleT, + bool aTrans, + bool bTrans) { + CHECK_EQ(out.getValueType(), FLOAT_VALUE); + if (scaleT == 0) { + out.zeroMem(); + } + const real* A = a.getData(); + const real* B = b.getData(); + real* C = out.getValue(); + int* rows = out.getRows(); + int* cols = out.getCols(); + size_t width = out.getWidth(); + size_t height = out.getHeight(); + + /// SPARSE_CSC, {a any, b not trans} + if (out.getFormat() == SPARSE_CSC) { + /// b not trans and a any + CHECK(!bTrans); + size_t m = !aTrans ? a.getWidth() : a.getHeight(); + for (size_t i = 0; i < width; i++) { + size_t start = out.getColStartIdx(i); + size_t end = out.getColStartIdx(i + 1); + for (size_t j = start; j < end; j++) { + real sum = 0; + size_t rowIdx = rows[j]; + for (size_t k = 0; k < m; k++) { + sum += (!aTrans ? 
A[rowIdx * m + k] : A[k * height + rowIdx]) * + B[k * width + i]; + } + C[j] = scaleAB * sum + scaleT * C[j]; + } + } + return; + } + + /// SPARSE_CSR, {a any, b not trans} or {a not trans, b trans} + if (out.getFormat() == SPARSE_CSR) { + /// a and b can not both transpose + CHECK(!(aTrans && bTrans)); + size_t m = a.getWidth(); + for (size_t i = 0; i < height; i++) { + size_t start = out.getRowStartIdx(i); + size_t end = out.getRowStartIdx(i + 1); + for (size_t j = start; j < end; j++) { + real sum = 0; + size_t colIdx = cols[j]; + for (size_t k = 0; k < m; k++) { + sum += (!aTrans ? A[i * m + k] : A[k * height + i]) * + (!bTrans ? B[k * width + colIdx] : B[colIdx * m + k]); + } + C[j] = scaleAB * sum + scaleT * C[j]; + } + } + return; + } +} + +/// dense matrix (+)= dense matrix * dense matrix +template <> +void MulOp(CpuMatrix& out, + const CpuMatrix& a, + const CpuMatrix& b, + real scaleAB, + real scaleT, + bool aTrans, + bool bTrans) { + GEMM(aTrans ? CblasTrans : CblasNoTrans, + bTrans ? CblasTrans : CblasNoTrans, + out.getHeight(), + out.getWidth(), + !aTrans ? a.getWidth() : a.getHeight(), + scaleAB, + a.getData(), + a.getStride(), + b.getData(), + b.getStride(), + scaleT, + out.getData(), + out.getStride()); +} + +/// dense matrix (+)= sparse matrix * dense matrix +template <> +void MulOp(CpuMatrix& out, + const CpuSparseMatrix& a, + const CpuMatrix& b, + real scaleAB, + real scaleT, + bool aTrans, + bool bTrans) { + if (scaleT == 0) { + out.zeroMem(); + } + const real* B = b.getData(); + real* C = out.getData(); + if (out.getWidth() % 32 == 0) { + CHECK_EQ((size_t)B % 32, 0UL); + CHECK_EQ((size_t)C % 32, 0UL); + } + + int* cols = a.getCols(); + real* values = a.getValue(); + for (size_t i = 0; i < a.getHeight(); ++i) { + const int start = a.getRowStartIdx(i); + const int end = a.getRowStartIdx(i + 1); + for (int j = start; j < end; ++j) { + vecAddTo(!aTrans ? out.getRow(i) : out.getRow(cols[j]), + !aTrans ? const_cast(b).getRow(cols[j]) + : const_cast(b).getRow(i), + (a.getValueType() == FLOAT_VALUE) ? values[j] : (real)1.0, + out.getWidth()); + } + } +} + +/// dense matrix (+)= dense matrix * sparse matrix +template <> +void MulOp(CpuMatrix& out, + const CpuMatrix& a, + const CpuSparseMatrix& b, + real scaleAB, + real scaleT, + bool aTrans, + bool bTrans) { + if (scaleT == 0) { + out.zeroMem(); + } + real* A = const_cast(a.getData()); + real* B = const_cast(b.getValue()); + real* C = out.getData(); + int* rows = b.getRows(); + int* cols = b.getCols(); + + /// SPARSE_CSC format + if (b.getFormat() == SPARSE_CSC) { + for (size_t j = 0; j < b.getWidth(); ++j) { + int start = b.getColStartIdx(j); + int end = b.getColStartIdx(j + 1); + for (int i = start; i < end; ++i) { + colVecAddTo(!bTrans ? C + j : C + rows[i], + !bTrans ? A + rows[i] : A + j, + (b.getValueType() == NO_VALUE) ? (real)1.0 : B[i], + out.getHeight(), + out.getWidth(), + a.getWidth()); + } + } + return; + } + + /// SPARSE_CSR format + if (b.getFormat() == SPARSE_CSR) { + for (size_t j = 0; j < b.getHeight(); ++j) { + int start = b.getRowStartIdx(j); + int end = b.getRowStartIdx(j + 1); + for (int i = start; i < end; ++i) { + colVecAddTo(!bTrans ? C + cols[i] : C + j, + !bTrans ? A + j : A + cols[i], + (b.getValueType() == NO_VALUE) ? 
(real)1.0 : B[i], + out.getHeight(), + out.getWidth(), + a.getWidth()); + } + } + return; + } +} + +/** + * mul operator + * out = scaleT * out + scaleAB * (A * B) + * here, scaleT in {0, 1}, scaleAB == 1, + * out = A * B, ASSIGN_TO + * out += A * B, ADD_TO + * + * + * \param outputs[0] output matrix (out), M * N, + * could be either Sparse or Dense Matrix + * M is num of rows, N is num of columns + * \param inputs[0] first input matrix (A), M * K (if non-trans) + * could be either Sparse or Dense Matrix + * M is num of rows, K is num of columns + * \param inputs[1] second input matrix (B), K * N (if non-trans) + * could be either Sparse or Dense Matrix + * K is num of rows, N is num of columns + * + * Support eight Mul operators, with both GPU and CPU devices + * For each device, four Mul operators are supported: + * 1. dense (out) = dense (A) * dense (B) + * 2. dense (out) = sparse (A) * dense (B) + * sparse matrix only support SPARSE_CSR format + * 3. dense (out) = dense (A) * sparse (B) + * sparse matrix support SPARSE_CSC and SPARSE_CSR formats + * 4. sparse (out) = dense (A) * dense (B) + * sparse matrix support SPARSE_CSC and SPARSE_CSR formats + * + */ +template +class MulFunc : public FunctionBase { +public: + void init(const FuncConfig& config) override { + aTrans_ = config.get("aTrans"); + bTrans_ = config.get("bTrans"); + } + + void calc(const BufferArgs& inputs, const BufferArgs& outputs) override { + CHECK(!aTrans_ || !bTrans_) + << "Not support both a and b are transpose matrices"; + + CHECK_EQ((size_t)2, inputs.size()); + CHECK_EQ((size_t)1, outputs.size()); + CHECK(inputs[0].data() && inputs[1].data() && outputs[0].data()); + CHECK_EQ(inputs[0].shape().ndims(), (size_t)2); + CHECK_EQ(inputs[1].shape().ndims(), (size_t)2); + CHECK_EQ(outputs[0].shape().ndims(), (size_t)2); + + size_t aRow = !aTrans_ ? inputs[0].shape()[0] : inputs[0].shape()[1]; + size_t aCol = !aTrans_ ? inputs[0].shape()[1] : inputs[0].shape()[0]; + size_t bRow = !bTrans_ ? inputs[1].shape()[0] : inputs[1].shape()[1]; + size_t bCol = !bTrans_ ? inputs[1].shape()[1] : inputs[1].shape()[0]; + /// C = A * B, or C += A * B, for matrix format + CHECK_EQ(aCol, bRow); + CHECK_EQ(aRow, outputs[0].shape()[0]); + CHECK_EQ(bCol, outputs[0].shape()[1]); + + /// only support C = A * B (ASSIGN_TO) or C += A * B (ADD_TO) + real scaleT = (outputs[0].getArgType() == ADD_TO) ? 
1.0 : 0.0; + + /// support dense = not both sparse * sparse + /// or sparse = dense * dense + CHECK((!outputs[0].isSparseArg() && + !(inputs[0].isSparseArg() && inputs[1].isSparseArg())) || + (outputs[0].isSparseArg() && !inputs[0].isSparseArg() && + !inputs[1].isSparseArg())); + + auto outMat = outputs[0].matrix(); + /// dense matrix = dense matrix * dense matrix + if (!inputs[0].isSparseArg() && !inputs[1].isSparseArg() && + !outputs[0].isSparseArg()) { + MulOp(outMat, + inputs[0].matrix(), + inputs[1].matrix(), + 1.0, // scaleAB + scaleT, + aTrans_, + bTrans_); + return; + } + + /// dense matrix = dense matrix * sparse matrix + if (!inputs[0].isSparseArg() && inputs[1].isSparseArg() && + !outputs[0].isSparseArg()) { + CHECK(!aTrans_) << "Not supported a transpose"; + MulOp(outMat, + inputs[0].matrix(), + inputs[1].sparse().SparseMatrix(), + 1.0, // scaleAB + scaleT, + aTrans_, + bTrans_); + return; + } + + /// dense matrix = sparse matrix * dense matrix + if (inputs[0].isSparseArg() && !inputs[1].isSparseArg() && + !outputs[0].isSparseArg()) { + CHECK(!bTrans_) << "Not supported b transpose"; + CHECK_EQ(inputs[0].sparse().dataFormat(), T_SPARSE_CSR) + << "Only supported SPARSE_CSR format for sparse matrix a"; + MulOp(outMat, + inputs[0].sparse().SparseMatrix(), + inputs[1].matrix(), + 1.0, // scaleAB + scaleT, + aTrans_, + bTrans_); + return; + } + + /// sparse matrix = dense matrix * dense matrix + auto outSparseMat = outputs[0].sparse().SparseMatrix(); + if (!inputs[0].isSparseArg() && !inputs[1].isSparseArg() && + outputs[0].isSparseArg()) { + MulOp(outSparseMat, + inputs[0].matrix(), + inputs[1].matrix(), + 1.0, // scaleAB + scaleT, + aTrans_, + bTrans_); + return; + } + } + +private: + bool aTrans_; + bool bTrans_; +}; + +REGISTER_TYPED_FUNC(MulOp, CPU, MulFunc); +#ifndef PADDLE_ONLY_CPU +REGISTER_TYPED_FUNC(MulOp, GPU, MulFunc); +#endif +} // namespace paddle diff --git a/paddle/function/MulOp.h b/paddle/function/MulOp.h new file mode 100644 index 0000000000000000000000000000000000000000..b6016a6ab6e9d6549b359573ecc2b33900a58365 --- /dev/null +++ b/paddle/function/MulOp.h @@ -0,0 +1,102 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
*/ + +#pragma once + +#include "Function.h" +#include "paddle/math/Matrix.h" +#include "paddle/math/SparseMatrix.h" + +namespace paddle { +/// CPU, dense matrix (+)= dense matrix * dense matrix +template +void MulOp(CpuMatrix& out, + const CpuMatrix& a, + const CpuMatrix& b, + real scaleAB, + real scaleT, + bool aTrans, + bool bTrans); + +/// CPU, dense matrix (+)= sparse matrix * dense matrix +template +void MulOp(CpuMatrix& out, + const CpuSparseMatrix& a, + const CpuMatrix& b, + real scaleAB, + real scaleT, + bool aTrans, + bool bTrans); + +/// CPU, dense matrix (+)= dense matrix * sparse matrix +template +void MulOp(CpuMatrix& out, + const CpuMatrix& a, + const CpuSparseMatrix& b, + real scaleAB, + real scaleT, + bool aTrans, + bool bTrans); + +/// CPU, sparse matrix (+)= dense matrix * dense matrix +template +void MulOp(CpuSparseMatrix& out, + const CpuMatrix& a, + const CpuMatrix& b, + real scaleAB, + real scaleT, + bool aTrans, + bool bTrans); + +/// GPU, dense matrix (+)= dense matrix * dense matrix +template +void MulOp(GpuMatrix& out, + const GpuMatrix& a, + const GpuMatrix& b, + real scaleAB, + real scaleT, + bool aTrans, + bool bTrans); + +/// GPU, dense matrix (+)= sparse matrix * dense matrix +template +void MulOp(GpuMatrix& out, + const GpuSparseMatrix& a, + const GpuMatrix& b, + real scaleAB, + real scaleT, + bool aTrans, + bool bTrans); + +/// GPU, dense matrix (+)= dense matrix * sparse matrix +template +void MulOp(GpuMatrix& out, + const GpuMatrix& a, + const GpuSparseMatrix& b, + real scaleAB, + real scaleT, + bool aTrans, + bool bTrans); + +/// GPU, sparse matrix (+)= dense matrix * dense matrix +template +void MulOp(GpuSparseMatrix& out, + const GpuMatrix& a, + const GpuMatrix& b, + real scaleAB, + real scaleT, + bool aTrans, + bool bTrans); + +} // namespace paddle diff --git a/paddle/function/MulOpGpu.cu b/paddle/function/MulOpGpu.cu new file mode 100644 index 0000000000000000000000000000000000000000..dcfcb2325d7dae22e0e0e78fc0bddf061fc0940c --- /dev/null +++ b/paddle/function/MulOpGpu.cu @@ -0,0 +1,130 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include "hl_base.h" +#include "MulOp.h" +#include "paddle/math/Matrix.h" +#include "paddle/math/SparseMatrix.h" + +namespace paddle { +/// dense matrix (+)= dense matrix * dense matrix +template <> +void MulOp(GpuMatrix& out, + const GpuMatrix& a, + const GpuMatrix& b, + real scaleAB, + real scaleT, + bool aTrans, + bool bTrans) { + CHECK(a.useGpu_ && b.useGpu_) << "matrix device type not match"; + hl_matrix_mul(const_cast(a.getData()), + !aTrans ? HPPL_OP_N : HPPL_OP_T, + const_cast(b.getData()), + !bTrans ? HPPL_OP_N : HPPL_OP_T, + const_cast(out.getData()), + out.getHeight(), + out.getWidth(), + !aTrans ? 
a.getWidth() : a.getHeight(), + scaleAB, + scaleT, + a.getStride(), + b.getStride(), + out.getStride()); +} + +/// dense matrix (+)= sparse matrix * dense matrix +template <> +void MulOp(GpuMatrix& out, + const GpuSparseMatrix& a, + const GpuMatrix& b, + real scaleAB, + real scaleT, + bool aTrans, + bool bTrans) { + CHECK(out.isContiguous()); + CHECK(b.isContiguous()); + CHECK(a.useGpu_ && b.useGpu_) << "matrix device type not match"; + hl_matrix_csr_mul_dense(a.sMatrix_.get(), + aTrans ? HPPL_OP_T : HPPL_OP_N, + const_cast(b.getData()), + HPPL_OP_N, + const_cast(out.getData()), + out.getHeight(), + out.getWidth(), + b.getHeight(), + scaleAB, + scaleT); +} + +/// dense matrix (+)= dense matrix * sparse matrix +template <> +void MulOp(GpuMatrix& out, + const GpuMatrix& a, + const GpuSparseMatrix& b, + real scaleAB, + real scaleT, + bool aTrans, + bool bTrans) { + CHECK(out.isContiguous()); + CHECK(a.isContiguous()); + CHECK(a.useGpu_ && b.useGpu_) << "matrix device type not match"; + + if (b.format_ == SPARSE_CSC) { + hl_matrix_dense_mul_csc(const_cast(a.getData()), + HPPL_OP_N, + b.sMatrix_.get(), + bTrans ? HPPL_OP_T : HPPL_OP_N, + const_cast(out.getData()), + out.getHeight(), + out.getWidth(), + a.getWidth(), + scaleAB, + scaleT); + } else { + hl_matrix_dense_mul_csr(const_cast(a.getData()), + HPPL_OP_N, + b.sMatrix_.get(), + bTrans ? HPPL_OP_T : HPPL_OP_N, + const_cast(out.getData()), + out.getHeight(), + out.getWidth(), + a.getWidth(), + scaleAB, + scaleT); + } +} + +/// sparse matrix (+)= dense matrix * dense matrix +template <> +void MulOp(GpuSparseMatrix& out, + const GpuMatrix& a, + const GpuMatrix& b, + real scaleAB, + real scaleT, + bool aTrans, + bool bTrans) { + CHECK(a.useGpu_ && b.useGpu_) << "matrix device type not match"; + hl_sparse_matrix_mul(const_cast(a.getData()), + aTrans ? HPPL_OP_T : HPPL_OP_N, + const_cast(b.getData()), + bTrans ? HPPL_OP_T : HPPL_OP_N, + out.sMatrix_.get(), + out.getHeight(), + out.getWidth(), + !bTrans ? b.getHeight() : b.getWidth(), + scaleAB, + scaleT); +} + +} // namespace paddle diff --git a/paddle/function/MulOpTest.cpp b/paddle/function/MulOpTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..158c3c90983b12c352765479006669c5c9e5a8aa --- /dev/null +++ b/paddle/function/MulOpTest.cpp @@ -0,0 +1,212 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include +#include "FunctionTest.h" +#include "paddle/math/Matrix.h" +#include "paddle/math/SparseMatrix.h" +#include "paddle/math/tests/test_matrixUtil.h" +#include "paddle/testing/TestUtil.h" + +using namespace paddle; // NOLINT + +/** + * C += A * B, A, B, C dense matrix + * dense = dense * dense + */ +void testFuncDDDMatrix( + bool transa, bool transb, size_t dimM, size_t dimN, size_t dimK) { + real scaleT = 1.0; + size_t heightA = (transa == false) ? dimM : dimK; + size_t widthA = (transa == false) ? dimK : dimM; + size_t heightB = (transb == false) ? dimK : dimN; + size_t widthB = (transb == false) ? 
dimN : dimK;
+  size_t heightC = dimM;
+  size_t widthC = dimN;
+  // init Test object
+  FunctionCompare test(
+      "MulOp", FuncConfig().set("aTrans", transa).set("bTrans", transb));
+  // prepare input arguments
+  /// matrix A : HA * WA
+  test.addInputs(BufferArg(VALUE_TYPE_FLOAT, TensorShape{heightA, widthA}));
+  /// matrix B: HB * WB
+  test.addInputs(BufferArg(VALUE_TYPE_FLOAT, TensorShape{heightB, widthB}));
+
+  /// output matrix C: HC * WC
+  test.addOutputs(BufferArg(VALUE_TYPE_FLOAT, TensorShape{heightC, widthC}),
+                  scaleT == 1.0 ? ADD_TO : ASSIGN_TO);
+  // run Function
+  test.run();
+}
+
+TEST(MulOp, DDDMatrixMul) {
+  LOG(INFO) << "function test for dense = dense * dense matrix";
+  for (const auto transa : {false, true}) {
+    for (const auto transb : {false, true}) {
+      for (const auto dimM : {1, 10, 100}) {
+        for (const auto dimN : {1, 10}) {
+          for (const auto dimK : {8}) {
+            if (transa && transb) {
+              continue;
+            }
+            VLOG(3) << setiosflags(std::ios::left) << std::setfill(' ')
+                    << " transa=" << transa << " transb=" << transb
+                    << " dimM=" << std::setw(5) << dimM
+                    << " dimN=" << std::setw(5) << dimN
+                    << " dimK=" << std::setw(5) << dimK;
+            testFuncDDDMatrix(transa, transb, dimM, dimN, dimK);
+          }
+        }
+      }
+    }
+  }
+}
+
+/**
+ * C += A * B, B, C dense, A sparse
+ * dense = sparse * dense
+ */
+void testFuncDSparseDMatrix(
+    size_t dimM, size_t dimN, size_t dimK, size_t nnz, SparseFormat FORMAT) {
+  real scaleT = 1.0;
+  // init Test object
+  FunctionCompare test("MulOp",
+                       FuncConfig().set("aTrans", false).set("bTrans", false));
+  // prepare input arguments
+  /// sparse matrix A : M * K
+  test.addInputs(SparseMatrixArg(
+      VALUE_TYPE_FLOAT, TensorShape{dimM, dimK}, nnz, FORMAT, FLOAT_VALUE));
+  /// matrix B: K * N
+  test.addInputs(BufferArg(VALUE_TYPE_FLOAT, TensorShape{dimK, dimN}));
+
+  /// output matrix C: M * N
+  test.addOutputs(BufferArg(VALUE_TYPE_FLOAT, TensorShape{dimM, dimN}),
+                  scaleT == 1.0 ? ADD_TO : ASSIGN_TO);
+  // run Function
+  test.run();
+}
+
+TEST(MulOp, DSparseDMul) {
+  LOG(INFO) << "function test for dense = sparse * dense matrix";
+  for (const auto dimM : {10, 100, 1000}) {
+    for (const auto dimN : {10, 100}) {
+      for (const auto dimK : {3, 10}) {
+        for (const auto nnz : {3, 10}) {
+          for (const auto FORMAT : {SPARSE_CSR}) {
+            VLOG(3) << setiosflags(std::ios::left) << std::setfill(' ')
+                    << " dimM=" << std::setw(5) << dimM
+                    << " dimN=" << std::setw(5) << dimN
+                    << " dimK=" << std::setw(5) << dimK
+                    << " nnz=" << std::setw(5) << nnz
+                    << " format=" << std::setw(5) << FORMAT;
+            testFuncDSparseDMatrix(dimM, dimN, dimK, nnz, FORMAT);
+          }
+        }
+      }
+    }
+  }
+}
+
+/**
+ * C += A * B, A, C dense, B sparse
+ * dense = dense * sparse
+ */
+void testFuncDDSparseMatrix(
+    size_t dimM, size_t dimN, size_t dimK, size_t nnz, SparseFormat FORMAT) {
+  real scaleT = 1.0;
+  // init Test object
+  FunctionCompare test("MulOp",
+                       FuncConfig().set("aTrans", false).set("bTrans", false));
+  // prepare input arguments
+  /// matrix A : M * K
+  test.addInputs(BufferArg(VALUE_TYPE_FLOAT, TensorShape{dimM, dimK}));
+
+  /// matrix B: K * N
+  test.addInputs(SparseMatrixArg(
+      VALUE_TYPE_FLOAT, TensorShape{dimK, dimN}, nnz, FORMAT, FLOAT_VALUE));
+
+  /// output matrix C: M * N
+  test.addOutputs(BufferArg(VALUE_TYPE_FLOAT, TensorShape{dimM, dimN}),
+                  scaleT == 1.0 ? ADD_TO : ASSIGN_TO);
+  // run Function
+  test.run();
+}
+
+TEST(MulOp, DDSparseMul) {
+  LOG(INFO) << "function test for dense = dense * sparse matrix";
+  for (const auto dimM : {10, 100, 1000}) {
+    for (const auto dimN : {10, 100}) {
+      for (const auto dimK : {3, 10}) {
+        for (const auto nnz : {3, 10}) {
+          for (const auto FORMAT : {SPARSE_CSR, SPARSE_CSC}) {
+            VLOG(3) << setiosflags(std::ios::left) << std::setfill(' ')
+                    << " dimM=" << std::setw(5) << dimM
+                    << " dimN=" << std::setw(5) << dimN
+                    << " dimK=" << std::setw(5) << dimK
+                    << " nnz=" << std::setw(5) << nnz
+                    << " format=" << std::setw(5) << FORMAT;
+            testFuncDDSparseMatrix(dimM, dimN, dimK, nnz, FORMAT);
+          }
+        }
+      }
+    }
+  }
+}
+
+/**
+ * C += A * B, C sparse, A, B dense
+ * sparse = dense * dense
+ */
+void testFuncSparseDDMatrix(
+    size_t dimM, size_t dimN, size_t dimK, size_t nnz, SparseFormat FORMAT) {
+  real scaleT = 1.0;
+  // init Test object
+  FunctionCompare test("MulOp",
+                       FuncConfig().set("aTrans", false).set("bTrans", false));
+  // prepare input arguments
+  /// matrix A : M * K
+  test.addInputs(BufferArg(VALUE_TYPE_FLOAT, TensorShape{dimM, dimK}));
+
+  /// matrix B: K * N
+  test.addInputs(BufferArg(VALUE_TYPE_FLOAT, TensorShape{dimK, dimN}));
+
+  /// output sparse matrix C: M * N
+  test.addOutputs(
+      SparseMatrixArg(
+          VALUE_TYPE_FLOAT, TensorShape{dimM, dimN}, nnz, FORMAT, FLOAT_VALUE),
+      scaleT == 1.0 ? ADD_TO : ASSIGN_TO);
+  // run Function
+  test.run();
+}
+
+TEST(MulOp, SparseDDMul) {
+  LOG(INFO) << "function test for sparse = dense * dense matrix";
+  for (const auto dimM : {10, 100, 1000}) {
+    for (const auto dimN : {10, 100}) {
+      for (const auto dimK : {3, 10}) {
+        for (const auto nnz : {3, 10}) {
+          for (const auto FORMAT : {SPARSE_CSC, SPARSE_CSR}) {
+            VLOG(3) << setiosflags(std::ios::left) << std::setfill(' ')
+                    << " dimM=" << std::setw(5) << dimM
+                    << " dimN=" << std::setw(5) << dimN
+                    << " dimK=" << std::setw(5) << dimK
+                    << " nnz=" << std::setw(5) << nnz
+                    << " format=" << std::setw(5) << FORMAT;
+            testFuncSparseDDMatrix(dimM, dimN, dimK, nnz, FORMAT);
+          }
+        }
+      }
+    }
+  }
+}
diff --git a/paddle/function/TensorType.h b/paddle/function/TensorType.h
index 98942cff9e2ea44e78727d66a059ab8cf5f0ef7c..8308bbd8ad4fe1b97b35b779f27d2bf4534f0fa6 100644
--- a/paddle/function/TensorType.h
+++ b/paddle/function/TensorType.h
@@ -31,6 +31,10 @@ enum DeviceType {
   DEVICE_TYPE_GPU = 2
 };

+enum SparseDataType { T_NO_VALUE = 0, T_FLOAT_VALUE = 1 };
+
+enum SparseDataFormat { T_SPARSE_CSR = 0, T_SPARSE_CSC = 1 };
+
 inline int sizeOfValuType(ValueType valueType) {
   if (valueType == VALUE_TYPE_INT32) {
     return 4;
@@ -87,6 +91,29 @@ struct MatrixT {
   using type = void;  // Not implemented
 };

+template <typename VType, DeviceType Device>
+struct SparseMatrixT;
+
+template <>
+struct SparseMatrixT<real, DEVICE_TYPE_CPU> {
+  using type = CpuSparseMatrix;
+};
+
+template <>
+struct SparseMatrixT<real, DEVICE_TYPE_GPU> {
+  using type = GpuSparseMatrix;
+};
+
+template <>
+struct SparseMatrixT<int, DEVICE_TYPE_CPU> {
+  using type = void;  // Not implemented
+};
+
+template <>
+struct SparseMatrixT<int, DEVICE_TYPE_GPU> {
+  using type = void;  // Not implemented
+};
+
 template <typename VType, DeviceType Device>
 struct VectorT;

@@ -114,8 +141,9 @@ struct VectorT {

 template <typename VType, DeviceType Device>
 struct Tensor {
-  typedef typename detail::MatrixT<VType, Device>::type Matrix;
   typedef typename detail::VectorT<VType, Device>::type Vector;
+  typedef typename detail::MatrixT<VType, Device>::type Matrix;
+  typedef typename detail::SparseMatrixT<VType, Device>::type SparseMatrix;
 };

 }  // namespace paddle
diff --git a/paddle/gserver/gradientmachines/GradientMachine.cpp b/paddle/gserver/gradientmachines/GradientMachine.cpp
index
36ca05b919b136c162105cf4f1fb7705ae7ca7f3..3eb87d9b85c8207a23046fdb4bda06ba8185e2a3 100644 --- a/paddle/gserver/gradientmachines/GradientMachine.cpp +++ b/paddle/gserver/gradientmachines/GradientMachine.cpp @@ -60,55 +60,6 @@ GradientMachine* GradientMachine::create( return nullptr; } -GradientMachine* GradientMachine::create(const std::string& modelFile, - DataConfig* dataConfig) { - std::ifstream is(modelFile); - CHECK(is) << "Fail to open " << modelFile; - return create(is, dataConfig); -} - -GradientMachine* GradientMachine::create(std::istream& is, - DataConfig* dataConfig) { - TrainerConfig trainerConfig; - GradientMachine* ret = create(is, &trainerConfig); - if (dataConfig && trainerConfig.has_data_config()) { - *dataConfig = trainerConfig.data_config(); - } - return ret; -} - -GradientMachine* GradientMachine::create(const std::string& modelFile, - TrainerConfig* trainerConfig) { - std::ifstream is(modelFile); - CHECK(is) << "Fail to open " << modelFile; - return create(is, trainerConfig); -} - -GradientMachine* GradientMachine::create(std::istream& is, - TrainerConfig* trainerConfig) { - TrainerConfig trainerConfigTemp; - int64_t size; - CHECK(is.read((char*)&size, sizeof(size))) << "Fail to read "; - std::string buf; - buf.resize(size); - CHECK(is.read(&buf[0], size)) << "Fail to read "; - CHECK(trainerConfigTemp.ParseFromString(buf)) << "Fail to parse config"; - std::unique_ptr machine( - create(trainerConfigTemp.model_config())); - std::vector& parameters = machine->getParameters(); - for (auto& para : parameters) { - para->load(is); - } - - machine->onLoadParameter(); - - if (trainerConfig) { - *trainerConfig = trainerConfigTemp; - } - - return machine.release(); -} - void GradientMachine::saveParameters(const std::string& dir) const { LOG(INFO) << "Saving parameters to " << dir; diff --git a/paddle/gserver/gradientmachines/GradientMachine.h b/paddle/gserver/gradientmachines/GradientMachine.h index 1e35c7e2b8d185e45f33f6287ad4e32ccad2d5a6..0829968d87c5dc7eeb2d1b70c758ff305d89496f 100644 --- a/paddle/gserver/gradientmachines/GradientMachine.h +++ b/paddle/gserver/gradientmachines/GradientMachine.h @@ -89,39 +89,6 @@ public: std::vector{ PARAMETER_VALUE, PARAMETER_GRADIENT, PARAMETER_MOMENTUM}); - /** - * Create a gradient machine from the merged model file. - * The merged model file can be generated using tools/merge_model - * If dataConfig is not null, it will be filled with the DataConfig - * from the TrainerConfig - */ - static GradientMachine* create(const std::string& modelFile, - DataConfig* dataConfig); - - /** - * Create a gradient machine from a stream which contains the merged - * model file. The merged model file can be generated using tools/merge_model - * If dataConfig is not null, it will be filled with the DataConfig - * from the TrainerConfig - */ - static GradientMachine* create(std::istream& is, DataConfig* dataConfig); - - /** - * Create a gradient machine from the merged model file. - * The merged model file can be generated using tools/merge_model - * If trainerConfig is not null, it will be filled with the TrainerConfig - */ - static GradientMachine* create(const std::string& modelFile, - TrainerConfig* trainerConfig); - - /** - * Create a gradient machine from a stream which contains the merged - * model file. 
The merged model file can be generated using tools/merge_model - * If trainerConfig is not null, it will be filled with the TrainerConfig - */ - static GradientMachine* create(std::istream& is, - TrainerConfig* trainerConfig); - virtual ~GradientMachine() {} /** diff --git a/paddle/math/Matrix.h b/paddle/math/Matrix.h index dd24f8821d49768354840e0381742218ab9a0204..57c0c2fe40a95d75ca580196e6f9ae36ce6edcdc 100644 --- a/paddle/math/Matrix.h +++ b/paddle/math/Matrix.h @@ -31,6 +31,7 @@ limitations under the License. */ namespace paddle { +/// TODO(tianbing), move to paddle/function/TensorType.h enum SparseValueType { NO_VALUE = 0, FLOAT_VALUE = 1 }; /** @@ -56,6 +57,7 @@ enum SparseValueType { NO_VALUE = 0, FLOAT_VALUE = 1 }; * value [1, 1, 2, 2, 5] * @endcode */ +/// TODO(tianbing), move to paddle/function/TensorType.h enum SparseFormat { SPARSE_CSR = 0, SPARSE_CSC = 1 }; class Matrix; diff --git a/paddle/math/SparseMatrix.cpp b/paddle/math/SparseMatrix.cpp index 720a035ecbd26df01fe24c991982bbf7965ccbdc..3bae6d373f240fcc773644386b290ef9874828ae 100644 --- a/paddle/math/SparseMatrix.cpp +++ b/paddle/math/SparseMatrix.cpp @@ -177,7 +177,6 @@ GpuSparseMatrix::GpuSparseMatrix(real* value, hl_sparse_matrix_s_ptr tmp2(tmp, hl_destruct_sparse_matrix); sMatrix_ = tmp2; } - LOG(INFO) << "weight to matrix "; } } diff --git a/paddle/math/SparseRowMatrix.cpp b/paddle/math/SparseRowMatrix.cpp index b61c6b2d49ccead5e9cfdf595a8bebae0e5b87b5..b8c781ca1fd46c9840817abe26a20eec005c37e9 100644 --- a/paddle/math/SparseRowMatrix.cpp +++ b/paddle/math/SparseRowMatrix.cpp @@ -24,10 +24,6 @@ limitations under the License. */ #include "paddle/utils/Thread.h" #include "paddle/utils/Util.h" -DEFINE_bool(allow_inefficient_sparse_update, - false, - "Whether to allow inefficient sparse update"); - namespace paddle { const unsigned int SparseRowCpuMatrix::kUnusedId_ = -1U; diff --git a/paddle/math/SparseRowMatrix.h b/paddle/math/SparseRowMatrix.h index c05fc98ff9fe739688ed3c21466fb29b70e36854..1ccbf97b25922ae52377d7048da3a07012d21003 100644 --- a/paddle/math/SparseRowMatrix.h +++ b/paddle/math/SparseRowMatrix.h @@ -21,8 +21,6 @@ limitations under the License. */ #include "RowBuffer.h" #include "paddle/utils/Util.h" -DECLARE_bool(allow_inefficient_sparse_update); - namespace paddle { /** @@ -183,11 +181,10 @@ protected: inline void checkStoreSize() { if (buf_->isAutoGrowth()) { if (buf_->getRowCount() > 0.5 * height_) { - LOG(WARNING) - << "There are more than 0.5*height (" << localIndices_->size() - << ") rows are used for sparse " - << "update, which is not efficient. Considering not use " - << "sparse_update or set --allow_inefficient_sparse_update=true"; + LOG(WARNING) << "There are more than 0.5*height (" + << localIndices_->size() << ") rows are used for sparse " + << "update, which is not efficient. 
Consider not using "
+                   << "sparse_update.";
     }
   } else {
     CHECK_LE(localIndices_->size(), buf_->getRowCount());
diff --git a/paddle/math/tests/test_matrixUtil.h b/paddle/math/tests/test_matrixUtil.h
index 9aa74b15193723970d80b5d1a4e0ac95341cd45a..47f461474622d13ea2f922a77348c78b450ec37f 100644
--- a/paddle/math/tests/test_matrixUtil.h
+++ b/paddle/math/tests/test_matrixUtil.h
@@ -30,6 +30,17 @@ void checkMatrixEqual(const MatrixPtr& a, const MatrixPtr& b) {
   }
 }

+void checkSMatrixEqual(const CpuSparseMatrix& a, const CpuSparseMatrix& b) {
+  ASSERT_EQ(a.getWidth(), b.getWidth());
+  ASSERT_EQ(a.getHeight(), b.getHeight());
+  ASSERT_EQ(a.isTransposed(), b.isTransposed());
+  ASSERT_EQ(a.getFormat(), b.getFormat());
+  ASSERT_EQ(a.getElementCnt(), b.getElementCnt());
+  for (size_t r = 0; r < a.getElementCnt(); ++r) {
+    ASSERT_FLOAT_EQ(a.getValue()[r], b.getValue()[r]);
+  }
+}
+
 void checkSMatrixEqual(const CpuSparseMatrixPtr& a,
                        const CpuSparseMatrixPtr& b) {
   ASSERT_EQ(a->getWidth(), b->getWidth());
@@ -73,6 +84,36 @@ void checkSMatrixEqual2(const CpuSparseMatrixPtr& a,
   }
 }

+void checkSMatrixEqual2Dense(const CpuSparseMatrix& a, const CpuMatrix& b) {
+  ASSERT_EQ(a.getWidth(), b.getWidth());
+  ASSERT_EQ(a.getHeight(), b.getHeight());
+  ASSERT_EQ(a.isTransposed(), b.isTransposed());
+
+  if (a.getFormat() == SPARSE_CSC) {
+    int* rows = a.getRows();
+    for (size_t i = 0; i < a.getWidth(); i++) {
+      for (size_t j = a.getColStartIdx(i); j < a.getColStartIdx(i + 1); j++) {
+        if (a.getValueType() == FLOAT_VALUE) {
+          ASSERT_FLOAT_EQ(a.getValue()[j], b.getElement(rows[j], i));
+        } else {
+          ASSERT_FLOAT_EQ(1.0, b.getElement(rows[j], i));
+        }
+      }
+    }
+  } else {
+    int* cols = a.getCols();
+    for (size_t i = 0; i < a.getHeight(); i++) {
+      for (size_t j = a.getRowStartIdx(i); j < a.getRowStartIdx(i + 1); j++) {
+        if (a.getValueType() == FLOAT_VALUE) {
+          ASSERT_FLOAT_EQ(a.getValue()[j], b.getElement(i, cols[j]));
+        } else {
+          ASSERT_FLOAT_EQ(1.0, b.getElement(i, cols[j]));
+        }
+      }
+    }
+  }
+}
+
 void checkSMatrixEqual2Dense(const CpuSparseMatrixPtr& a,
                              const CpuMatrixPtr& b) {
   ASSERT_EQ(a->getWidth(), b->getWidth());
diff --git a/paddle/trainer/Trainer.cpp b/paddle/trainer/Trainer.cpp
index 8465addaf9e03831e914be2c73901c3b1a9d537f..bd84545375117b178d4324f0ad03f5bc35ae925d 100644
--- a/paddle/trainer/Trainer.cpp
+++ b/paddle/trainer/Trainer.cpp
@@ -90,16 +90,6 @@ DEFINE_string(model_list, "", "File that saves the model list when evaluation");

 namespace paddle {

-void Trainer::init(int argc, char** argv) {
-  initMain(argc, argv);
-  initPython(argc, argv);
-
-  auto config = TrainerConfigHelper::createFromFlagConfig();
-  feenableexcept(FE_INVALID | FE_DIVBYZERO | FE_OVERFLOW);
-
-  init(config);
-}
-
 void Trainer::init(const std::shared_ptr<TrainerConfigHelper>& config,
                    bool testing,
                    const std::shared_ptr<GradientMachine>& gradientMachine,
diff --git a/paddle/trainer/Trainer.h b/paddle/trainer/Trainer.h
index 7cbf18ace7a5fed053653c73e62d36c388b15123..c8ee4726c24c335ceda22ea3a20049b01d11c149 100644
--- a/paddle/trainer/Trainer.h
+++ b/paddle/trainer/Trainer.h
@@ -71,11 +71,6 @@ public:
       const std::shared_ptr<DataProvider>& dataProvider = nullptr,
       const std::shared_ptr<DataProvider>& testDataProvider = nullptr);

-  /**
-   * Initialize Trainer from command line flags.
-   */
-  void init(int argc, char** argv);
-
   /**
    * Train until num_passes reached.
    * One pass means neural network train through all training data.
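For reference, checkSMatrixEqual2Dense above and the MulOp kernels earlier in this patch walk the same CSR/CSC index arrays. The following is a minimal illustrative Python sketch of the two layouts; it is not part of the patch, the function and argument names are hypothetical, and only the FLOAT_VALUE case is shown (NO_VALUE matrices store an implicit 1.0 per nonzero).

    # CSR: row_start has height+1 entries; cols and values have nnz entries.
    # CSC: col_start has width+1 entries; rows and values have nnz entries.

    def csr_to_dense(height, width, row_start, cols, values):
        dense = [[0.0] * width for _ in range(height)]
        for r in range(height):
            # nonzeros of row r live in the half-open range
            # [row_start[r], row_start[r + 1])
            for j in range(row_start[r], row_start[r + 1]):
                dense[r][cols[j]] = values[j]
        return dense

    def csc_to_dense(height, width, col_start, rows, values):
        dense = [[0.0] * width for _ in range(height)]
        for c in range(width):
            # nonzeros of column c live in [col_start[c], col_start[c + 1])
            for j in range(col_start[c], col_start[c + 1]):
                dense[rows[j]][c] = values[j]
        return dense

This is the same indexing that getRowStartIdx/getColStartIdx express in the C++ loops above.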
diff --git a/paddle/trainer/tests/CMakeLists.txt b/paddle/trainer/tests/CMakeLists.txt index 22e07bd0e98a4cd36e6ed5860bcff0d4ae7cb1d2..c5c76a030d9e5f1deed63454b408442954ef5eae 100644 --- a/paddle/trainer/tests/CMakeLists.txt +++ b/paddle/trainer/tests/CMakeLists.txt @@ -1,11 +1,3 @@ -################# test_Prediction ###################### -add_unittest_without_exec(test_Prediction - test_Prediction.cpp) -add_test(NAME test_Prediction - COMMAND ${PROJ_ROOT}/paddle/.set_python_path.sh -d ${PROJ_ROOT}/python - ${CMAKE_CURRENT_BINARY_DIR}/test_Prediction --merger=${CMAKE_CURRENT_BINARY_DIR}/../paddle_merge_model - WORKING_DIRECTORY ${PROJ_ROOT}/paddle/) - ################# test_Compare ############################ add_unittest_without_exec(test_Compare test_Compare.cpp) diff --git a/paddle/trainer/tests/test_Prediction.cpp b/paddle/trainer/tests/test_Prediction.cpp deleted file mode 100644 index 0c79404eee1c0902c5c8e8eefd139da3da584636..0000000000000000000000000000000000000000 --- a/paddle/trainer/tests/test_Prediction.cpp +++ /dev/null @@ -1,174 +0,0 @@ -/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. */ - -#include - -#include "paddle/trainer/Trainer.h" - -#include - -DECLARE_string(config); -DECLARE_string(config_args); -DEFINE_string(merger, - "./paddle_merge_model", - "path to paddle_merge_model binary"); - -using namespace paddle; // NOLINT -using namespace std; // NOLINT - -static const string& configFile = "trainer/tests/sample_trainer_config.conf"; -static const string& mergedModelFile = "./test_model_file"; -static const string& modelDir = "./test_model_dir"; - -void checkBuffer(real* vec1, real* vec2, size_t len) { - for (size_t i = 0; i < len; i++) { - EXPECT_EQ(vec1[i], vec2[i]) << "vec1:" << vec1[i] << " vec2:" << vec2[i]; - } -} - -void checkParameters(vector A, vector B) { - CHECK_EQ(B.size(), A.size()) << "parameter size not equal"; - for (size_t i = 0; i < A.size(); i++) { - auto vec1 = A[i]->getBuf(PARAMETER_VALUE); - auto vec2 = B[i]->getBuf(PARAMETER_VALUE); - CHECK_EQ(vec1->useGpu_, vec2->useGpu_) << "use gpu not equal"; - CHECK_EQ(vec1->getSize(), vec2->getSize()) << "size not equal"; - - if (vec1->useGpu_ == false) { - checkBuffer(vec1->getData(), vec2->getData(), vec1->getSize()); - } else { - VectorPtr cpuVec1 = Vector::create(vec1->getSize(), false); - VectorPtr cpuVec2 = Vector::create(vec2->getSize(), false); - cpuVec1->copyFrom(*vec1, HPPL_STREAM_DEFAULT); - cpuVec2->copyFrom(*vec2, HPPL_STREAM_DEFAULT); - hl_stream_synchronize(HPPL_STREAM_DEFAULT); - checkBuffer(cpuVec1->getData(), cpuVec2->getData(), cpuVec1->getSize()); - } - } -} - -TEST(GradientMachine, create) { -#ifdef PADDLE_ONLY_CPU - FLAGS_use_gpu = false; -#endif - mkDir(modelDir.c_str()); - FLAGS_config = configFile; - FLAGS_config_args = "with_cost=False"; - auto config = TrainerConfigHelper::createFromFlagConfig(); - - // save model to directory - unique_ptr gradientMachine1( - GradientMachine::create(*config)); - gradientMachine1->saveParameters(modelDir); - 
Trainer trainer; - trainer.init(config); - ParameterUtil* paramUtil = trainer.getParameterUtilPtr(); - if (paramUtil != NULL) { - paramUtil->saveConfigWithPath(modelDir); - } - - // create a different GradientMachine - unique_ptr gradientMachine2( - GradientMachine::create(*config)); - gradientMachine2->randParameters(); - - // merge config and model to one file - string cmd = FLAGS_merger + " --model_dir=" + modelDir + - " --config_args=with_cost=False" + " --model_file=" + - mergedModelFile; - LOG(INFO) << cmd; - int ret = system(cmd.c_str()); - EXPECT_EQ(0, ret); - if (ret) { - return; - } - - // create GradientMachine from the merged model - DataConfig dataConfig; - unique_ptr gradientMachine3( - GradientMachine::create(mergedModelFile, &dataConfig)); - CHECK(gradientMachine3); - EXPECT_EQ(dataConfig.type(), "simple"); - EXPECT_EQ(dataConfig.feat_dim(), 3); - - // compare the parameters of GradientMachine and GradientMachine3 - std::vector paraMachine1 = gradientMachine1->getParameters(); - std::vector paraMachine3 = gradientMachine3->getParameters(); - checkParameters(paraMachine1, paraMachine3); - - // Test that the GradientMachine created from the merged model - // is same as the orginnal one. - vector inArgs(1); - vector outArgs; - - int inputDim = 3; - int numSamples = 2; - CpuMatrix cpuInput(numSamples, inputDim); - for (int i = 0; i < numSamples; ++i) { - for (int j = 0; j < inputDim; ++j) { - cpuInput.getData()[i * inputDim + j] = - rand() / (real)RAND_MAX; // NOLINT TODO(yuyang): use rand_r - } - } - MatrixPtr input = Matrix::create(numSamples, - inputDim, - /* trans */ false, - FLAGS_use_gpu); - input->copyFrom(cpuInput); - inArgs[0].value = input; - gradientMachine1->forward(inArgs, &outArgs, PASS_TEST); - EXPECT_EQ((size_t)1, outArgs.size()); - - vector outArgs2; - gradientMachine2->forward(inArgs, &outArgs2, PASS_TEST); - CpuMatrix out1(outArgs[0].value->getHeight(), outArgs[0].value->getWidth()); - CpuMatrix out2(outArgs2[0].value->getHeight(), outArgs2[0].value->getWidth()); - out1.copyFrom(*outArgs[0].value); - out2.copyFrom(*outArgs2[0].value); - for (size_t i = 0; i < out1.getHeight() * out1.getWidth(); i++) { - EXPECT_NE(out1.getData()[i], out2.getData()[i]); - } - - gradientMachine3->forward(inArgs, &outArgs2, PASS_TEST); - out2.copyFrom(*outArgs2[0].value); - checkBuffer( - out1.getData(), out2.getData(), out2.getHeight() * out2.getWidth()); - - cmd = " rm -rf " + modelDir + "/*"; - LOG(INFO) << "cmd " << cmd; - ret = system(cmd.c_str()); - EXPECT_EQ(0, ret); - if (ret) { - return; - } - - cmd = " rm -rf " + mergedModelFile; - LOG(INFO) << "cmd " << cmd; - ret = system(cmd.c_str()); - EXPECT_EQ(0, ret); - if (ret) { - return; - } - - // clean up - rmDir(modelDir.c_str()); - remove(mergedModelFile.c_str()); -} - -int main(int argc, char** argv) { - initMain(argc, argv); - initPython(argc, argv); - testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -} diff --git a/paddle/utils/Flags.cpp b/paddle/utils/Flags.cpp index 59d6cbdc513660b87cb013d8aa92c5c8f9289ecb..e8f31bc811ac30d83e8203b784ee1f93a8d35d90 100644 --- a/paddle/utils/Flags.cpp +++ b/paddle/utils/Flags.cpp @@ -33,12 +33,15 @@ DEFINE_int32(port, 20134, "Listening port for pserver"); DEFINE_int32(data_server_port, 21134, "Listening port for dserver"); DEFINE_int32(ports_num, 1, - "The ports number for parameter send," - " increment based on default port number"); + "Number of ports for sending dense parameter," + " following ports on parameter server will be visited" + " for sending dense parameter: 
[port, port+ports_num-1]"); DEFINE_int32(ports_num_for_sparse, 0, - "The ports number for parameter send," - " increment based on default (port + ports_num)"); + "Number of ports for sending sparse parameter," + " following ports on parameter server will be visited" + " for sending sparse parameter:" + " [port+ports_num, port+ports_num+ports_num_for_sparse-1]"); DEFINE_string(nics, "xgbe0,xgbe1", "network device name for pservers"); DEFINE_string(rdma_tcp, "tcp", "use rdma or tcp rdma transport protocol"); DEFINE_int32(trainer_id, diff --git a/proto/ParameterServerConfig.proto b/proto/ParameterServerConfig.proto index 3068bba8b10d89b432b41076dc6eb3ebc40b3883..404f9613792653dda72eeb98f022851adedbfbfd 100644 --- a/proto/ParameterServerConfig.proto +++ b/proto/ParameterServerConfig.proto @@ -27,11 +27,14 @@ message ParameterClientConfig { * Configuration structure for ParameterServer2. */ message ParameterServerConfig { - // The ports number for parameter send, - // increment based on default port number + // Number of ports for sending dense parameter, + // following ports on parameter server will be visited + // for sending dense parameter: [port, port+ports_num-1] required int32 ports_num = 1 [default = 1]; - // The ports number for parameter send, - // increment based on default (port + ports_num + // Number of ports for sending sparse parameter, + // following ports on parameter server will be visited + // for sending sparse parameter: + // [port+ports_num, port+ports_num+ports_num_for_sparse-1] required int32 ports_num_for_sparse = 2 [default = 0]; // network device name for pservers required string nics = 3 [default = "xgbe0,xgbe1"]; diff --git a/python/paddle/v2/__init__.py b/python/paddle/v2/__init__.py index f662d6826321eb840739382558f76327d27b5847..b2ea87b086101d71e89c33ce7c1f4eb21afade5a 100644 --- a/python/paddle/v2/__init__.py +++ b/python/paddle/v2/__init__.py @@ -11,3 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +import optimizer + +__all__ = ['optimizer'] diff --git a/python/paddle/v2/optimizer.py b/python/paddle/v2/optimizer.py new file mode 100644 index 0000000000000000000000000000000000000000..aa2942bc9faeb2a353459cd619886f56ea32f450 --- /dev/null +++ b/python/paddle/v2/optimizer.py @@ -0,0 +1,58 @@ +import py_paddle.swig_paddle as swig_api +import paddle.trainer_config_helpers.optimizers as v1_optimizers +import paddle.trainer_config_helpers.config_parser_utils as config_parser_utils +import paddle.v2 + +__all__ = ['Adam', 'Adamax'] + + +class Optimizer(object): + def __init__(self, **kwargs): + if 'batch_size' in kwargs: + del kwargs['batch_size'] # not important for python library. + + def __impl__(): + v1_optimizers.settings(batch_size=1, **kwargs) + + self.__opt_conf_proto__ = config_parser_utils.parse_optimizer_config( + __impl__) + self.__opt_conf__ = swig_api.OptimizationConfig.createFromProto( + self.__opt_conf_proto__) + + def enable_types(self): + """ + get enable_types for each optimizer. + enable_types = [value, gradient, momentum, etc] + For each optimizer(SGD, Adam), GradientMachine should enable different + buffers. 
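+        e.g. [PARAMETER_VALUE, PARAMETER_GRADIENT, PARAMETER_MOMENTUM].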
+ """ + tmp = swig_api.ParameterOptimizer.create(self.__opt_conf__) + assert isinstance(tmp, swig_api.ParameterOptimizer) + return tmp.getParameterTypes() + + def create_local_updater(self): + return swig_api.ParameterUpdater.createLocalUpdater(self.__opt_conf__) + + def create_remote_updater(self, pass_num): + return swig_api.ParameterUpdater.createRemoteUpdater(self.__opt_conf__, + pass_num) + + +class Adam(Optimizer): + def __init__(self, beta1=0.9, beta2=0.999, epsilon=1e-8, **kwargs): + learning_method = v1_optimizers.AdamOptimizer( + beta1=beta1, beta2=beta2, epsilon=epsilon) + super(Adam, self).__init__(learning_method=learning_method, **kwargs) + + +class Adamax(Optimizer): + def __init__(self, beta1=0.9, beta2=0.999, **kwargs): + learning_method = v1_optimizers.AdamaxOptimizer( + beta1=beta1, beta2=beta2) + super(Adamax, self).__init__(learning_method=learning_method, **kwargs) + + +if __name__ == '__main__': + swig_api.initPaddle('--use_gpu=false') + opt = paddle.v2.optimizer.Adam() + print opt.enable_types()