提交 437c98d5 编写于 作者: 王益

Resolve conflicts

......@@ -13,15 +13,7 @@ import numpy as np
import random
from mnist_util import read_from_mnist
from paddle.trainer_config_helpers import *
def optimizer_config():
settings(
learning_rate=1e-4,
learning_method=AdamOptimizer(),
batch_size=1000,
model_average=ModelAverage(average_window=0.5),
regularization=L2Regularization(rate=0.5))
import paddle.v2
def network_config():
......@@ -75,19 +67,23 @@ def input_order_converter(generator):
def main():
api.initPaddle("-use_gpu=false", "-trainer_count=4") # use 4 cpu cores
# get enable_types for each optimizer.
# enable_types = [value, gradient, momentum, etc]
# For each optimizer(SGD, Adam), GradientMachine should enable different
# buffers.
opt_config_proto = parse_optimizer_config(optimizer_config)
opt_config = api.OptimizationConfig.createFromProto(opt_config_proto)
_temp_optimizer_ = api.ParameterOptimizer.create(opt_config)
enable_types = _temp_optimizer_.getParameterTypes()
optimizer = paddle.v2.optimizer.Adam(
learning_rate=1e-4,
batch_size=1000,
model_average=ModelAverage(average_window=0.5),
regularization=L2Regularization(rate=0.5))
# Create Local Updater. Local means not run in cluster.
# For a cluster training, here we can change to createRemoteUpdater
# in future.
updater = optimizer.create_local_updater()
assert isinstance(updater, api.ParameterUpdater)
# Create Simple Gradient Machine.
model_config = parse_network_config(network_config)
m = api.GradientMachine.createFromConfigProto(
model_config, api.CREATE_MODE_NORMAL, enable_types)
m = api.GradientMachine.createFromConfigProto(model_config,
api.CREATE_MODE_NORMAL,
optimizer.enable_types())
# This type check is not useful. Only enable type hint in IDE.
# Such as PyCharm
......@@ -96,12 +92,6 @@ def main():
# Initialize Parameter by numpy.
init_parameter(network=m)
# Create Local Updater. Local means not run in cluster.
# For a cluster training, here we can change to createRemoteUpdater
# in future.
updater = api.ParameterUpdater.createLocalUpdater(opt_config)
assert isinstance(updater, api.ParameterUpdater)
# Initialize ParameterUpdater.
updater.init(m)
......
......@@ -127,11 +127,6 @@
<td class="left"></td><td class="left"></td><td class="left"></td><td class="left"></td>
</tr>
<tr>
<td class="left">allow_inefficient_sparse_update</td>
<td class="left"></td><td class="left"></td><td class="left"></td><td class="left"></td>
</tr>
<tr>
<td class="left">start_pass</td>
<td class="left"></td><td class="left"></td><td class="left"></td><td class="left"></td>
......
......@@ -127,11 +127,6 @@ It looks like there are a lot of arguments. However, most of them are for develo
<td class="left"></td><td class="left"></td><td class="left"></td><td class="left"></td>
</tr>
<tr>
<td class="left">allow_inefficient_sparse_update</td>
<td class="left"></td><td class="left"></td><td class="left"></td><td class="left"></td>
</tr>
<tr>
<td class="left">start_pass</td>
<td class="left"></td><td class="left"></td><td class="left"></td><td class="left"></td>
......
......@@ -306,10 +306,6 @@
- 指示是否显示参数服务器上的稀疏参数分布的日志细节.
- 类型: bool (默认: 0).
* `--allow_inefficient_sparse_update`
- 指示是否允许低效率的稀疏更新.
- 类型: bool (默认: 0).
* `--check_sparse_distribution_batches`
- 每运行多少个批次执行一次稀疏参数分布的检查.
- 类型: int32 (默认: 100).
......
......@@ -310,10 +310,6 @@
- show log details for sparse parameter distribution in pserver.
- type: bool (default: 0).
* `--allow_inefficient_sparse_update`
- Whether to allow inefficient sparse update.
- type: bool (default: 0).
* `--check_sparse_distribution_batches`
- Running sparse parameter distribution check every so many batches.
- type: int32 (default: 100).
......
# Kubernetes on AWS
## Create AWS Account and IAM Account
# Distributed PaddlePaddle Training on AWS with Kubernetes
We will show you step by step on how to run distributed PaddlePaddle training on AWS cluster with Kubernetes. Let's start from core concepts.
## Distributed PaddlePaddle Training Core Concepts
### Distributed Training Job
A distributed training job is represented by a [Kubernetes job](https://kubernetes.io/docs/user-guide/jobs/#what-is-a-job).
Each Kuberentes job is described by a job config file, which specifies the information like the number of [pods](https://kubernetes.io/docs/user-guide/pods/#what-is-a-pod) in the job and environment variables.
In a distributed training job, we would:
1. prepare partitioned training data and configuration file on a distributed file system (in this tutorial we use Amazon Elastic File System), and
1. create and submit the Kubernetes job config to the Kubernetes cluster to start the training job.
### Parameter Servers and Trainers
There are two roles in a PaddlePaddle cluster: *parameter server (pserver)* and *trainer*. Each parameter server process maintains a shard of the global model. Each trainer has its local copy of the model, and uses its local data to update the model. During the training process, trainers send model updates to parameter servers, parameter servers are responsible for aggregating these updates, so that trainers can synchronize their local copy with the global model.
<center>![Model is partitioned into two shards. Managed by two parameter servers respectively.](src/pserver_and_trainer.png)</center>
In order to communicate with pserver, trainer needs to know the ip address of each pserver. In kubernetes it's better to use a service discovery mechanism (e.g., DNS hostname) rather than static ip address, since any pserver's pod may be killed and a new pod could be schduled onto another node of different ip address. However, now we are using static ip. This will be improved.
Parameter server and trainer are packaged into a same docker image. They will run once pod is scheduled by kubernetes job.
### Trainer ID
Each trainer process requires a trainer ID, a zero-based index value, passed in as a command-line parameter. The trainer process thus reads the data partition indexed by this ID.
### Training
The entry-point of a container is a shell script. It can see some environment variables pre-defined by Kubernetes. This includes one that gives the job's identity, which can be used in a remote call to the Kubernetes apiserver that lists all pods in the job.
We rank each pod by sorting them by their ips. The rank of each pod could be the "pod ID". Because we run one trainer and one parameter server in each pod, we can use this "pod ID" as the trainer ID. A detailed workflow of the entry-point script is as follows:
1. Query the api server to get pod information, and assign the `trainer_id` by sorting the ip.
1. Copy the training data from EFS persistent volume into container.
1. Parse the `paddle pserver` and `paddle trainer` startup parameters from environment variables, and then start up the processes.
1. Trainer with `train_id` 0 will automatically write results onto EFS volume.
## PaddlePaddle on AWS with Kubernetes
### Choose AWS Service Region
This tutorial requires several AWS services work in the same region. Before we create anything in AWS, please check the following link
https://aws.amazon.com/about-aws/global-infrastructure/regional-product-services/
Choose a region which has the following services available: EC2, EFS, VPS, CloudFormation, KMS, VPC, S3.
In this tutorial, we use "Oregon(us-west-2)" as example.
### Create AWS Account and IAM Account
Under each AWS account, we can create multiple [IAM](http://docs.aws.amazon.com/IAM/latest/UserGuide/introduction.html) users. This allows us to grant some privileges to each IAM user and to create/operate AWS clusters as an IAM user.
......@@ -25,17 +75,13 @@ Please be aware that this tutorial needs the following privileges for the user i
- AWSKeyManagementServicePowerUser
## PaddlePaddle on AWS
Here we will show you step by step on how to run PaddlePaddle training on AWS cluster.
### Download kube-aws and kubectl
#### kube-aws
[kube-aws](https://github.com/coreos/kube-aws) is a CLI tool to automate cluster deployment to AWS.
##### Verify kube-aws integrity
Note: if you are using a non-official release (e.g RC release) kube-aws, you can skip this setp.
Import the CoreOS Application Signing Public Key:
```
......@@ -60,7 +106,7 @@ PLATFORM=darwin-amd64
gpg2 --verify kube-aws-${PLATFORM}.tar.gz.sig kube-aws-${PLATFORM}.tar.gz
```
##### Install kube-aws
Extract the binary:
```
......@@ -103,7 +149,6 @@ And then configure your AWS account information:
```
aws configure
```
......@@ -113,7 +158,7 @@ Fill in the required fields:
```
AWS Access Key ID: YOUR_ACCESS_KEY_ID
AWS Secrete Access Key: YOUR_SECRETE_ACCESS_KEY
Default region name: us-west-1
Default region name: us-west-2
Default output format: json
```
......@@ -131,25 +176,28 @@ aws ec2 describe-instances
The keypair that will authenticate SSH access to your EC2 instances. The public half of this key pair will be configured on each CoreOS node.
Follow [EC2 Keypair docs](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html) to create a EC2 key pair
Follow [EC2 Keypair User Guide](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html) to create a EC2 key pair
After creating a key pair, you will use the key pair name to configure the cluster.
Key pairs are only available to EC2 instances in the same region. We are using us-west-1 in our tutorial, so make sure to creat key pairs in that region (N. California).
Key pairs are only available to EC2 instances in the same region. We are using us-west-2 in our tutorial, so make sure to creat key pairs in that region (Oregon).
Your browser will download a `key-name.pem` file which is the key to access the EC2 instances. We will use it later.
#### KMS key
Amazon KMS keys are used to encrypt and decrypt cluster TLS assets. If you already have a KMS Key that you would like to use, you can skip creating a new key and provide the Arn string for your existing key.
You can create a KMS key in the AWS console, or with the aws command line tool:
You can create a KMS key with the aws command line tool:
```
aws kms --region=us-west-1 create-key --description="kube-aws assets"
aws kms --region=us-west-2 create-key --description="kube-aws assets"
{
"KeyMetadata": {
"CreationDate": 1458235139.724,
"KeyState": "Enabled",
"Arn": "arn:aws:kms:us-west-1:aaaaaaaaaaaaa:key/xxxxxxxxxxxxxxxxxxx",
"Arn": "arn:aws:kms:us-west-2:aaaaaaaaaaaaa:key/xxxxxxxxxxxxxxxxxxx",
"AWSAccountId": "xxxxxxxxxxxxx",
"Enabled": true,
"KeyUsage": "ENCRYPT_DECRYPT",
......@@ -161,14 +209,14 @@ aws kms --region=us-west-1 create-key --description="kube-aws assets"
We will need to use the value of `Arn` later.
And then you need to add several inline policies in your user permission.
And then let's add several inline policies in your IAM user permission.
Go to IAM user page, click on `Add inline policy` button, and then select `Custom Policy`
Go to [IAM Console](https://console.aws.amazon.com/iam/home?region=us-west-2#/home). Click on button `Users`, click user that we just created, and then click on `Add inline policy` button, and select `Custom Policy`.
paste into following inline policies:
Paste into following inline policies:
```
{
(Caution: node_0, node_1, node_2 directories represents PaddlePaddle node and train_id, not the Kubernetes node){
"Version": "2012-10-17",
"Statement": [
{
......@@ -195,39 +243,40 @@ paste into following inline policies:
"cloudformation:DescribeStackEvents"
],
"Resource": [
"arn:aws:cloudformation:us-west-1:AWS_ACCOUNT_ID:stack/MY_CLUSTER_NAME/*"
"arn:aws:cloudformation:us-west-2:AWS_ACCOUNT_ID:stack/MY_CLUSTER_NAME/*"
]
}
]
}
```
`Version` : Its value has to be exactly "2012-10-17".
`AWS_ACCOUNT_ID`: You can get it from following command line:
```
aws sts get-caller-identity --output text --query Account
```
`MY_CLUSTER_NAME`: Pick a MY_CLUSTER_NAME that you like, you will use it later as well.
`MY_CLUSTER_NAME`: Pick a MY_CLUSTER_NAME that you like, you will use it later as well.
Please note, stack name must satisfy regular expression pattern: [a-zA-Z][-a-zA-Z0-9*]*, which means no "_" or "-" in stack name, or kube-aws will throw error in later steps.
#### External DNS name
When the cluster is created, the controller will expose the TLS-secured API on a DNS name.
The A record of that DNS name needs to be point to the cluster ip address.
DNS name should have a CNAME points to cluster DNS name or an A record points to the cluster IP address.
We will need to use DNS name later in tutorial. If you don't already own one, you can choose any DNS name (e.g., `paddle`) and modify `/etc/hosts` to associate cluster ip with that DNS name.
We will need to use DNS name later in tutorial. If you don't already own one, you can choose any DNS name (e.g., `paddle`) and modify `/etc/hosts` to associate cluster IP with that DNS name for your local machine. And add name service (route53) in aws to associate the IP to paddle for cluster. We will find the cluster IP in later steps.
#### S3 bucket
You need to create an S3 bucket before startup the Kubernetes cluster.
There are some bugs in aws cli in creating S3 bucket, so let's use the [Web console](https://console.aws.amazon.com/s3/home?region=us-west-1).
There are some bugs in aws cli in creating S3 bucket, so let's use the [S3 Console](https://console.aws.amazon.com/s3/home?region=us-west-2).
Click on `Create Bucket`, fill in a unique BUCKET_NAME, and make sure region is us-west-1 (Northern California).
Click on `Create Bucket`, fill in a unique BUCKET_NAME, and make sure region is us-west-2 (Oregon).
#### Initialize an asset directory
#### Initialize Assets
Create a directory on your local machine to hold the generated assets:
......@@ -242,10 +291,10 @@ Initialize the cluster CloudFormation stack with the KMS Arn, key pair name, and
kube-aws init \
--cluster-name=MY_CLUSTER_NAME \
--external-dns-name=MY_EXTERNAL_DNS_NAME \
--region=us-west-1 \
--availability-zone=us-west-1a \
--region=us-west-2 \
--availability-zone=us-west-2a \
--key-name=KEY_PAIR_NAME \
--kms-key-arn="arn:aws:kms:us-west-1:xxxxxxxxxx:key/xxxxxxxxxxxxxxxxxxx"
--kms-key-arn="arn:aws:kms:us-west-2:xxxxxxxxxx:key/xxxxxxxxxxxxxxxxxxx"
```
`MY_CLUSTER_NAME`: the one you picked in [KMS key](#kms-key)
......@@ -256,14 +305,15 @@ kube-aws init \
`--kms-key-arn`: the "Arn" in [KMS key](#kms-key)
Here `us-west-1a` is used for parameter `--availability-zone`, but supported availability zone varies among AWS accounts.
Here `us-west-2a` is used for parameter `--availability-zone`, but supported availability zone varies among AWS accounts.
Please check if `us-west-1a` is supported by `aws ec2 --region us-west-1 describe-availability-zones`, if not switch to other supported availability zone. (e.g., `us-west-1a`, or `us-west-1b`)
Please check if `us-west-2a` is supported by `aws ec2 --region us-west-2 describe-availability-zones`, if not switch to other supported availability zone. (e.g., `us-west-2a`, or `us-west-2b`)
Note: please don't use `us-west-1c`. Subnets can currently only be created in the following availability zones: us-west-1b, us-west-1a.
There will now be a cluster.yaml file in the asset directory. This is the main configuration file for your cluster.
By default `kube-aws` will only create one worker node. Let's edit `cluster.yaml` and change `workerCount` from 1 to 3.
#### Render contents of the asset directory
......@@ -278,41 +328,14 @@ The next command generates the default set of cluster assets in your asset direc
```
kube-aws render stack
```
Here's what the directory structure looks like:
```
$ tree
.
├── cluster.yaml
├── credentials
│ ├── admin-key.pem
│ ├── admin.pem
│ ├── apiserver-key.pem
│ ├── apiserver.pem
│ ├── ca-key.pem
│ ├── ca.pem
│ ├── worker-key.pem
│ └── worker.pem
│ ├── etcd-key.pem
│ └── etcd.pem
│ ├── etcd-client-key.pem
│ └── etcd-client.pem
├── kubeconfig
├── stack-template.json
└── userdata
├── cloud-config-controller
└── cloud-config-worker
```
These assets (templates and credentials) are used to create, update and interact with your Kubernetes cluster.
Assets (templates and credentials) that are used to create, update and interact with your Kubernetes cluster will be created under your current folder.
### Kubernetes Cluster Start Up
#### Create the instances defined in the CloudFormation template
Now let's create your cluster (choose any PREFIX for the command below):
Now let's create your cluster (choose any `PREFIX` for the command below):
```
kube-aws up --s3-uri s3://BUCKET_NAME/PREFIX
......@@ -328,239 +351,178 @@ You can invoke `kube-aws status` to get the cluster API endpoint after cluster c
```
$ kube-aws status
Cluster Name: paddle-cluster
Controller DNS Name: paddle-cl-ElbAPISe-EEOI3EZPR86C-531251350.us-west-1.elb.amazonaws.com
Controller DNS Name: paddle-cl-ElbAPISe-EEOI3EZPR86C-531251350.us-west-2.elb.amazonaws.com
```
If you own a DNS name, set the A record to any of the above ip. __Or__ you can set up CNAME point to `Controller DNS Name` (`paddle-cl-ElbAPISe-EEOI3EZPR86C-531251350.us-west-2.elb.amazonaws.com`)
##### Find IP address
Use command `dig` to check the load balancer hostname to get the ip address.
```
$ dig paddle-cl-ElbAPISe-EEOI3EZPR86C-531251350.us-west-1.elb.amazonaws.com
$ dig paddle-cl-ElbAPISe-EEOI3EZPR86C-531251350.us-west-2.elb.amazonaws.com
;; QUESTION SECTION:
;paddle-cl-ElbAPISe-EEOI3EZPR86C-531251350.us-west-1.elb.amazonaws.com. IN A
;paddle-cl-ElbAPISe-EEOI3EZPR86C-531251350.us-west-2.elb.amazonaws.com. IN A
;; ANSWER SECTION:
paddle-cl-ElbAPISe-EEOI3EZPR86C-531251350.us-west-1.elb.amazonaws.com. 59 IN A 54.241.164.52
paddle-cl-ElbAPISe-EEOI3EZPR86C-531251350.us-west-1.elb.amazonaws.com. 59 IN A 54.67.102.112
paddle-cl-ElbAPISe-EEOI3EZPR86C-531251350.us-west-2.elb.amazonaws.com. 59 IN A 54.241.164.52
paddle-cl-ElbAPISe-EEOI3EZPR86C-531251350.us-west-2.elb.amazonaws.com. 59 IN A 54.67.102.112
```
In the above output, both ip `54.241.164.52`, `54.67.102.112` will work.
If you own a DNS name, set the A record to any of the above ip. Otherwise you can edit `/etc/hosts` to associate ip with the DNS name.
*If you own a DNS name*, set the A record to any of the above ip. Then you can skip to the step "Access the cluster".
*If you do not own a DNS name*:
##### Update local DNS association
Edit `/etc/hosts` to associate above ip with the DNS name.
##### Add Route53 private name service in VPC
- Open [Route53 Console](https://console.aws.amazon.com/route53/home)
- Create hosted zone with following config
- Domain name: "paddle"
- Type: "Private hosted zone for amazon VPC"
- VPC ID: <Your VPC ID>
- Add A record
- Click on the zone "paddle" just created
- Click the button "Create record set"
- Name : leave blank
- type: "A"
- Value: <kube-controller ec2 private ip>
- Verify name service
- Connect to any instance created by kube-aws via ssh
- Run command "host paddle", see if the ip returned is the private ip of kube-controller
#### Access the cluster
Once the API server is running, you should see:
```
$ kubectl --kubeconfig=kubeconfig get nodes
NAME STATUS AGE
ip-10-0-0-xxx.us-west-1.compute.internal Ready 5m
ip-10-0-0-xxx.us-west-1.compute.internal Ready 5m
ip-10-0-0-xx.us-west-1.compute.internal Ready,SchedulingDisabled 5m
$ kubectl --kubeconfig=kubeconfig get nodes
NAME STATUS AGE
ip-10-0-0-134.us-west-2.compute.internal Ready 6m
ip-10-0-0-238.us-west-2.compute.internal Ready 6m
ip-10-0-0-50.us-west-2.compute.internal Ready 6m
ip-10-0-0-55.us-west-2.compute.internal Ready 6m
```
### Setup Elastic File System for Cluster
Training data is usually served on a distributed filesystem, we use Elastic File System (EFS) on AWS. Ceph might be a better solution, but it requires high version of Linux kernel that might not be stable enough at this moment. We haven't automated the EFS setup at this moment, so please do the following steps:
Training data is usually served on a distributed filesystem, we use Elastic File System (EFS) on AWS.
1. Create security group for EFS in [security group console](https://us-west-2.console.aws.amazon.com/ec2/v2/home?region=us-west-2#SecurityGroups:sort=groupId)
1. Look up security group id for `paddle-cluster-sg-worker` (`sg-055ee37d` in the image below)
<center>![](src/worker_security_group.png)</center>
2. Add security group `paddle-efs` with `ALL TCP` inbound rule and custom source as group id of `paddle-cluster-sg-worker`. And VPC of `paddle-cluster-vpc`. Make sure availability zone is same as the one you used in [Initialize Assets](#initialize-assets).
<center>![](src/add_security_group.png)</center>
1. Make sure you added AmazonElasticFileSystemFullAccess policy in your group.
1. Create the Elastic File System in AWS console, and attach the new VPC with it.
2. Create the Elastic File System in [EFS console](https://us-west-2.console.aws.amazon.com/efs/home?region=us-west-2#/wizard/1) with `paddle-cluster-vpc` VPC. Make sure subnet is `paddle-cluster-Subnet0` andd security group is `paddle-efs`.
<center>![](src/create_efs.png)</center>
1. Modify the Kubernetes security group under ec2/Security Groups, add additional inbound policy "All TCP TCP 0 - 65535 0.0.0.0/0" for Kubernetes default VPC security group.
<center>![](src/add_security_group.png)</center>
1. Follow the EC2 mount instruction to mount the disk onto all the Kubernetes nodes, we recommend to mount EFS disk onto ~/efs.
<center>![](src/efs_mount.png)</center>
We will place user config and divided training data onto EFS. Training task will cache related files by copying them from EFS into container. It will also write the training results back onto EFS. We will show you how to place the data later in this article.
### Core Concepts of PaddlePaddle Training on AWS
Now we've already setup a 3 nodes distributed Kubernetes cluster, and on each node we've attached the EFS volume. In this training demo, we will create three Kubernetes pods and schedule them on three nodes. Each pod contains a PaddlePaddle container. When container gets created, it will start parameter server (pserver) and trainer process, load the training data from EFS volume and start the distributed training task.
#### Distributed Training Job
A distributed training job is represented by a [kubernetes job](https://kubernetes.io/docs/user-guide/jobs/#what-is-a-job).
Each Kuberentes job is described by a job config file, which specifies the information like the number of pods in the job and environment variables.
In a distributed training job, we would:
1. upload the partitioned training data and configuration file onto EFS volume, and
1. create and submit the Kubernetes job config to the Kubernetes cluster to start the training job.
#### Parameter Servers and Trainers
There are two roles in a PaddlePaddle cluster: `parameter server` and `trainer`. Each parameter server process maintains a shard of the global model. Each trainer has its local copy of the model, and uses its local data to update the model. During the training process, trainers send model updates to parameter servers, parameter servers are responsible for aggregating these updates, so that trainers can synchronize their local copy with the global model.
<center>![Model is partitioned into two shards. Managed by two parameter servers respectively.](src/pserver_and_trainer.png)</center>
In order to communicate with pserver, trainer needs to know the ip address of each pserver. In kubernetes it's better to use a service discovery mechanism (e.g., DNS hostname) rather than static ip address, since any pserver's pod may be killed and a new pod could be schduled onto another node of different ip address. We will improve paddlepaddle's service discovery ability. For now we will use static ip.
Parameter server and trainer are packaged into a same docker image. They will run once pod is scheduled by kubernetes job.
#### Trainer ID
Each trainer process requires a trainer ID, a zero-based index value, passed in as a command-line parameter. The trainer process thus reads the data partition indexed by this ID.
#### Training
The entry-point of a container is a Python script. As it runs in a pod, it can see some environment variables pre-defined by Kubernetes. This includes one that gives the job's identity, which can be used in a remote call to the Kubernetes apiserver that lists all pods in the job.
We rank each pod by sorting them by their ips. The rank of each pod could be the "pod ID". Because we run one trainer and one parameter server in each pod, we can use this "pod ID" as the trainer ID. A detailed workflow of the entry-point script is as follows:
1. Query the api server to get pod information, and assign the `trainer_id` by sorting the ip.
1. Copy the training data from EFS sharing volume into container.
1. Parse the `paddle pserver` and `paddle trainer` startup parameters from environment variables, and then start up the processes.
1. Trainer with `train_id` 0 will automatically write results onto EFS volume.
### Start PaddlePaddle Training Demo on AWS
Now we'll start a PaddlePaddle training demo on AWS, steps are as follows:
1. Build PaddlePaddle Docker image.
1. Divide the training data file and upload it onto the EFS sharing volume.
1. Create the training job config file, and start up the job.
1. Check the result after training.
#### Build PaddlePaddle Docker Image
PaddlePaddle docker image need to provide the runtime environment for `pserver` and `trainer`, so the container use this image should have two main function:
#### Configure Kubernetes Volume that Points to EFS
1. Copy the training data into container.
1. Generate the startup parameter for `pserver` and `trainer` process, and startup the training.
We need to create a new image since official `paddledev/paddle:cpu-latest` only have PaddlePaddle binary, but lack of the above functionalities.
Dockerfile for creating the new image is as follows:
First we need to create a [PersistentVolume](https://kubernetes.io/docs/user-guide/persistent-volumes/) to provision EFS volumn.
Save following snippet as `pv.yaml`
```
FROM paddledev/paddle:cpu-latest
MAINTAINER zjsxzong89@gmail.com
COPY start.sh /root/
COPY start_paddle.py /root/
CMD ["bash"," -c","/root/start.sh"]
apiVersion: v1
kind: PersistentVolume
metadata:
name: efsvol
spec:
capacity:
storage: 100Gi
accessModes:
- ReadWriteMany
nfs:
server: EFS_DNS_NAME
path: "/"
```
At this point, we will copy our `start.sh` and `start_paddle.py` file into container, and then exec `start_paddle.py` script to start up the training, all the steps like assigning trainer_id, getting other nodes' ip are implemented in `start_paddle.py`.
`start_paddle.py` will start parsing the parameters.
`EFS_DNS_NAME`: DNS name as shown in description of `paddle-efs` that we created. Looks similar to `fs-2cbf7385.efs.us-west-2.amazonaws.com`
Run following command to create a persistent volumn:
```
parser = argparse.ArgumentParser(prog="start_paddle.py",
description='simple tool for k8s')
args, train_args_list = parser.parse_known_args()
train_args = refine_unknown_args(train_args_list)
train_args_dict = dict(zip(train_args[:-1:2], train_args[1::2]))
podlist = getPodList()
kubectl --kubeconfig=kubeconfig create -f pv.yaml
```
And then using function `getPodList()` to query all the pod information from the job name through Kubernetes api server. When all the pods are in the running status, using `getIdMap(podlist)` to get the trainer_id.
Next let's create a [PersistentVolumeClaim](https://kubernetes.io/docs/user-guide/persistent-volumes/) to claim the persistent volume.
Save following snippet as `pvc.yaml`.
```
podlist = getPodList()
# need to wait until all pods are running
while not isPodAllRunning(podlist):
time.sleep(10)
podlist = getPodList()
idMap = getIdMap(podlist)
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
name: efsvol
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 50Gi
```
In function `getIdMap(podlist)`, we use podlist to get the ip address for each pod and sort them, use the index as the trainer_id.
Run following command to create a persistent volumn claim:
```
def getIdMap(podlist):
'''
generate tainer_id by ip
'''
ips = []
for pod in podlist["items"]:
ips.append(pod["status"]["podIP"])
ips.sort()
idMap = {}
for i in range(len(ips)):
idMap[ips[i]] = i
return idMap
kubectl --kubeconfig=kubeconfig create -f pvc.yaml
```
After getting `idMap`, we use function `startPaddle(idMap, train_args_dict)` to generate `paddle pserver` and `paddle train` start up parameters and then start up the processes.
In function `startPaddle`, the most important work is to generate `paddle pserver` and `paddle train` start up parameters. For example, `paddle train` parameter parsing, we will get parameters like `PADDLE_NIC`, `PADDLE_PORT`, `PADDLE_PORTS_NUM`, and get the `trainer_id` from `idMap`.
```
program = 'paddle train'
args = " --nics=" + PADDLE_NIC
args += " --port=" + str(PADDLE_PORT)
args += " --ports_num=" + str(PADDLE_PORTS_NUM)
args += " --comment=" + "paddle_process_by_paddle"
ip_string = ""
for ip in idMap.keys():
ip_string += (ip + ",")
ip_string = ip_string.rstrip(",")
args += " --pservers=" + ip_string
args_ext = ""
for key, value in train_args_dict.items():
args_ext += (' --' + key + '=' + value)
localIP = socket.gethostbyname(socket.gethostname())
trainerId = idMap[localIP]
args += " " + args_ext + " --trainer_id=" + \
str(trainerId) + " --save_dir=" + JOB_PATH_OUTPUT
```
#### Prepare Training Data
Use `docker build` to build toe Docker Image:
We will now launch a kubernetes job that downloads, saves and evenly splits training data into 3 shards on the persistent volumn that we just created.
save following snippet as `paddle-data-job.yaml`
```
docker build -t your_repo/paddle:mypaddle .
apiVersion: batch/v1
kind: Job
metadata:
name: paddle-data
spec:
template:
metadata:
name: pi
spec:
containers:
- name: paddle-data
image: paddledev/paddle-tutorial:k8s_data
imagePullPolicy: Always
volumeMounts:
- mountPath: "/efs"
name: efs
env:
- name: OUT_DIR
value: /efs/paddle-cluster-job
- name: SPLIT_COUNT
value: "3"
volumes:
- name: efs
persistentVolumeClaim:
claimName: efsvol
restartPolicy: Never
```
And then push the built image onto docker registry.
Run following command to launch the job:
```
docker push your_repo/paddle:mypaddle
kubectl --kubeconfig=kubeconfig create -f paddle-data-job.yaml
```
#### Upload Training Data File
Here we will use PaddlePaddle's official recommendation demo as the content for this training, we put the training data file into a directory named by job name, which located in EFS sharing volume, the tree structure for the directory looks like:
Job may take 7 min to finish, use following command to check job status. Do not proceed until `SUCCESSFUL` for `paddle-data` job is `1`
```
efs
└── paddle-cluster-job
├── data
│ ├── 0
│ │
│ ├── 1
│ │
│ └── 2
├── output
└── recommendation
$ kubectl --kubeconfig=kubeconfig get jobs
NAME DESIRED SUCCESSFUL AGE
paddle-data 1 1 6m
```
The `paddle-cluster-job` directory is the job name for this training, this training includes 3 PaddlePaddle node, we store the partitioned data under `paddle-cluster-job/data` directory, directory 0, 1, 2 each represent 3 nodes' trainer_id. the training data in in recommendation directory, the training results and logs will be in the output directory.
#### Create Kubernetes Job
Kubernetes use yaml file to describe job details, and then use command line tool to create the job in Kubernetes cluster.
In yaml file, we describe the Docker image we use for this training, the node number we need to startup, the volume mounting information and all the necessary parameters we need for `paddle pserver` and `paddle train` processes.
Data preparation is done by docker image `paddledev/paddle-tutorial:k8s_data`, see [here](src/k8s_data/README.md) for how to build this docker image and source code.
The yaml file content is as follows:
#### Start Training
Now we are ready to start paddle training job. Save following snippet as `paddle-cluster-job.yaml`
```
apiVersion: batch/v1
kind: Job
......@@ -574,12 +536,12 @@ spec:
name: paddle-cluster-job
spec:
volumes:
- name: jobpath
hostPath:
path: /home/admin/efs
- name: efs
persistentVolumeClaim:
claimName: efsvol
containers:
- name: trainer
image: drinkcode/paddle:k8s-job
image: paddledev/paddle-tutorial:k8s_train
command: ["bin/bash", "-c", "/root/start.sh"]
env:
- name: JOB_NAME
......@@ -589,7 +551,7 @@ spec:
- name: JOB_NAMESPACE
value: default
- name: TRAIN_CONFIG_DIR
value: recommendation
value: quick_start
- name: CONF_PADDLE_NIC
value: eth0
- name: CONF_PADDLE_PORT
......@@ -600,106 +562,124 @@ spec:
value: "2"
- name: CONF_PADDLE_GRADIENT_NUM
value: "3"
- name: TRAINER_COUNT
value: "3"
volumeMounts:
- name: jobpath
mountPath: /home/jobpath
- mountPath: "/home/jobpath"
name: efs
ports:
- name: jobport
hostPort: 30001
containerPort: 30001
- name: jobport0
hostPort: 7164
containerPort: 7164
- name: jobport1
hostPort: 7165
containerPort: 7165
- name: jobport2
hostPort: 7166
containerPort: 7166
- name: jobport3
hostPort: 7167
containerPort: 7167
restartPolicy: Never
```
In yaml file, the metadata's name is the job's name. `parallelism, completions` means this job will simultaneously start up 3 PaddlePaddle nodes, and this job will be finished when there are 3 finished pods. For the data store volume, we declare the path jobpath, it mount the /home/admin/efs on host machine into the container with path /home/jobpath. So in container, the /home/jobpath actually stores the data onto EFS sharing volume.
`env` field represents container's environment variables, we pass the PaddlePaddle parameters into containers by using the `env` field.
`parallelism: 3, completions: 3` means this job will simultaneously start 3 PaddlePaddle pods, and this job will be finished when there are 3 finished pods.
`JOB_PATH` represents the sharing volume path, `JOB_NAME` represents job name, `TRAIN_CONFIG_DIR` represents the training data file directory, we can these three parameters to get the file path for this training.
`env` field represents container's environment variables, we specify PaddlePaddle parameters by environment variables.
`CONF_PADDLE_NIC` represents `paddle pserver` process's `--nics` parameters, the NIC name.
`ports` indicates that TCP port 7164 - 7167 are exposed for communication between `pserver` ans trainer. port starts continously from `CONF_PADDLE_PORT` (7164) to `CONF_PADDLE_PORT + CONF_PADDLE_PORTS_NUM + CONF_PADDLE_PORTS_NUM_SPARSE - 1` (7167). We use multiple ports for dense and sparse paramter updates to improve latency.
`CONF_PADDLE_PORT` represents `paddle pserver` process's `--port` parameters, `CONF_PADDLE_PORTS_NUM` represents `--port_num` parameter.
`CONF_PADDLE_PORTS_NUM_SPARSE` represents the sparse updated port number, `--ports_num_for_sparse` parameter.
Run following command to launch the job.
```
kubectl --kubeconfig=kubeconfig create -f paddle-claster-job.yaml
```
`CONF_PADDLE_GRADIENT_NUM` represents the training node number, `--num_gradient_servers` parameter.
Inspect individual pods
After we create the yaml file, we can use Kubernetes command line tool to create the job onto the cluster.
```
$ kubectl --kubeconfig=kubeconfig get pods
NAME READY STATUS RESTARTS AGE
paddle-cluster-job-cm469 1/1 Running 0 9m
paddle-cluster-job-fnt03 1/1 Running 0 9m
paddle-cluster-job-jx4xr 1/1 Running 0 9m
```
Inspect individual console output
```
kubectl create -f job.yaml
kubectl --kubeconfig=kubeconfig log -f POD_NAME
```
After we execute the above command, Kubernetes will create 3 pods and then pull the PaddlePaddle image, then start up the containers for training.
`POD_NAME`: name of any pod (e.g., `paddle-cluster-job-cm469`).
Run `kubectl --kubeconfig=kubeconfig describe job paddle-cluster-job` to check training job status. It will complete in around 20 minutes.
The details for start `pserver` and `trainer` are hidden inside docker image `paddledev/paddle-tutorial:k8s_train`, see [here](src/k8s_train/README.md) for how to build the docker image and source code.
#### Check Training Results
#### Inspect Training Output
During the training, we can see the logs and models on EFS sharing volume, the output directory contains the training results. (Caution: node_0, node_1, node_2 directories represents PaddlePaddle node and train_id, not the Kubernetes node)
Training output (model snapshot and logs) will be saved in EFS. We can ssh into worker EC2 instance, mount EFS and check training output.
1. ssh Into Worker EC2 instance
```
[root@paddle-kubernetes-node0 output]# tree -d
.
├── node_0
│ ├── server.log
│ └── train.log
├── node_1
│ ├── server.log
│ └── train.log
├── node_2
......
├── pass-00002
│ ├── done
│ ├── ___embedding_0__.w0
│ ├── ___embedding_1__.w0
......
chmod 400 key-name.pem
ssh -i key-name.pem core@INSTANCE_IP
```
We can always check the container training status through logs, for example:
`INSTANCE_IP`: public IP address of EC2 kubernetes worker node. Go to [EC2 console](https://us-west-2.console.aws.amazon.com/ec2/v2/home?region=us-west-2#Instances:sort=instanceId) and check `public IP` of any `paddle-cluster-kube-aws-worker` instance.
2. Mount EFS
```
[root@paddle-kubernetes-node0 node_0]# cat train.log
I1116 09:10:17.123121 50 Util.cpp:155] commandline:
/usr/local/bin/../opt/paddle/bin/paddle_trainer
--nics=eth0 --port=7164
--ports_num=2 --comment=paddle_process_by_paddle
--pservers=192.168.129.66,192.168.223.143,192.168.129.71
--ports_num_for_sparse=2 --config=./trainer_config.py
--trainer_count=4 --num_passes=10 --use_gpu=0
--log_period=50 --dot_period=10 --saving_period=1
--local=0 --trainer_id=0
--save_dir=/home/jobpath/paddle-cluster-job/output
I1116 09:10:17.123440 50 Util.cpp:130] Calling runInitFunctions
I1116 09:10:17.123764 50 Util.cpp:143] Call runInitFunctions done.
[WARNING 2016-11-16 09:10:17,227 default_decorators.py:40] please use keyword arguments in paddle config.
[INFO 2016-11-16 09:10:17,239 networks.py:1282] The input order is [movie_id, title, genres, user_id, gender, age, occupation, rating]
[INFO 2016-11-16 09:10:17,239 networks.py:1289] The output order is [__regression_cost_0__]
I1116 09:10:17.392917 50 Trainer.cpp:170] trainer mode: Normal
I1116 09:10:17.613910 50 PyDataProvider2.cpp:257] loading dataprovider dataprovider::process
I1116 09:10:17.680917 50 PyDataProvider2.cpp:257] loading dataprovider dataprovider::process
I1116 09:10:17.681543 50 GradientMachine.cpp:134] Initing parameters..
I1116 09:10:18.012390 50 GradientMachine.cpp:141] Init parameters done.
I1116 09:10:18.018641 50 ParameterClient2.cpp:122] pserver 0 192.168.129.66:7164
I1116 09:10:18.018950 50 ParameterClient2.cpp:122] pserver 1 192.168.129.66:7165
I1116 09:10:18.019069 50 ParameterClient2.cpp:122] pserver 2 192.168.223.143:7164
I1116 09:10:18.019492 50 ParameterClient2.cpp:122] pserver 3 192.168.223.143:7165
I1116 09:10:18.019716 50 ParameterClient2.cpp:122] pserver 4 192.168.129.71:7164
I1116 09:10:18.019836 50 ParameterClient2.cpp:122] pserver 5 192.168.129.71:7165
mkdir efs
sudo mount -t nfs4 -o nfsvers=4.1,rsize=1048576,wsize=1048576,hard,timeo=600,retrans=2 EFS_DNS_NAME:/ efs
```
It'll take around 8 hours to finish this PaddlePaddle recommendation training demo on three 2 core 8 GB EC2 machine (m3.large).
`EFS_DNS_NAME`: DNS name as shown in description of `paddle-efs` that we created. Look similar to `fs-2cbf7385.efs.us-west-2.amazonaws.com`.
Now folder `efs` will have structure similar to:
```
-- paddle-cluster-job
|-- ...
|-- output
| |-- node_0
| | |-- server.log
| | `-- train.log
| |-- node_1
| | |-- server.log
| | `-- train.log
| |-- node_2
| | |-- server.log
| | `-- train.log
| |-- pass-00000
| | |-- ___fc_layer_0__.w0
| | |-- ___fc_layer_0__.wbias
| | |-- done
| | |-- path.txt
| | `-- trainer_config.lr.py
| |-- pass-00001...
```
`server.log` contains log for `pserver`. `train.log` contains log for `trainer`. Model description and snapshot is stored in `pass-0000*`.
### Kubernetes Cluster Tear Down
#### Delete EFS
Go to [EFS Console](https://us-west-2.console.aws.amazon.com/efs/home?region=us-west-2) and delete the EFS volumn that we created.
#### Delete security group
Go to [Security Group Console](https://us-west-2.console.aws.amazon.com/ec2/v2/home?region=us-west-2#SecurityGroups:sort=groupId) and delete security group `paddle-efs`.
If you want to tear down the whole Kubernetes cluster, make sure to *delete* the EFS volume first (otherwise, you will get stucked on following steps), and then use the following command:
#### Delete S3 Bucket
Go to [S3 Console](https://console.aws.amazon.com/s3/home?region=us-west-2#) and delete the S3 bucket that we created.
#### Destroy Cluster
```
kube-aws destroy
```
It's an async call, it might take 5 min to tear down the whole cluster.
If you created any Kubernetes Services of type LoadBalancer, you must delete these first, as the CloudFormation cannot be fully destroyed if any externally-managed resources still exist.
The command will return immediately, but it might take 5 min to tear down the whole cluster.
You can go to [CludFormation Console](https://us-west-2.console.aws.amazon.com/cloudformation/home?region=us-west-2#/stacks?filter=active) to check destroy process.
doc/howto/usage/k8s/src/create_efs.png

244.5 KB | W: | H:

doc/howto/usage/k8s/src/create_efs.png

236.1 KB | W: | H:

doc/howto/usage/k8s/src/create_efs.png
doc/howto/usage/k8s/src/create_efs.png
doc/howto/usage/k8s/src/create_efs.png
doc/howto/usage/k8s/src/create_efs.png
  • 2-up
  • Swipe
  • Onion skin
apiVersion: batch/v1
kind: Job
metadata:
name: paddle-cluster-job
spec:
parallelism: 3
completions: 3
template:
metadata:
name: paddle-cluster-job
spec:
volumes:
- name: jobpath
hostPath:
path: /home/work/paddle_output
containers:
- name: trainer
image: registry.baidu.com/public/paddle:mypaddle
command: ["bin/bash", "-c", "/root/start.sh"]
env:
- name: JOB_NAME
value: paddle-cluster-job
- name: JOB_PATH
value: /home/jobpath
- name: JOB_NAMESPACE
value: default
- name: TRAIN_CONFIG_DIR
value: recommendation
- name: CONF_PADDLE_NIC
value: eth0
- name: CONF_PADDLE_PORT
value: "7164"
- name: CONF_PADDLE_PORTS_NUM
value: "2"
- name: CONF_PADDLE_PORTS_NUM_SPARSE
value: "2"
- name: CONF_PADDLE_GRADIENT_NUM
value: "3"
volumeMounts:
- name: jobpath
mountPath: /home/jobpath
restartPolicy: Never
FROM alpine
RUN apk update && apk upgrade && apk add coreutils
ADD quick_start /quick_start
ADD get_data.sh /bin/
RUN chmod +x /bin/get_data.sh
ENTRYPOINT ["/bin/get_data.sh"]
To build PaddlePaddle data preparation image in tutorial [Distributed PaddlePaddle Training on AWS with Kubernetes](../../k8s_aws_en.md), run following commands:
```
cp -r ../../../../../../demo/quick_start .
docker build . -t prepare-data-image-name
```
#!/bin/sh
out_dir=$OUT_DIR
split_count=$SPLIT_COUNT
set -e
mkdir -p $out_dir
cp -r /quick_start $out_dir/
mkdir -p $out_dir/0/data
cd $out_dir/0/data
wget http://paddlepaddle.bj.bcebos.com/demo/quick_start_preprocessed_data/preprocessed_data.tar.gz
tar zxvf preprocessed_data.tar.gz
rm preprocessed_data.tar.gz
split -d --number=l/$split_count -a 5 train.txt train.
mv train.00000 train.txt
cd $out_dir
end=$(expr $split_count - 1)
for i in $(seq 1 $end); do
mkdir -p $i/data
cp -r 0/data/* $i/data
mv $i/data/train.`printf %05d $i` $i/data/train.txt
done;
FROM paddledev/paddle:cpu-latest
COPY start.sh /root/
COPY start_paddle.py /root/
RUN chmod +x /root/start.sh
CMD ["bash"," -c","/root/start.sh"]
To build PaddlePaddle training image in tutorial [Distributed PaddlePaddle Training on AWS with Kubernetes](../../k8s_aws_en.md), run following command:
```
docker build . -t train-image-name
```
#!/bin/sh
set -eu
jobconfig=${JOB_PATH}"/"${JOB_NAME}"/"${TRAIN_CONFIG_DIR}
cd /root
cp -rf $jobconfig .
cd $TRAIN_CONFIG_DIR
cp -rf $jobconfig/* .
python /root/start_paddle.py \
--dot_period=10 \
--ports_num_for_sparse=$CONF_PADDLE_PORTS_NUM \
--ports_num=$CONF_PADDLE_PORTS_NUM \
--ports_num_for_sparse=$CONF_PADDLE_PORTS_NUM_SPARSE \
--log_period=50 \
--num_passes=10 \
--trainer_count=4 \
--trainer_count=$TRAINER_COUNT \
--saving_period=1 \
--local=0 \
--config=./trainer_config.py \
--config=trainer_config.lr.py \
--use_gpu=0
......@@ -23,7 +23,6 @@ import argparse
API = "/api/v1/namespaces/"
JOBSELECTOR = "labelSelector=job-name="
JOB_PATH = os.getenv("JOB_PATH") + "/" + os.getenv("JOB_NAME")
JOB_PATH_DATA = JOB_PATH + "/data"
JOB_PATH_OUTPUT = JOB_PATH + "/output"
JOBNAME = os.getenv("JOB_NAME")
NAMESPACE = os.getenv("JOB_NAMESPACE")
......@@ -33,6 +32,8 @@ PADDLE_PORTS_NUM = os.getenv("CONF_PADDLE_PORTS_NUM")
PADDLE_PORTS_NUM_SPARSE = os.getenv("CONF_PADDLE_PORTS_NUM_SPARSE")
PADDLE_SERVER_NUM = os.getenv("CONF_PADDLE_GRADIENT_NUM")
tokenpath = '/var/run/secrets/kubernetes.io/serviceaccount/token'
def refine_unknown_args(cmd_args):
'''
......@@ -64,6 +65,7 @@ def isPodAllRunning(podlist):
for pod in podlist["items"]:
if pod["status"]["phase"] == "Running":
running += 1
print "waiting for pods running, require:", require, "running:", running
if require == running:
return True
return False
......@@ -79,8 +81,17 @@ def getPodList():
pod = API + NAMESPACE + "/pods?"
job = JOBNAME
return requests.get(apiserver + pod + JOBSELECTOR + job,
verify=False).json()
if os.path.isfile(tokenpath):
tokenfile = open(tokenpath, mode='r')
token = tokenfile.read()
Bearer = "Bearer " + token
headers = {"Authorization": Bearer}
return requests.get(apiserver + pod + JOBSELECTOR + job,
headers=headers,
verify=False).json()
else:
return requests.get(apiserver + pod + JOBSELECTOR + job,
verify=False).json()
def getIdMap(podlist):
......@@ -122,8 +133,8 @@ def startPaddle(idMap={}, train_args_dict=None):
if not os.path.exists(JOB_PATH_OUTPUT):
os.makedirs(JOB_PATH_OUTPUT)
os.mkdir(logDir)
copyCommand = 'cp -rf ' + JOB_PATH_DATA + \
"/" + str(trainerId) + " ./data"
copyCommand = 'cp -rf ' + JOB_PATH + \
"/" + str(trainerId) + "/data/*" + " ./data/"
os.system(copyCommand)
startPserver = 'nohup paddle pserver' + \
" --port=" + str(PADDLE_PORT) + \
......@@ -136,9 +147,9 @@ def startPaddle(idMap={}, train_args_dict=None):
print startPserver
os.system(startPserver)
# wait until pservers completely start
time.sleep(10)
startTrainer = program + args + " > " + \
logDir + "/train.log 2>&1 < /dev/null"
time.sleep(20)
startTrainer = program + args + " 2>&1 | tee " + \
logDir + "/train.log"
print startTrainer
os.system(startTrainer)
......@@ -152,7 +163,7 @@ if __name__ == '__main__':
podlist = getPodList()
# need to wait until all pods are running
while not isPodAllRunning(podlist):
time.sleep(10)
time.sleep(20)
podlist = getPodList()
idMap = getIdMap(podlist)
startPaddle(idMap, train_args_dict)
......@@ -32,14 +32,20 @@ const SparseMatrixArg& BufferArg::sparse() const {
SparseMatrixArg::SparseMatrixArg(const CpuSparseMatrix& sparse, ArgType argType)
: BufferArg(sparse, argType),
row_(reinterpret_cast<void*>(sparse.getRows()), VALUE_TYPE_INT32),
col_(reinterpret_cast<void*>(sparse.getCols()), VALUE_TYPE_INT32) {
col_(reinterpret_cast<void*>(sparse.getCols()), VALUE_TYPE_INT32),
nnz_(sparse.getElementCnt()),
format_(static_cast<SparseDataFormat>(sparse.getFormat())),
type_(static_cast<SparseDataType>(sparse.getValueType())) {
bufferType_ = TENSOR_SPARSE;
}
SparseMatrixArg::SparseMatrixArg(const GpuSparseMatrix& sparse, ArgType argType)
: BufferArg(sparse, argType),
row_(reinterpret_cast<void*>(sparse.getRows()), VALUE_TYPE_INT32),
col_(reinterpret_cast<void*>(sparse.getCols()), VALUE_TYPE_INT32) {
col_(reinterpret_cast<void*>(sparse.getCols()), VALUE_TYPE_INT32),
nnz_(sparse.getElementCnt()),
format_(static_cast<SparseDataFormat>(sparse.getFormat())),
type_(static_cast<SparseDataType>(sparse.getValueType())) {
bufferType_ = TENSOR_SPARSE;
}
......
......@@ -30,13 +30,6 @@ enum BufferType {
TENSOR_SPARSE = 4
};
enum SparseDataType {
SPARSE_NO_VALUE = 0, // do not need value pointer, all values are 1
SPARSE_FLOAT_VALUE = 1
};
enum SparseDataFormat { SPARSE_CSR_FORMAT = 0, SPARSE_CSC_FORMAT = 1 };
class BufferArg;
class SequenceArg;
class SparseMatrixArg;
......@@ -79,19 +72,21 @@ public:
BufferArg(ValueType valueType,
const TensorShape& shape,
ArgType argType = UNSPECIFIED)
: buf_(nullptr),
valueType_(valueType),
shape_(shape),
argType_(argType) {}
: buf_(nullptr), valueType_(valueType), shape_(shape), argType_(argType) {
bufferType_ = TENSOR_NORMAL;
}
BufferArg(void* buf,
ValueType valueType,
const TensorShape& shape,
ArgType argType = UNSPECIFIED)
: buf_(buf), valueType_(valueType), shape_(shape), argType_(argType) {}
: buf_(buf), valueType_(valueType), shape_(shape), argType_(argType) {
bufferType_ = TENSOR_NORMAL;
}
BufferArg(void* buf, ValueType valueType)
: buf_(buf), valueType_(valueType) {}
BufferArg(void* buf, ValueType valueType) : buf_(buf), valueType_(valueType) {
bufferType_ = TENSOR_NORMAL;
}
BufferArg(const Matrix& matrix, ArgType argType = UNSPECIFIED)
: buf_(
......@@ -167,8 +162,9 @@ public:
ValueType valueType() const { return valueType_; }
BufferType bufferType() const { return bufferType_; }
const TensorShape& shape() const { return shape_; }
bool isSparse() const { return (TENSOR_SPARSE == bufferType_); }
bool isSparseArg() const { return TENSOR_SPARSE == bufferType_; }
bool isSequenceArg() const { return TENSOR_SEQUENCE_DATA == bufferType_; }
virtual size_t numElements() const { return shape_.getElements(); }
const SequenceArg& sequence() const;
const SparseMatrixArg& sparse() const;
......@@ -179,6 +175,7 @@ protected:
TensorShape shape_;
BufferType bufferType_{TENSOR_UNKNOWN};
ArgType argType_{UNSPECIFIED};
// TODO(tianbing), add deviceType_
// leading dimensions. The size is dims_.size()
// Dims lds_;
};
......@@ -191,6 +188,7 @@ class SequenceIdArg : public BufferArg {
public:
SequenceIdArg(const TensorShape& shape, ArgType argType = UNSPECIFIED)
: BufferArg(VALUE_TYPE_INT32, shape, argType) {
bufferType_ = TENSOR_SEQUENCE_ID;
CHECK_EQ(shape_.ndims(), (size_t)1);
CHECK_GT(shape_[0], 1);
numSeqs_ = shape_[0] - 1;
......@@ -228,7 +226,9 @@ public:
SequenceArg(ValueType valueType,
const TensorShape& shape,
ArgType argType = UNSPECIFIED)
: BufferArg(valueType, shape, argType), startPositions_(TensorShape()) {}
: BufferArg(valueType, shape, argType), startPositions_(TensorShape()) {
bufferType_ = TENSOR_SEQUENCE_DATA;
}
SequenceArg(void* buf,
ValueType valueType,
......@@ -269,31 +269,75 @@ public:
const BufferArg& row,
const BufferArg& col,
size_t nnz,
SparseDataFormat format,
SparseDataType type,
SparseFormat format,
SparseValueType type,
ArgType argType = UNSPECIFIED)
: BufferArg(buf, valueType, shape, argType),
row_(row),
col_(col),
nnz_(nnz),
format_(format),
type_(type) {
format_(static_cast<SparseDataFormat>(format)),
type_(static_cast<SparseDataType>(type)) {
bufferType_ = TENSOR_SPARSE;
CHECK((valueType == VALUE_TYPE_FLOAT) || (valueType == VALUE_TYPE_DOUBLE));
CHECK_EQ(shape_.ndims(), (size_t)2);
CHECK_EQ(row_.shape().ndims(), (size_t)1);
CHECK_EQ(col_.shape().ndims(), (size_t)1);
if (format == SPARSE_CSR_FORMAT) {
if (format_ == T_SPARSE_CSR) {
CHECK_EQ(nnz, col.shape()[0]);
} else if (format == SPARSE_CSC_FORMAT) {
} else if (format_ == T_SPARSE_CSC) {
CHECK_EQ(nnz, row.shape()[0]);
}
}
SparseMatrixArg(ValueType valueType,
const TensorShape& shape,
size_t nnz,
SparseFormat format,
SparseValueType type,
ArgType argType = UNSPECIFIED)
: BufferArg(valueType, shape, argType),
row_(BufferArg(nullptr, VALUE_TYPE_INT32)),
col_(BufferArg(nullptr, VALUE_TYPE_INT32)),
nnz_(nnz),
format_(static_cast<SparseDataFormat>(format)),
type_(static_cast<SparseDataType>(type)) {
bufferType_ = TENSOR_SPARSE;
CHECK((valueType == VALUE_TYPE_FLOAT) || (valueType == VALUE_TYPE_DOUBLE));
CHECK_EQ(shape_.ndims(), (size_t)2);
/// len of row_ : height + 1 (CSR) or nnz (CSC), buf_ == nullptr
row_ = (format_ == T_SPARSE_CSR
? BufferArg(VALUE_TYPE_INT32, TensorShape{shape_[0] + 1})
: BufferArg(VALUE_TYPE_INT32, TensorShape{nnz}));
/// len of col_ : width + 1 (CSC) or nnz (CSR), buf_ == nullptr
col_ = (format_ == T_SPARSE_CSR
? BufferArg(VALUE_TYPE_INT32, TensorShape{nnz})
: BufferArg(VALUE_TYPE_INT32, TensorShape{shape_[1] + 1}));
}
SparseMatrixArg(const CpuSparseMatrix& sparse, ArgType argType = UNSPECIFIED);
SparseMatrixArg(const GpuSparseMatrix& sparse, ArgType argType = UNSPECIFIED);
template <DeviceType DType>
typename Tensor<real, DType>::SparseMatrix SparseMatrix() const {
CHECK(buf_);
CHECK(valueType_ == DataType<real>::value);
// CHECK(deviceType_ == DType);
CHECK_EQ(2, shape_.ndims());
return typename Tensor<real, DType>::SparseMatrix(
reinterpret_cast<real*>(buf_),
reinterpret_cast<int*>(row_.data()),
reinterpret_cast<int*>(col_.data()),
shape_[0],
shape_[1],
nnz_,
static_cast<SparseValueType>(type_),
static_cast<SparseFormat>(format_),
false);
}
~SparseMatrixArg() {}
void* getRowBuf() const { return row_.data(); }
......@@ -302,6 +346,8 @@ public:
size_t nnz() const { return nnz_; }
size_t numElements() const override { return nnz_; }
SparseDataFormat dataFormat() const { return format_; }
SparseDataType dataType() const { return type_; }
......
......@@ -26,6 +26,7 @@ if(WITH_TESTING)
add_simple_unittest(FunctionTest)
add_simple_unittest(ContextProjectionOpTest)
add_simple_unittest(PadOpTest)
add_simple_unittest(MulOpTest)
endif()
endif()
......
......@@ -162,38 +162,64 @@ template <DeviceType Device>
class CrossMapNormalFunc : public FunctionBase {
public:
void init(const FuncConfig& config) override {
// function arguments
size_ = config.get<size_t>("size");
scale_ = config.get<real>("scale");
pow_ = config.get<real>("pow");
// number of inputs and outputs
numInputs_ = 1;
numOutputs_ = 2;
}
void calc(const BufferArgs& inputs, const BufferArgs& outputs) override {
CHECK_EQ((size_t)1, inputs.size());
CHECK_EQ((size_t)2, outputs.size());
CHECK_EQ(inputs[0].shape().ndims(), (size_t)4);
CHECK(inputs[0].shape() == outputs[0].shape());
CHECK(inputs[0].shape() == outputs[1].shape());
check(inputs, outputs);
// ArgType check still on here,
// not sure whether it is better to put inside the check.
CHECK_EQ(outputs[0].getArgType(), ASSIGN_TO);
CHECK_EQ(outputs[1].getArgType(), ASSIGN_TO);
size_t samples = inputs[0].shape()[0];
size_t channels = inputs[0].shape()[1];
size_t height = inputs[0].shape()[2];
size_t width = inputs[0].shape()[3];
size_t batchSize = inputs[0].shape()[0];
size_t maps = inputs[0].shape()[1];
size_t rows = inputs[0].shape()[2];
size_t columns = inputs[0].shape()[3];
CrossMapNormal<Device>(outputs[0].data<real>(),
outputs[1].data<real>(),
inputs[0].data<real>(),
samples,
channels,
height,
width,
batchSize,
maps,
rows,
columns,
size_,
scale_,
pow_);
}
void check(const BufferArgs& inputs, const BufferArgs& outputs) override {
CHECK_EQ(numInputs_, inputs.size());
CHECK_EQ(numOutputs_, outputs.size());
CHECK_EQ(inputs[0].shape().ndims(), (size_t)4);
CHECK(inputs[0].shape() == outputs[0].shape());
CHECK(inputs[0].shape() == outputs[1].shape());
}
// Only need the shape of the input, can calculate the
// floating-point operation.
size_t ops(const BufferArgs& inputs, const BufferArgs& outputs) override {
CHECK_EQ((size_t)numInputs_, inputs.size());
size_t batchSize = inputs[0].shape()[0];
size_t maps = inputs[0].shape()[1];
size_t rows = inputs[0].shape()[2];
size_t columns = inputs[0].shape()[3];
// number of floating-point operations
// an approximate value
size_t ops = batchSize * maps * rows * columns * (size_ * 2 + 3);
return ops;
}
private:
size_t size_;
real scale_;
......@@ -236,21 +262,18 @@ template <DeviceType Device>
class CrossMapNormalGradFunc : public FunctionBase {
public:
void init(const FuncConfig& config) override {
// function arguments
size_ = config.get<size_t>("size");
scale_ = config.get<real>("scale");
pow_ = config.get<real>("pow");
// number of inputs and outputs
numInputs_ = 4;
numOutputs_ = 1;
}
void calc(const BufferArgs& inputs, const BufferArgs& outputs) override {
CHECK_EQ((size_t)4, inputs.size());
CHECK_EQ((size_t)1, outputs.size());
CHECK_EQ(inputs[0].shape().ndims(), (size_t)4);
CHECK(inputs[0].shape() == inputs[1].shape());
CHECK(inputs[0].shape() == inputs[2].shape());
CHECK(inputs[0].shape() == inputs[3].shape());
CHECK(inputs[0].shape() == outputs[0].shape());
check(inputs, outputs);
if (outputs[0].getArgType() != ADD_TO) {
// Currently, some algorithm implementations are ASSIGN_TO mode,
// if need to support the ADD_TO calculation, need to clear the output.
......@@ -259,25 +282,52 @@ public:
tmp.zero();
}
size_t samples = inputs[0].shape()[0];
size_t channels = inputs[0].shape()[1];
size_t height = inputs[0].shape()[2];
size_t width = inputs[0].shape()[3];
size_t batchSize = inputs[0].shape()[0];
size_t maps = inputs[0].shape()[1];
size_t rows = inputs[0].shape()[2];
size_t columns = inputs[0].shape()[3];
CrossMapNormalGrad<Device>(outputs[0].data<real>(),
inputs[0].data<real>(),
inputs[1].data<real>(),
inputs[2].data<real>(),
inputs[3].data<real>(),
samples,
channels,
height,
width,
batchSize,
maps,
rows,
columns,
size_,
scale_,
pow_);
}
void check(const BufferArgs& inputs, const BufferArgs& outputs) override {
CHECK_EQ(numInputs_, inputs.size());
CHECK_EQ(numOutputs_, outputs.size());
CHECK_EQ(inputs[0].shape().ndims(), (size_t)4);
CHECK(inputs[0].shape() == inputs[1].shape());
CHECK(inputs[0].shape() == inputs[2].shape());
CHECK(inputs[0].shape() == inputs[3].shape());
CHECK(inputs[0].shape() == outputs[0].shape());
}
// Only need the shape of one input, can calculate the
// floating-point operation.
size_t ops(const BufferArgs& inputs, const BufferArgs& outputs) override {
CHECK_LT((size_t)1, inputs.size());
size_t batchSize = inputs[0].shape()[0];
size_t maps = inputs[0].shape()[1];
size_t rows = inputs[0].shape()[2];
size_t columns = inputs[0].shape()[3];
// number of floating-point operations
// an approximate value
size_t ops = batchSize * maps * rows * columns * (size_ * 4 + 2);
return ops;
}
private:
size_t size_;
real scale_;
......
......@@ -153,7 +153,36 @@ public:
virtual void calc(const BufferArgs& inputs, const BufferArgs& outputs) {}
// This member function is used to check whether the BufferType and shape of
// the inputs and outputs arguments of the Function are correct.
// General calc function which will call this check to do arguments check.
// And before the calc called, the caller can also check their own arguments.
virtual void check(const BufferArgs& inputs, const BufferArgs& outputs) {}
// Calculate the number of floating-point operations of this Function.
// The inputs and outputs arguments do not need to contain the actual data,
// only the shape.
// And some Functions have the same input and output shapes,
// so you may not need to enter the complete number of arguments.
// But entering the full arguments is always correct for this interface.
virtual size_t ops(const BufferArgs& inputs, const BufferArgs& outputs) {
return 0;
}
int getNumInputs() const { return numInputs_; }
int getNumOutputs() const { return numOutputs_; }
static ClassRegistrar<FunctionBase> funcRegistrar_;
protected:
// numInputs_ and numOutputs_ represents the maximum
// input and output supported by Function.
// Some functions are optimized for input and output,
// so when comparing the number of arguments, for these functions
// inputs.size() <= numInputs_ or outputs.size() <= numOutputs_
size_t numInputs_;
size_t numOutputs_;
};
#define FUNC_NAME(typeName, deviceName) #typeName "-" #deviceName
......
......@@ -13,7 +13,8 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include "Function.h"
#include "paddle/math/Vector.h"
#include "paddle/math/Matrix.h"
#include "paddle/math/SparseMatrix.h"
#include "paddle/math/tests/TensorCheck.h"
#include "paddle/testing/TestUtil.h"
......@@ -69,7 +70,7 @@ public:
}
// output need only contains shape, do not contains data.
void addOutputs(const BufferArg& output) {
void addOutputs(const BufferArg& output, ArgType argType = ASSIGN_TO) {
size_t size =
output.shape().getElements() * sizeOfValuType(output.valueType());
cpuMemory_.emplace_back(std::make_shared<CpuMemoryHandle>(size));
......@@ -79,12 +80,40 @@ public:
std::make_shared<BufferArg>(cpuMemory_.back()->getBuf(),
output.valueType(),
output.shape(),
ASSIGN_TO));
argType));
gpuOutputs_.emplace_back(
std::make_shared<BufferArg>(gpuMemory_.back()->getBuf(),
output.valueType(),
output.shape(),
ASSIGN_TO));
argType));
}
/// add and init output sparse matrix
void addOutputs(const SparseMatrixArg& output, ArgType argType = ASSIGN_TO) {
cpuSparse_ = std::make_shared<CpuSparseMatrix>(
output.shape()[0],
output.shape()[1],
output.nnz(),
static_cast<SparseValueType>(output.dataType()),
static_cast<SparseFormat>(output.dataFormat()));
gpuSparse_ = std::make_shared<GpuSparseMatrix>(
output.shape()[0],
output.shape()[1],
output.nnz(),
static_cast<SparseValueType>(output.dataType()),
static_cast<SparseFormat>(output.dataFormat()));
/// init sparse matrix
hl_stream_t stream(HPPL_STREAM_1);
cpuSparse_->randomizeUniform();
gpuSparse_->copyFrom(*cpuSparse_, stream);
hl_stream_synchronize(stream);
cpuOutputs_.emplace_back(
std::make_shared<SparseMatrixArg>(*cpuSparse_, argType));
gpuOutputs_.emplace_back(
std::make_shared<SparseMatrixArg>(*gpuSparse_, argType));
}
void addInputs(const SequenceArg& input) {
......@@ -107,10 +136,36 @@ public:
// TODO: need be implemented.
}
void addInputs(const SparseMatrixArg& input) {
cpuSparse_ = std::make_shared<CpuSparseMatrix>(
input.shape()[0],
input.shape()[1],
input.nnz(),
static_cast<SparseValueType>(input.dataType()),
static_cast<SparseFormat>(input.dataFormat()));
gpuSparse_ = std::make_shared<GpuSparseMatrix>(
input.shape()[0],
input.shape()[1],
input.nnz(),
static_cast<SparseValueType>(input.dataType()),
static_cast<SparseFormat>(input.dataFormat()));
/// init sparse matrix
hl_stream_t stream(HPPL_STREAM_1);
cpuSparse_->randomizeUniform();
gpuSparse_->copyFrom(*cpuSparse_, stream);
hl_stream_synchronize(stream);
cpuInputs_.emplace_back(std::make_shared<SparseMatrixArg>(*cpuSparse_));
gpuInputs_.emplace_back(std::make_shared<SparseMatrixArg>(*gpuSparse_));
}
void run() {
// prepare cpu/gpu arguments
initInputs();
initOutputs();
// function calculate
auto callFunction = [](FunctionBase* function,
std::vector<BufferArgPtr>& inputs,
......@@ -129,7 +184,7 @@ public:
callFunction(cpuFunc_.get(), cpuInputs_, cpuOutputs_);
callFunction(gpuFunc_.get(), gpuInputs_, gpuOutputs_);
// check outputs and inouts
// check outputs
compareOutputs();
}
......@@ -140,6 +195,10 @@ public:
protected:
void initInputs() {
for (size_t i = 0; i < cpuInputs_.size(); i++) {
if (cpuInputs_[i]->isSparseArg()) {
continue; /// sparse matrix already init
}
initArg(*cpuInputs_[i]);
// TODO: Need a BufferCopy used to copy from one BufferArg to another.
......@@ -152,14 +211,32 @@ protected:
}
}
void initOutputs() {
for (size_t i = 0; i < cpuOutputs_.size(); i++) {
if (cpuOutputs_[i]->isSparseArg()) {
continue; /// sparse matrix already init
}
initArg(*cpuOutputs_[i]);
// TODO: Need a BufferCopy used to copy from one BufferArg to another.
CpuVector cpuVector(cpuOutputs_[i]->shape().getElements(),
(real*)cpuOutputs_[i]->data());
GpuVector gpuVector(gpuOutputs_[i]->shape().getElements(),
(real*)gpuOutputs_[i]->data());
gpuVector.copyFrom(cpuVector);
}
}
void compareOutputs() {
for (size_t i = 0; i < cpuOutputs_.size(); i++) {
// TODO, Need a BufferCheck used to compare the two buffers.
auto cpu = cpuOutputs_[i];
auto gpu = gpuOutputs_[i];
CpuVector cpuVector(cpu->shape().getElements(), (real*)cpu->data());
GpuVector gpuVector(cpu->shape().getElements(), (real*)gpu->data());
const auto cpu = cpuOutputs_[i];
const auto gpu = gpuOutputs_[i];
CHECK_EQ(cpu->numElements(), gpu->numElements());
CpuVector cpuVector(cpu->numElements(), (real*)cpu->data());
GpuVector gpuVector(gpu->numElements(), (real*)gpu->data());
autotest::TensorCheckErr(cpuVector, gpuVector);
}
}
......@@ -195,6 +272,8 @@ protected:
std::vector<BufferArgPtr> cpuOutputs_;
std::vector<BufferArgPtr> gpuInputs_;
std::vector<BufferArgPtr> gpuOutputs_;
std::shared_ptr<CpuSparseMatrix> cpuSparse_;
std::shared_ptr<GpuSparseMatrix> gpuSparse_;
};
} // namespace paddle
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "MulOp.h"
/// todo(tianbing), delete it
#include <iostream>
#include "paddle/math/MathFunctions.h"
#include "paddle/math/SIMDFunctions.h"
#include "paddle/utils/ThreadLocal.h"
#ifndef PADDLE_TYPE_DOUBLE
#define GEMM paddle::gemm<float>
#else
#define GEMM paddle::gemm<double>
#endif
namespace {
inline void vecAddTo(real* a, const real* b, real scaleB, size_t len) {
for (unsigned int i = 0; i < len; ++i) {
a[i] += (1.0 == scaleB) ? b[i] : scaleB * b[i];
}
}
inline void colVecAddTo(
real* a, real* b, real c, size_t len, size_t aWidth, size_t bWidth) {
for (unsigned int i = 0; i < len; ++i) {
a[i * aWidth] += (1.0 == c) ? b[i * bWidth] : b[i * bWidth] * c;
}
}
} // namespace
namespace paddle {
/// sparse matrix (+)= dense matrix * dense matrix
template <>
void MulOp<DEVICE_TYPE_CPU>(CpuSparseMatrix& out,
const CpuMatrix& a,
const CpuMatrix& b,
real scaleAB,
real scaleT,
bool aTrans,
bool bTrans) {
CHECK_EQ(out.getValueType(), FLOAT_VALUE);
if (scaleT == 0) {
out.zeroMem();
}
const real* A = a.getData();
const real* B = b.getData();
real* C = out.getValue();
int* rows = out.getRows();
int* cols = out.getCols();
size_t width = out.getWidth();
size_t height = out.getHeight();
/// SPARSE_CSC, {a any, b not trans}
if (out.getFormat() == SPARSE_CSC) {
/// b not trans and a any
CHECK(!bTrans);
size_t m = !aTrans ? a.getWidth() : a.getHeight();
for (size_t i = 0; i < width; i++) {
size_t start = out.getColStartIdx(i);
size_t end = out.getColStartIdx(i + 1);
for (size_t j = start; j < end; j++) {
real sum = 0;
size_t rowIdx = rows[j];
for (size_t k = 0; k < m; k++) {
sum += (!aTrans ? A[rowIdx * m + k] : A[k * height + rowIdx]) *
B[k * width + i];
}
C[j] = scaleAB * sum + scaleT * C[j];
}
}
return;
}
/// SPARSE_CSR, {a any, b not trans} or {a not trans, b trans}
if (out.getFormat() == SPARSE_CSR) {
/// a and b can not both transpose
CHECK(!(aTrans && bTrans));
size_t m = a.getWidth();
for (size_t i = 0; i < height; i++) {
size_t start = out.getRowStartIdx(i);
size_t end = out.getRowStartIdx(i + 1);
for (size_t j = start; j < end; j++) {
real sum = 0;
size_t colIdx = cols[j];
for (size_t k = 0; k < m; k++) {
sum += (!aTrans ? A[i * m + k] : A[k * height + i]) *
(!bTrans ? B[k * width + colIdx] : B[colIdx * m + k]);
}
C[j] = scaleAB * sum + scaleT * C[j];
}
}
return;
}
}
/// dense matrix (+)= dense matrix * dense matrix
template <>
void MulOp<DEVICE_TYPE_CPU>(CpuMatrix& out,
const CpuMatrix& a,
const CpuMatrix& b,
real scaleAB,
real scaleT,
bool aTrans,
bool bTrans) {
GEMM(aTrans ? CblasTrans : CblasNoTrans,
bTrans ? CblasTrans : CblasNoTrans,
out.getHeight(),
out.getWidth(),
!aTrans ? a.getWidth() : a.getHeight(),
scaleAB,
a.getData(),
a.getStride(),
b.getData(),
b.getStride(),
scaleT,
out.getData(),
out.getStride());
}
/// dense matrix (+)= sparse matrix * dense matrix
template <>
void MulOp<DEVICE_TYPE_CPU>(CpuMatrix& out,
const CpuSparseMatrix& a,
const CpuMatrix& b,
real scaleAB,
real scaleT,
bool aTrans,
bool bTrans) {
if (scaleT == 0) {
out.zeroMem();
}
const real* B = b.getData();
real* C = out.getData();
if (out.getWidth() % 32 == 0) {
CHECK_EQ((size_t)B % 32, 0UL);
CHECK_EQ((size_t)C % 32, 0UL);
}
int* cols = a.getCols();
real* values = a.getValue();
for (size_t i = 0; i < a.getHeight(); ++i) {
const int start = a.getRowStartIdx(i);
const int end = a.getRowStartIdx(i + 1);
for (int j = start; j < end; ++j) {
vecAddTo(!aTrans ? out.getRow(i) : out.getRow(cols[j]),
!aTrans ? const_cast<CpuMatrix&>(b).getRow(cols[j])
: const_cast<CpuMatrix&>(b).getRow(i),
(a.getValueType() == FLOAT_VALUE) ? values[j] : (real)1.0,
out.getWidth());
}
}
}
/// dense matrix (+)= dense matrix * sparse matrix
template <>
void MulOp<DEVICE_TYPE_CPU>(CpuMatrix& out,
const CpuMatrix& a,
const CpuSparseMatrix& b,
real scaleAB,
real scaleT,
bool aTrans,
bool bTrans) {
if (scaleT == 0) {
out.zeroMem();
}
real* A = const_cast<real*>(a.getData());
real* B = const_cast<real*>(b.getValue());
real* C = out.getData();
int* rows = b.getRows();
int* cols = b.getCols();
/// SPARSE_CSC format
if (b.getFormat() == SPARSE_CSC) {
for (size_t j = 0; j < b.getWidth(); ++j) {
int start = b.getColStartIdx(j);
int end = b.getColStartIdx(j + 1);
for (int i = start; i < end; ++i) {
colVecAddTo(!bTrans ? C + j : C + rows[i],
!bTrans ? A + rows[i] : A + j,
(b.getValueType() == NO_VALUE) ? (real)1.0 : B[i],
out.getHeight(),
out.getWidth(),
a.getWidth());
}
}
return;
}
/// SPARSE_CSR format
if (b.getFormat() == SPARSE_CSR) {
for (size_t j = 0; j < b.getHeight(); ++j) {
int start = b.getRowStartIdx(j);
int end = b.getRowStartIdx(j + 1);
for (int i = start; i < end; ++i) {
colVecAddTo(!bTrans ? C + cols[i] : C + j,
!bTrans ? A + j : A + cols[i],
(b.getValueType() == NO_VALUE) ? (real)1.0 : B[i],
out.getHeight(),
out.getWidth(),
a.getWidth());
}
}
return;
}
}
/**
* mul operator
* out = scaleT * out + scaleAB * (A * B)
* here, scaleT in {0, 1}, scaleAB == 1,
* out = A * B, ASSIGN_TO
* out += A * B, ADD_TO
*
*
* \param outputs[0] output matrix (out), M * N,
* could be either Sparse or Dense Matrix
* M is num of rows, N is num of columns
* \param inputs[0] first input matrix (A), M * K (if non-trans)
* could be either Sparse or Dense Matrix
* M is num of rows, K is num of columns
* \param inputs[1] second input matrix (B), K * N (if non-trans)
* could be either Sparse or Dense Matrix
* K is num of rows, N is num of columns
*
* Support eight Mul operators, with both GPU and CPU devices
* For each device, four Mul operators are supported:
* 1. dense (out) = dense (A) * dense (B)
* 2. dense (out) = sparse (A) * dense (B)
* sparse matrix only support SPARSE_CSR format
* 3. dense (out) = dense (A) * sparse (B)
* sparse matrix support SPARSE_CSC and SPARSE_CSR formats
* 4. sparse (out) = dense (A) * dense (B)
* sparse matrix support SPARSE_CSC and SPARSE_CSR formats
*
*/
template <DeviceType Device>
class MulFunc : public FunctionBase {
public:
void init(const FuncConfig& config) override {
aTrans_ = config.get<bool>("aTrans");
bTrans_ = config.get<bool>("bTrans");
}
void calc(const BufferArgs& inputs, const BufferArgs& outputs) override {
CHECK(!aTrans_ || !bTrans_)
<< "Not support both a and b are transpose matrices";
CHECK_EQ((size_t)2, inputs.size());
CHECK_EQ((size_t)1, outputs.size());
CHECK(inputs[0].data() && inputs[1].data() && outputs[0].data());
CHECK_EQ(inputs[0].shape().ndims(), (size_t)2);
CHECK_EQ(inputs[1].shape().ndims(), (size_t)2);
CHECK_EQ(outputs[0].shape().ndims(), (size_t)2);
size_t aRow = !aTrans_ ? inputs[0].shape()[0] : inputs[0].shape()[1];
size_t aCol = !aTrans_ ? inputs[0].shape()[1] : inputs[0].shape()[0];
size_t bRow = !bTrans_ ? inputs[1].shape()[0] : inputs[1].shape()[1];
size_t bCol = !bTrans_ ? inputs[1].shape()[1] : inputs[1].shape()[0];
/// C = A * B, or C += A * B, for matrix format
CHECK_EQ(aCol, bRow);
CHECK_EQ(aRow, outputs[0].shape()[0]);
CHECK_EQ(bCol, outputs[0].shape()[1]);
/// only support C = A * B (ASSIGN_TO) or C += A * B (ADD_TO)
real scaleT = (outputs[0].getArgType() == ADD_TO) ? 1.0 : 0.0;
/// support dense = not both sparse * sparse
/// or sparse = dense * dense
CHECK((!outputs[0].isSparseArg() &&
!(inputs[0].isSparseArg() && inputs[1].isSparseArg())) ||
(outputs[0].isSparseArg() && !inputs[0].isSparseArg() &&
!inputs[1].isSparseArg()));
auto outMat = outputs[0].matrix<Device>();
/// dense matrix = dense matrix * dense matrix
if (!inputs[0].isSparseArg() && !inputs[1].isSparseArg() &&
!outputs[0].isSparseArg()) {
MulOp<Device>(outMat,
inputs[0].matrix<Device>(),
inputs[1].matrix<Device>(),
1.0, // scaleAB
scaleT,
aTrans_,
bTrans_);
return;
}
/// dense matrix = dense matrix * sparse matrix
if (!inputs[0].isSparseArg() && inputs[1].isSparseArg() &&
!outputs[0].isSparseArg()) {
CHECK(!aTrans_) << "Not supported a transpose";
MulOp<Device>(outMat,
inputs[0].matrix<Device>(),
inputs[1].sparse().SparseMatrix<Device>(),
1.0, // scaleAB
scaleT,
aTrans_,
bTrans_);
return;
}
/// dense matrix = sparse matrix * dense matrix
if (inputs[0].isSparseArg() && !inputs[1].isSparseArg() &&
!outputs[0].isSparseArg()) {
CHECK(!bTrans_) << "Not supported b transpose";
CHECK_EQ(inputs[0].sparse().dataFormat(), T_SPARSE_CSR)
<< "Only supported SPARSE_CSR format for sparse matrix a";
MulOp<Device>(outMat,
inputs[0].sparse().SparseMatrix<Device>(),
inputs[1].matrix<Device>(),
1.0, // scaleAB
scaleT,
aTrans_,
bTrans_);
return;
}
/// sparse matrix = dense matrix * dense matrix
auto outSparseMat = outputs[0].sparse().SparseMatrix<Device>();
if (!inputs[0].isSparseArg() && !inputs[1].isSparseArg() &&
outputs[0].isSparseArg()) {
MulOp<Device>(outSparseMat,
inputs[0].matrix<Device>(),
inputs[1].matrix<Device>(),
1.0, // scaleAB
scaleT,
aTrans_,
bTrans_);
return;
}
}
private:
bool aTrans_;
bool bTrans_;
};
REGISTER_TYPED_FUNC(MulOp, CPU, MulFunc);
#ifndef PADDLE_ONLY_CPU
REGISTER_TYPED_FUNC(MulOp, GPU, MulFunc);
#endif
} // namespace paddle
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include "Function.h"
#include "paddle/math/Matrix.h"
#include "paddle/math/SparseMatrix.h"
namespace paddle {
/// CPU, dense matrix (+)= dense matrix * dense matrix
template <DeviceType DType>
void MulOp(CpuMatrix& out,
const CpuMatrix& a,
const CpuMatrix& b,
real scaleAB,
real scaleT,
bool aTrans,
bool bTrans);
/// CPU, dense matrix (+)= sparse matrix * dense matrix
template <DeviceType DType>
void MulOp(CpuMatrix& out,
const CpuSparseMatrix& a,
const CpuMatrix& b,
real scaleAB,
real scaleT,
bool aTrans,
bool bTrans);
/// CPU, dense matrix (+)= dense matrix * sparse matrix
template <DeviceType DType>
void MulOp(CpuMatrix& out,
const CpuMatrix& a,
const CpuSparseMatrix& b,
real scaleAB,
real scaleT,
bool aTrans,
bool bTrans);
/// CPU, sparse matrix (+)= dense matrix * dense matrix
template <DeviceType DType>
void MulOp(CpuSparseMatrix& out,
const CpuMatrix& a,
const CpuMatrix& b,
real scaleAB,
real scaleT,
bool aTrans,
bool bTrans);
/// GPU, dense matrix (+)= dense matrix * dense matrix
template <DeviceType DType>
void MulOp(GpuMatrix& out,
const GpuMatrix& a,
const GpuMatrix& b,
real scaleAB,
real scaleT,
bool aTrans,
bool bTrans);
/// GPU, dense matrix (+)= sparse matrix * dense matrix
template <DeviceType DType>
void MulOp(GpuMatrix& out,
const GpuSparseMatrix& a,
const GpuMatrix& b,
real scaleAB,
real scaleT,
bool aTrans,
bool bTrans);
/// GPU, dense matrix (+)= dense matrix * sparse matrix
template <DeviceType DType>
void MulOp(GpuMatrix& out,
const GpuMatrix& a,
const GpuSparseMatrix& b,
real scaleAB,
real scaleT,
bool aTrans,
bool bTrans);
/// GPU, sparse matrix (+)= dense matrix * dense matrix
template <DeviceType DType>
void MulOp(GpuSparseMatrix& out,
const GpuMatrix& a,
const GpuMatrix& b,
real scaleAB,
real scaleT,
bool aTrans,
bool bTrans);
} // namespace paddle
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "hl_base.h"
#include "MulOp.h"
#include "paddle/math/Matrix.h"
#include "paddle/math/SparseMatrix.h"
namespace paddle {
/// dense matrix (+)= dense matrix * dense matrix
template <>
void MulOp<DEVICE_TYPE_GPU>(GpuMatrix& out,
const GpuMatrix& a,
const GpuMatrix& b,
real scaleAB,
real scaleT,
bool aTrans,
bool bTrans) {
CHECK(a.useGpu_ && b.useGpu_) << "matrix device type not match";
hl_matrix_mul(const_cast<real*>(a.getData()),
!aTrans ? HPPL_OP_N : HPPL_OP_T,
const_cast<real*>(b.getData()),
!bTrans ? HPPL_OP_N : HPPL_OP_T,
const_cast<real*>(out.getData()),
out.getHeight(),
out.getWidth(),
!aTrans ? a.getWidth() : a.getHeight(),
scaleAB,
scaleT,
a.getStride(),
b.getStride(),
out.getStride());
}
/// dense matrix (+)= sparse matrix * dense matrix
template <>
void MulOp<DEVICE_TYPE_GPU>(GpuMatrix& out,
const GpuSparseMatrix& a,
const GpuMatrix& b,
real scaleAB,
real scaleT,
bool aTrans,
bool bTrans) {
CHECK(out.isContiguous());
CHECK(b.isContiguous());
CHECK(a.useGpu_ && b.useGpu_) << "matrix device type not match";
hl_matrix_csr_mul_dense(a.sMatrix_.get(),
aTrans ? HPPL_OP_T : HPPL_OP_N,
const_cast<real*>(b.getData()),
HPPL_OP_N,
const_cast<real*>(out.getData()),
out.getHeight(),
out.getWidth(),
b.getHeight(),
scaleAB,
scaleT);
}
/// dense matrix (+)= dense matrix * sparse matrix
template <>
void MulOp<DEVICE_TYPE_GPU>(GpuMatrix& out,
const GpuMatrix& a,
const GpuSparseMatrix& b,
real scaleAB,
real scaleT,
bool aTrans,
bool bTrans) {
CHECK(out.isContiguous());
CHECK(a.isContiguous());
CHECK(a.useGpu_ && b.useGpu_) << "matrix device type not match";
if (b.format_ == SPARSE_CSC) {
hl_matrix_dense_mul_csc(const_cast<real*>(a.getData()),
HPPL_OP_N,
b.sMatrix_.get(),
bTrans ? HPPL_OP_T : HPPL_OP_N,
const_cast<real*>(out.getData()),
out.getHeight(),
out.getWidth(),
a.getWidth(),
scaleAB,
scaleT);
} else {
hl_matrix_dense_mul_csr(const_cast<real*>(a.getData()),
HPPL_OP_N,
b.sMatrix_.get(),
bTrans ? HPPL_OP_T : HPPL_OP_N,
const_cast<real*>(out.getData()),
out.getHeight(),
out.getWidth(),
a.getWidth(),
scaleAB,
scaleT);
}
}
/// sparse matrix (+)= dense matrix * dense matrix
template <>
void MulOp<DEVICE_TYPE_GPU>(GpuSparseMatrix& out,
const GpuMatrix& a,
const GpuMatrix& b,
real scaleAB,
real scaleT,
bool aTrans,
bool bTrans) {
CHECK(a.useGpu_ && b.useGpu_) << "matrix device type not match";
hl_sparse_matrix_mul(const_cast<real*>(a.getData()),
aTrans ? HPPL_OP_T : HPPL_OP_N,
const_cast<real*>(b.getData()),
bTrans ? HPPL_OP_T : HPPL_OP_N,
out.sMatrix_.get(),
out.getHeight(),
out.getWidth(),
!bTrans ? b.getHeight() : b.getWidth(),
scaleAB,
scaleT);
}
} // namespace paddle
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <gtest/gtest.h>
#include "FunctionTest.h"
#include "paddle/math/Matrix.h"
#include "paddle/math/SparseMatrix.h"
#include "paddle/math/tests/test_matrixUtil.h"
#include "paddle/testing/TestUtil.h"
using namespace paddle; // NOLINT
/**
* C += A * B, A, B, C dense matrix
* dense = dense * dense
*/
void testFuncDDDMatrix(
bool transa, bool transb, size_t dimM, size_t dimN, size_t dimK) {
real scaleT = 1.0;
size_t heightA = (transa == false) ? dimM : dimK;
size_t widthA = (transa == false) ? dimK : dimM;
size_t heightB = (transb == false) ? dimK : dimN;
size_t widthB = (transb == false) ? dimN : dimK;
size_t heightC = dimM;
size_t widthC = dimN;
// init Test object
FunctionCompare test(
"MulOp", FuncConfig().set("aTrans", transa).set("bTrans", transb));
// prepare input arguments
/// matrix A : HA * WA
test.addInputs(BufferArg(VALUE_TYPE_FLOAT, TensorShape{heightA, widthA}));
/// matrix B: HB * WB
test.addInputs(BufferArg(VALUE_TYPE_FLOAT, TensorShape{heightB, widthB}));
/// output matrix C: HC * WC
test.addOutputs(BufferArg(VALUE_TYPE_FLOAT, TensorShape{heightC, widthC}),
scaleT == 1.0 ? ADD_TO : ASSIGN_TO);
// run Function
test.run();
}
TEST(MulOp, DDDMatrixMul) {
LOG(INFO) << "function test for dense = dense * dense matrix";
for (const auto transa : {false, true}) {
for (const auto transb : {false, true}) {
for (const auto dimM : {1, 10, 100}) {
for (const auto dimN : {1, 10}) {
for (const auto dimK : {8}) {
if (transa && transb) {
continue;
}
VLOG(3) << setiosflags(std::ios::left) << std::setfill(' ')
<< " transa=" << transa << " transb=" << transb
<< " dimM=" << std::setw(5) << dimM
<< " dimN=" << std::setw(5) << dimN
<< " dimK=" << std::setw(5) << dimK;
testFuncDDDMatrix(transa, transb, dimM, dimN, dimK);
}
}
}
}
}
}
/**
* C += A * B, B, C dense, A sparse
* dense = sparse * dense
*/
void testFuncDSparseDMatrix(
size_t dimM, size_t dimN, size_t dimK, size_t nnz, SparseFormat FORMAT) {
real scaleT = 1.0;
// init Test object
FunctionCompare test("MulOp",
FuncConfig().set("aTrans", false).set("bTrans", false));
// prepare input arguments
/// sparse matrix A : M * K
test.addInputs(SparseMatrixArg(
VALUE_TYPE_FLOAT, TensorShape{dimM, dimK}, nnz, FORMAT, FLOAT_VALUE));
/// matrix B: K * N
test.addInputs(BufferArg(VALUE_TYPE_FLOAT, TensorShape{dimK, dimN}));
/// output matrix C: M * N
test.addOutputs(BufferArg(VALUE_TYPE_FLOAT, TensorShape{dimM, dimN}),
scaleT == 1.0 ? ADD_TO : ASSIGN_TO);
// run Function
test.run();
}
TEST(MuLOp, DSparseDMul) {
LOG(INFO) << "function test for dense = sparse * dense matrix";
for (const auto dimM : {10, 100, 1000}) {
for (const auto dimN : {10, 100}) {
for (const auto dimK : {3, 10}) {
for (const auto nnz : {3, 10}) {
for (const auto FORMAT : {SPARSE_CSR}) {
VLOG(3) << setiosflags(std::ios::left) << std::setfill(' ')
<< " dimM=" << std::setw(5) << dimM
<< " dimN=" << std::setw(5) << dimN
<< " dimK=" << std::setw(5) << dimK
<< " nnz=" << std::setw(5) << nnz
<< " format=" << std::setw(5) << FORMAT;
testFuncDSparseDMatrix(dimM, dimN, dimK, nnz, FORMAT);
}
}
}
}
}
}
/**
* C += A * B, A, C dense, B sparse
* dense = dense * sparse
*/
void testFuncDDSparseMatrix(
size_t dimM, size_t dimN, size_t dimK, size_t nnz, SparseFormat FORMAT) {
real scaleT = 1.0;
// init Test object
FunctionCompare test("MulOp",
FuncConfig().set("aTrans", false).set("bTrans", false));
// prepare input arguments
/// matrix A : M * K
test.addInputs(BufferArg(VALUE_TYPE_FLOAT, TensorShape{dimM, dimK}));
/// matrix B: K * N
test.addInputs(SparseMatrixArg(
VALUE_TYPE_FLOAT, TensorShape{dimK, dimN}, nnz, FORMAT, FLOAT_VALUE));
/// output matrix C: M * N
test.addOutputs(BufferArg(VALUE_TYPE_FLOAT, TensorShape{dimM, dimN}),
scaleT == 1.0 ? ADD_TO : ASSIGN_TO);
// run Function
test.run();
}
TEST(MulOp, DDSparseMul) {
LOG(INFO) << "function test for dense = dense * sparse matrix";
for (const auto dimM : {10, 100, 1000}) {
for (const auto dimN : {10, 100}) {
for (const auto dimK : {3, 10}) {
for (const auto nnz : {3, 10}) {
for (const auto FORMAT : {SPARSE_CSR, SPARSE_CSC}) {
VLOG(3) << setiosflags(std::ios::left) << std::setfill(' ')
<< " dimM=" << std::setw(5) << dimM
<< " dimN=" << std::setw(5) << dimN
<< " dimK=" << std::setw(5) << dimK
<< " nnz=" << std::setw(5) << nnz
<< " format=" << std::setw(5) << FORMAT;
testFuncDDSparseMatrix(dimM, dimN, dimK, nnz, FORMAT);
}
}
}
}
}
}
/**
* C += A * B, A sparse, B, C dense
* sparse = dense * dense
*/
void testFuncSparseDDMatrix(
size_t dimM, size_t dimN, size_t dimK, size_t nnz, SparseFormat FORMAT) {
real scaleT = 1.0;
// init Test object
FunctionCompare test("MulOp",
FuncConfig().set("aTrans", false).set("bTrans", false));
// prepare input arguments
/// matrix A : M * K
test.addInputs(BufferArg(VALUE_TYPE_FLOAT, TensorShape{dimM, dimK}));
/// matrix B: K * N
test.addInputs(BufferArg(VALUE_TYPE_FLOAT, TensorShape{dimK, dimN}));
/// output sparse matrix C: M * N
test.addOutputs(
SparseMatrixArg(
VALUE_TYPE_FLOAT, TensorShape{dimM, dimN}, nnz, FORMAT, FLOAT_VALUE),
scaleT == 1.0 ? ADD_TO : ASSIGN_TO);
// run Function
test.run();
}
TEST(MulOp, SparseDDMul) {
LOG(INFO) << "function test for sparse = dense * dense matrix";
for (const auto dimM : {10, 100, 1000}) {
for (const auto dimN : {10, 100}) {
for (const auto dimK : {3, 10}) {
for (const auto nnz : {3, 10}) {
for (const auto FORMAT : {SPARSE_CSC, SPARSE_CSR}) {
VLOG(3) << setiosflags(std::ios::left) << std::setfill(' ')
<< " dimM=" << std::setw(5) << dimM
<< " dimN=" << std::setw(5) << dimN
<< " dimK=" << std::setw(5) << dimK
<< " nnz=" << std::setw(5) << nnz
<< " format=" << std::setw(5) << FORMAT;
testFuncSparseDDMatrix(dimM, dimN, dimK, nnz, FORMAT);
}
}
}
}
}
}
......@@ -31,6 +31,10 @@ enum DeviceType {
DEVICE_TYPE_GPU = 2
};
enum SparseDataType { T_NO_VALUE = 0, T_FLOAT_VALUE = 1 };
enum SparseDataFormat { T_SPARSE_CSR = 0, T_SPARSE_CSC = 1 };
inline int sizeOfValuType(ValueType valueType) {
if (valueType == VALUE_TYPE_INT32) {
return 4;
......@@ -87,6 +91,29 @@ struct MatrixT<int, DEVICE_TYPE_GPU> {
using type = void; // Not implemented
};
template <typename VType, DeviceType Device>
struct SparseMatrixT;
template <>
struct SparseMatrixT<real, DEVICE_TYPE_CPU> {
using type = CpuSparseMatrix;
};
template <>
struct SparseMatrixT<real, DEVICE_TYPE_GPU> {
using type = GpuSparseMatrix;
};
template <>
struct SparseMatrixT<int, DEVICE_TYPE_CPU> {
using type = void; // Not implemented
};
template <>
struct SparseMatrixT<int, DEVICE_TYPE_GPU> {
using type = void; // Not implemented
};
template <typename VType, DeviceType Device>
struct VectorT;
......@@ -114,8 +141,9 @@ struct VectorT<int, DEVICE_TYPE_GPU> {
template <typename VType, DeviceType DType>
struct Tensor {
typedef typename detail::MatrixT<VType, DType>::type Matrix;
typedef typename detail::VectorT<VType, DType>::type Vector;
typedef typename detail::MatrixT<VType, DType>::type Matrix;
typedef typename detail::SparseMatrixT<VType, DType>::type SparseMatrix;
};
} // namespace paddle
......@@ -60,55 +60,6 @@ GradientMachine* GradientMachine::create(
return nullptr;
}
GradientMachine* GradientMachine::create(const std::string& modelFile,
DataConfig* dataConfig) {
std::ifstream is(modelFile);
CHECK(is) << "Fail to open " << modelFile;
return create(is, dataConfig);
}
GradientMachine* GradientMachine::create(std::istream& is,
DataConfig* dataConfig) {
TrainerConfig trainerConfig;
GradientMachine* ret = create(is, &trainerConfig);
if (dataConfig && trainerConfig.has_data_config()) {
*dataConfig = trainerConfig.data_config();
}
return ret;
}
GradientMachine* GradientMachine::create(const std::string& modelFile,
TrainerConfig* trainerConfig) {
std::ifstream is(modelFile);
CHECK(is) << "Fail to open " << modelFile;
return create(is, trainerConfig);
}
GradientMachine* GradientMachine::create(std::istream& is,
TrainerConfig* trainerConfig) {
TrainerConfig trainerConfigTemp;
int64_t size;
CHECK(is.read((char*)&size, sizeof(size))) << "Fail to read ";
std::string buf;
buf.resize(size);
CHECK(is.read(&buf[0], size)) << "Fail to read ";
CHECK(trainerConfigTemp.ParseFromString(buf)) << "Fail to parse config";
std::unique_ptr<GradientMachine> machine(
create(trainerConfigTemp.model_config()));
std::vector<ParameterPtr>& parameters = machine->getParameters();
for (auto& para : parameters) {
para->load(is);
}
machine->onLoadParameter();
if (trainerConfig) {
*trainerConfig = trainerConfigTemp;
}
return machine.release();
}
void GradientMachine::saveParameters(const std::string& dir) const {
LOG(INFO) << "Saving parameters to " << dir;
......
......@@ -89,39 +89,6 @@ public:
std::vector<ParameterType>{
PARAMETER_VALUE, PARAMETER_GRADIENT, PARAMETER_MOMENTUM});
/**
* Create a gradient machine from the merged model file.
* The merged model file can be generated using tools/merge_model
* If dataConfig is not null, it will be filled with the DataConfig
* from the TrainerConfig
*/
static GradientMachine* create(const std::string& modelFile,
DataConfig* dataConfig);
/**
* Create a gradient machine from a stream which contains the merged
* model file. The merged model file can be generated using tools/merge_model
* If dataConfig is not null, it will be filled with the DataConfig
* from the TrainerConfig
*/
static GradientMachine* create(std::istream& is, DataConfig* dataConfig);
/**
* Create a gradient machine from the merged model file.
* The merged model file can be generated using tools/merge_model
* If trainerConfig is not null, it will be filled with the TrainerConfig
*/
static GradientMachine* create(const std::string& modelFile,
TrainerConfig* trainerConfig);
/**
* Create a gradient machine from a stream which contains the merged
* model file. The merged model file can be generated using tools/merge_model
* If trainerConfig is not null, it will be filled with the TrainerConfig
*/
static GradientMachine* create(std::istream& is,
TrainerConfig* trainerConfig);
virtual ~GradientMachine() {}
/**
......
......@@ -31,6 +31,7 @@ limitations under the License. */
namespace paddle {
/// TODO(tianbing), move to paddle/function/TensorType.h
enum SparseValueType { NO_VALUE = 0, FLOAT_VALUE = 1 };
/**
......@@ -56,6 +57,7 @@ enum SparseValueType { NO_VALUE = 0, FLOAT_VALUE = 1 };
* value [1, 1, 2, 2, 5]
* @endcode
*/
/// TODO(tianbing), move to paddle/function/TensorType.h
enum SparseFormat { SPARSE_CSR = 0, SPARSE_CSC = 1 };
class Matrix;
......
......@@ -177,7 +177,6 @@ GpuSparseMatrix::GpuSparseMatrix(real* value,
hl_sparse_matrix_s_ptr tmp2(tmp, hl_destruct_sparse_matrix);
sMatrix_ = tmp2;
}
LOG(INFO) << "weight to matrix ";
}
}
......
......@@ -24,10 +24,6 @@ limitations under the License. */
#include "paddle/utils/Thread.h"
#include "paddle/utils/Util.h"
DEFINE_bool(allow_inefficient_sparse_update,
false,
"Whether to allow inefficient sparse update");
namespace paddle {
const unsigned int SparseRowCpuMatrix::kUnusedId_ = -1U;
......
......@@ -21,8 +21,6 @@ limitations under the License. */
#include "RowBuffer.h"
#include "paddle/utils/Util.h"
DECLARE_bool(allow_inefficient_sparse_update);
namespace paddle {
/**
......@@ -183,11 +181,10 @@ protected:
inline void checkStoreSize() {
if (buf_->isAutoGrowth()) {
if (buf_->getRowCount() > 0.5 * height_) {
LOG(WARNING)
<< "There are more than 0.5*height (" << localIndices_->size()
<< ") rows are used for sparse "
<< "update, which is not efficient. Considering not use "
<< "sparse_update or set --allow_inefficient_sparse_update=true";
LOG(WARNING) << "There are more than 0.5*height ("
<< localIndices_->size() << ") rows are used for sparse "
<< "update, which is not efficient. Considering not use "
<< "sparse_update.";
}
} else {
CHECK_LE(localIndices_->size(), buf_->getRowCount());
......
......@@ -30,6 +30,17 @@ void checkMatrixEqual(const MatrixPtr& a, const MatrixPtr& b) {
}
}
void checkSMatrixEqual(const CpuSparseMatrix& a, const CpuSparseMatrix& b) {
ASSERT_EQ(a.getWidth(), b.getWidth());
ASSERT_EQ(a.getHeight(), b.getHeight());
ASSERT_EQ(a.isTransposed(), b.isTransposed());
ASSERT_EQ(a.getFormat(), b.getFormat());
ASSERT_EQ(a.getElementCnt(), b.getElementCnt());
for (size_t r = 0; r < a.getElementCnt(); ++r) {
ASSERT_FLOAT_EQ(a.getValue()[r], b.getValue()[r]);
}
}
void checkSMatrixEqual(const CpuSparseMatrixPtr& a,
const CpuSparseMatrixPtr& b) {
ASSERT_EQ(a->getWidth(), b->getWidth());
......@@ -73,6 +84,36 @@ void checkSMatrixEqual2(const CpuSparseMatrixPtr& a,
}
}
void checkSMatrixEqual2Dense(const CpuSparseMatrix& a, const CpuMatrix& b) {
ASSERT_EQ(a.getWidth(), b.getWidth());
ASSERT_EQ(a.getHeight(), b.getHeight());
ASSERT_EQ(a.isTransposed(), b.isTransposed());
if (a.getFormat() == SPARSE_CSC) {
int* rows = a.getRows();
for (size_t i = 0; i < a.getWidth(); i++) {
for (size_t j = a.getColStartIdx(i); j < a.getColStartIdx(i + 1); j++) {
if (a.getValueType() == FLOAT_VALUE) {
ASSERT_FLOAT_EQ(a.getValue()[j], b.getElement(rows[j], i));
} else {
ASSERT_FLOAT_EQ(1.0, b.getElement(rows[j], i));
}
}
}
} else {
int* cols = a.getCols();
for (size_t i = 0; i < a.getHeight(); i++) {
for (size_t j = a.getRowStartIdx(i); j < a.getRowStartIdx(i + 1); j++) {
if (a.getValueType() == FLOAT_VALUE) {
ASSERT_FLOAT_EQ(a.getValue()[j], b.getElement(i, cols[j]));
} else {
ASSERT_FLOAT_EQ(1.0, b.getElement(i, cols[j]));
}
}
}
}
}
void checkSMatrixEqual2Dense(const CpuSparseMatrixPtr& a,
const CpuMatrixPtr& b) {
ASSERT_EQ(a->getWidth(), b->getWidth());
......
......@@ -90,16 +90,6 @@ DEFINE_string(model_list, "", "File that saves the model list when evaluation");
namespace paddle {
void Trainer::init(int argc, char** argv) {
initMain(argc, argv);
initPython(argc, argv);
auto config = TrainerConfigHelper::createFromFlagConfig();
feenableexcept(FE_INVALID | FE_DIVBYZERO | FE_OVERFLOW);
init(config);
}
void Trainer::init(const std::shared_ptr<TrainerConfigHelper>& config,
bool testing,
const std::shared_ptr<GradientMachine>& gradientMachine,
......
......@@ -71,11 +71,6 @@ public:
const std::shared_ptr<DataProvider>& dataProvider = nullptr,
const std::shared_ptr<DataProvider>& testDataProvider = nullptr);
/**
* Initialize Trainer from command line flags.
*/
void init(int argc, char** argv);
/**
* Train until num_passes reached.
* One pass means neural network train through all training data.
......
################# test_Prediction ######################
add_unittest_without_exec(test_Prediction
test_Prediction.cpp)
add_test(NAME test_Prediction
COMMAND ${PROJ_ROOT}/paddle/.set_python_path.sh -d ${PROJ_ROOT}/python
${CMAKE_CURRENT_BINARY_DIR}/test_Prediction --merger=${CMAKE_CURRENT_BINARY_DIR}/../paddle_merge_model
WORKING_DIRECTORY ${PROJ_ROOT}/paddle/)
################# test_Compare ############################
add_unittest_without_exec(test_Compare
test_Compare.cpp)
......
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include <paddle/utils/PythonUtil.h>
#include "paddle/trainer/Trainer.h"
#include <gtest/gtest.h>
DECLARE_string(config);
DECLARE_string(config_args);
DEFINE_string(merger,
"./paddle_merge_model",
"path to paddle_merge_model binary");
using namespace paddle; // NOLINT
using namespace std; // NOLINT
static const string& configFile = "trainer/tests/sample_trainer_config.conf";
static const string& mergedModelFile = "./test_model_file";
static const string& modelDir = "./test_model_dir";
void checkBuffer(real* vec1, real* vec2, size_t len) {
for (size_t i = 0; i < len; i++) {
EXPECT_EQ(vec1[i], vec2[i]) << "vec1:" << vec1[i] << " vec2:" << vec2[i];
}
}
void checkParameters(vector<ParameterPtr> A, vector<ParameterPtr> B) {
CHECK_EQ(B.size(), A.size()) << "parameter size not equal";
for (size_t i = 0; i < A.size(); i++) {
auto vec1 = A[i]->getBuf(PARAMETER_VALUE);
auto vec2 = B[i]->getBuf(PARAMETER_VALUE);
CHECK_EQ(vec1->useGpu_, vec2->useGpu_) << "use gpu not equal";
CHECK_EQ(vec1->getSize(), vec2->getSize()) << "size not equal";
if (vec1->useGpu_ == false) {
checkBuffer(vec1->getData(), vec2->getData(), vec1->getSize());
} else {
VectorPtr cpuVec1 = Vector::create(vec1->getSize(), false);
VectorPtr cpuVec2 = Vector::create(vec2->getSize(), false);
cpuVec1->copyFrom(*vec1, HPPL_STREAM_DEFAULT);
cpuVec2->copyFrom(*vec2, HPPL_STREAM_DEFAULT);
hl_stream_synchronize(HPPL_STREAM_DEFAULT);
checkBuffer(cpuVec1->getData(), cpuVec2->getData(), cpuVec1->getSize());
}
}
}
TEST(GradientMachine, create) {
#ifdef PADDLE_ONLY_CPU
FLAGS_use_gpu = false;
#endif
mkDir(modelDir.c_str());
FLAGS_config = configFile;
FLAGS_config_args = "with_cost=False";
auto config = TrainerConfigHelper::createFromFlagConfig();
// save model to directory
unique_ptr<GradientMachine> gradientMachine1(
GradientMachine::create(*config));
gradientMachine1->saveParameters(modelDir);
Trainer trainer;
trainer.init(config);
ParameterUtil* paramUtil = trainer.getParameterUtilPtr();
if (paramUtil != NULL) {
paramUtil->saveConfigWithPath(modelDir);
}
// create a different GradientMachine
unique_ptr<GradientMachine> gradientMachine2(
GradientMachine::create(*config));
gradientMachine2->randParameters();
// merge config and model to one file
string cmd = FLAGS_merger + " --model_dir=" + modelDir +
" --config_args=with_cost=False" + " --model_file=" +
mergedModelFile;
LOG(INFO) << cmd;
int ret = system(cmd.c_str());
EXPECT_EQ(0, ret);
if (ret) {
return;
}
// create GradientMachine from the merged model
DataConfig dataConfig;
unique_ptr<GradientMachine> gradientMachine3(
GradientMachine::create(mergedModelFile, &dataConfig));
CHECK(gradientMachine3);
EXPECT_EQ(dataConfig.type(), "simple");
EXPECT_EQ(dataConfig.feat_dim(), 3);
// compare the parameters of GradientMachine and GradientMachine3
std::vector<ParameterPtr> paraMachine1 = gradientMachine1->getParameters();
std::vector<ParameterPtr> paraMachine3 = gradientMachine3->getParameters();
checkParameters(paraMachine1, paraMachine3);
// Test that the GradientMachine created from the merged model
// is same as the orginnal one.
vector<Argument> inArgs(1);
vector<Argument> outArgs;
int inputDim = 3;
int numSamples = 2;
CpuMatrix cpuInput(numSamples, inputDim);
for (int i = 0; i < numSamples; ++i) {
for (int j = 0; j < inputDim; ++j) {
cpuInput.getData()[i * inputDim + j] =
rand() / (real)RAND_MAX; // NOLINT TODO(yuyang): use rand_r
}
}
MatrixPtr input = Matrix::create(numSamples,
inputDim,
/* trans */ false,
FLAGS_use_gpu);
input->copyFrom(cpuInput);
inArgs[0].value = input;
gradientMachine1->forward(inArgs, &outArgs, PASS_TEST);
EXPECT_EQ((size_t)1, outArgs.size());
vector<Argument> outArgs2;
gradientMachine2->forward(inArgs, &outArgs2, PASS_TEST);
CpuMatrix out1(outArgs[0].value->getHeight(), outArgs[0].value->getWidth());
CpuMatrix out2(outArgs2[0].value->getHeight(), outArgs2[0].value->getWidth());
out1.copyFrom(*outArgs[0].value);
out2.copyFrom(*outArgs2[0].value);
for (size_t i = 0; i < out1.getHeight() * out1.getWidth(); i++) {
EXPECT_NE(out1.getData()[i], out2.getData()[i]);
}
gradientMachine3->forward(inArgs, &outArgs2, PASS_TEST);
out2.copyFrom(*outArgs2[0].value);
checkBuffer(
out1.getData(), out2.getData(), out2.getHeight() * out2.getWidth());
cmd = " rm -rf " + modelDir + "/*";
LOG(INFO) << "cmd " << cmd;
ret = system(cmd.c_str());
EXPECT_EQ(0, ret);
if (ret) {
return;
}
cmd = " rm -rf " + mergedModelFile;
LOG(INFO) << "cmd " << cmd;
ret = system(cmd.c_str());
EXPECT_EQ(0, ret);
if (ret) {
return;
}
// clean up
rmDir(modelDir.c_str());
remove(mergedModelFile.c_str());
}
int main(int argc, char** argv) {
initMain(argc, argv);
initPython(argc, argv);
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
......@@ -33,12 +33,15 @@ DEFINE_int32(port, 20134, "Listening port for pserver");
DEFINE_int32(data_server_port, 21134, "Listening port for dserver");
DEFINE_int32(ports_num,
1,
"The ports number for parameter send,"
" increment based on default port number");
"Number of ports for sending dense parameter,"
" following ports on parameter server will be visited"
" for sending dense parameter: [port, port+ports_num-1]");
DEFINE_int32(ports_num_for_sparse,
0,
"The ports number for parameter send,"
" increment based on default (port + ports_num)");
"Number of ports for sending sparse parameter,"
" following ports on parameter server will be visited"
" for sending sparse parameter:"
" [port+ports_num, port+ports_num+ports_num_for_sparse-1]");
DEFINE_string(nics, "xgbe0,xgbe1", "network device name for pservers");
DEFINE_string(rdma_tcp, "tcp", "use rdma or tcp rdma transport protocol");
DEFINE_int32(trainer_id,
......
......@@ -27,11 +27,14 @@ message ParameterClientConfig {
* Configuration structure for ParameterServer2.
*/
message ParameterServerConfig {
// The ports number for parameter send,
// increment based on default port number
// Number of ports for sending dense parameter,
// following ports on parameter server will be visited
// for sending dense parameter: [port, port+ports_num-1]
required int32 ports_num = 1 [default = 1];
// The ports number for parameter send,
// increment based on default (port + ports_num
// Number of ports for sending sparse parameter,
// following ports on parameter server will be visited
// for sending sparse parameter:
// [port+ports_num, port+ports_num+ports_num_for_sparse-1]
required int32 ports_num_for_sparse = 2 [default = 0];
// network device name for pservers
required string nics = 3 [default = "xgbe0,xgbe1"];
......
......@@ -11,3 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import optimizer
__all__ = ['optimizer']
import py_paddle.swig_paddle as swig_api
import paddle.trainer_config_helpers.optimizers as v1_optimizers
import paddle.trainer_config_helpers.config_parser_utils as config_parser_utils
import paddle.v2
__all__ = ['Adam', 'Adamax']
class Optimizer(object):
def __init__(self, **kwargs):
if 'batch_size' in kwargs:
del kwargs['batch_size'] # not important for python library.
def __impl__():
v1_optimizers.settings(batch_size=1, **kwargs)
self.__opt_conf_proto__ = config_parser_utils.parse_optimizer_config(
__impl__)
self.__opt_conf__ = swig_api.OptimizationConfig.createFromProto(
self.__opt_conf_proto__)
def enable_types(self):
"""
get enable_types for each optimizer.
enable_types = [value, gradient, momentum, etc]
For each optimizer(SGD, Adam), GradientMachine should enable different
buffers.
"""
tmp = swig_api.ParameterOptimizer.create(self.__opt_conf__)
assert isinstance(tmp, swig_api.ParameterOptimizer)
return tmp.getParameterTypes()
def create_local_updater(self):
return swig_api.ParameterUpdater.createLocalUpdater(self.__opt_conf__)
def create_remote_updater(self, pass_num):
return swig_api.ParameterUpdater.createRemoteUpdater(self.__opt_conf__,
pass_num)
class Adam(Optimizer):
def __init__(self, beta1=0.9, beta2=0.999, epsilon=1e-8, **kwargs):
learning_method = v1_optimizers.AdamOptimizer(
beta1=beta1, beta2=beta2, epsilon=epsilon)
super(Adam, self).__init__(learning_method=learning_method, **kwargs)
class Adamax(Optimizer):
def __init__(self, beta1=0.9, beta2=0.999, **kwargs):
learning_method = v1_optimizers.AdamaxOptimizer(
beta1=beta1, beta2=beta2)
super(Adamax, self).__init__(learning_method=learning_method, **kwargs)
if __name__ == '__main__':
swig_api.initPaddle('--use_gpu=false')
opt = paddle.v2.optimizer.Adam()
print opt.enable_types()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册