Commit 65fdaca0 authored by Xinghai Sun, committed by GitHub

Merge pull request #144 from wanghaoshuang/ds2_pcloud

Add code adaptation to enable DS2 to run on PaddleCloud.
# Run DS2 on PaddleCloud
>Note:
>Make sure the [PaddleCloud client](https://github.com/PaddlePaddle/cloud/blob/develop/doc/usage_cn.md#%E4%B8%8B%E8%BD%BD%E5%B9%B6%E9%85%8D%E7%BD%AEpaddlecloud) has been installed and that the current directory is `models/deep_speech_2/cloud/`.
## Step-1 Configure data set
Configure your input data and output paths in `pcloud_submit.sh`:
- `TRAIN_MANIFEST`: Absolute path of the training data manifest file in the local filesystem. This file has the format below (a minimal reader sketch follows this list):
```
{"audio_filepath": "/home/disk1/LibriSpeech/dev-clean/1272/128104/1272-128104-0000.flac", "duration": 5.855, "text
": "mister quilter is the ..."}
{"audio_filepath": "/home/disk1/LibriSpeech/dev-clean/1272/128104/1272-128104-0001.flac", "duration": 4.815, "text
": "nor is mister ..."}
```
- `TEST_MANIFEST`: Absolute path of the test data manifest file in the local filesystem. This file has the same format as `TRAIN_MANIFEST`.
- `VOCAB_FILE`: Absolute path of the vocabulary file in the local filesystem.
- `MEAN_STD_FILE`: Absolute path of the normalizer's statistics file in the local filesystem.
- `CLOUD_DATA_DIR`: Absolute path in the PaddleCloud filesystem. The local training data will be uploaded to this directory.
- `CLOUD_MODEL_DIR`: Absolute path in the PaddleCloud filesystem. The PaddleCloud trainer will save models to this directory.
>Note: Uploading is skipped if the target file already exists in `CLOUD_DATA_DIR`.
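
Each manifest is a JSON-lines file: one JSON object per utterance. A minimal reader sketch, assuming Python and an illustrative manifest path:
```
import json

# Minimal sketch: print each utterance entry in a manifest file.
# "manifest.dev" is an illustrative path, not a fixed name.
with open("manifest.dev") as f:
    for line in f:
        entry = json.loads(line)
        print(entry["audio_filepath"], entry["duration"], entry["text"])
```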
## Step-2 Configure computation resource
Configure computation resources in `pcloud_submit.sh`:
```
# Configure computation resource and submit job to PaddleCloud
paddlecloud submit \
-image wanghaoshuang/pcloud_ds2:latest \
-jobname ${JOB_NAME} \
-cpu 4 \
-gpu 4 \
-memory 10Gi \
-parallelism 1 \
-pscpu 1 \
-pservers 1 \
-psmemory 10Gi \
-passes 1 \
-entry "sh pcloud_train.sh ${CLOUD_DATA_DIR} ${CLOUD_MODEL_DIR}" \
${DS2_PATH}
```
For more information, please refer to [PaddleCloud](https://github.com/PaddlePaddle/cloud/blob/develop/doc/usage_cn.md#提交任务).
## Step-3 Configure algorithm options
Configure algorithm options in `pcloud_train.sh`:
```
python train.py \
--use_gpu=1 \
--trainer_count=4 \
--batch_size=256 \
--mean_std_filepath=$MEAN_STD_FILE \
--train_manifest_path='./local.train.manifest' \
--dev_manifest_path='./local.test.manifest' \
--vocab_filepath=$VOCAB_PATH \
--output_model_dir=${MODEL_PATH}
```
You can get more information about algorithm options with the following command:
```
cd ..
python train.py --help
```
## Step-4 Submit job
```
$ sh pcloud_submit.sh
```
## Step-5 Get logs
```
$ paddlecloud logs -n 10000 deepspeech20170727130129
```
For more information, please refer to [PaddleCloud client](https://github.com/PaddlePaddle/cloud/blob/develop/doc/usage_cn.md#下载并配置paddlecloud) or get help with the following command:
```
paddlecloud --help
```
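
Below are the scripts themselves. First, `pcloud_submit.sh`, which packs and uploads the local data and then submits the training job: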
# Configure input data set in local filesystem
TRAIN_MANIFEST="/home/work/demo/ds2/pcloud/models/deep_speech_2/datasets/manifest.dev"
TEST_MANIFEST="/home/work/demo/ds2/pcloud/models/deep_speech_2/datasets/manifest.dev"
VOCAB_FILE="/home/work/demo/ds2/pcloud/models/deep_speech_2/datasets/vocab/eng_vocab.txt"
MEAN_STD_FILE="/home/work/demo/ds2/pcloud/models/deep_speech_2/mean_std.npz"
# Configure output path in PaddleCloud filesystem
CLOUD_DATA_DIR="/pfs/dlnel/home/demo/deepspeech2/data"
CLOUD_MODEL_DIR="/pfs/dlnel/home/demo/deepspeech2/model"
# Pack and upload local data to PaddleCloud filesystem
python upload_data.py \
--train_manifest_path=${TRAIN_MANIFEST} \
--test_manifest_path=${TEST_MANIFEST} \
--vocab_file=${VOCAB_FILE} \
--mean_std_file=${MEAN_STD_FILE} \
--cloud_data_path=${CLOUD_DATA_DIR}
if [ $? -ne 0 ]; then
    echo "upload data failed!"
    exit 1
fi
JOB_NAME=deepspeech`date +%Y%m%d%H%M%S`
DS2_PATH=${PWD%/*}
cp -f pcloud_train.sh ${DS2_PATH}
# Configure computation resource and submit job to PaddleCloud
paddlecloud submit \
-image bootstrapper:5000/wanghaoshuang/pcloud_ds2:latest \
-jobname ${JOB_NAME} \
-cpu 4 \
-gpu 4 \
-memory 10Gi \
-parallelism 2 \
-pscpu 1 \
-pservers 1 \
-psmemory 10Gi \
-passes 1 \
-entry "sh pcloud_train.sh ${CLOUD_DATA_DIR} ${CLOUD_MODEL_DIR}" \
${DS2_PATH}
rm ${DS2_PATH}/pcloud_train.sh
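
Next, `pcloud_train.sh`, the job entry script run on each node. It takes the cloud data directory and model directory as its two arguments, splits the data for the local trainer, and launches training: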
DATA_PATH=$1
MODEL_PATH=$2
TRAIN_MANI=${DATA_PATH}/cloud.train.manifest
DEV_MANI=${DATA_PATH}/cloud.test.manifest
TRAIN_TAR=${DATA_PATH}/cloud.train.tar
DEV_TAR=${DATA_PATH}/cloud.test.tar
VOCAB_PATH=${DATA_PATH}/vocab.txt
MEAN_STD_FILE=${DATA_PATH}/mean_std.npz
# split train data for each pcloud node
python ./cloud/split_data.py \
--in_manifest_path=$TRAIN_MANI \
--data_tar_path=$TRAIN_TAR \
--out_manifest_path='./local.train.manifest'
# split dev data for each pcloud node
python ./cloud/split_data.py \
--in_manifest_path=$DEV_MANI \
--data_tar_path=$DEV_TAR \
--out_manifest_path='./local.test.manifest'
python train.py \
--use_gpu=1 \
--mean_std_filepath=$MEAN_STD_FILE \
--train_manifest_path='./local.train.manifest' \
--dev_manifest_path='./local.test.manifest' \
--vocab_filepath=$VOCAB_PATH \
--output_model_dir=${MODEL_PATH}
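
Next, `cloud/split_data.py`, which shards the packed data across the trainer nodes: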
"""This tool is used for splitting data into each node of
paddle cloud by total trainer count and current trainer id.
The meaning of trainer is a instance of k8s cluster.
This script should be called in paddle cloud.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import json
import argparse
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
    "--in_manifest_path",
    default='./cloud.train.manifest',
    type=str,
    help="Input manifest path. (default: %(default)s)")
parser.add_argument(
    "--data_tar_path",
    default='./cloud.train.tar',
    type=str,
    help="Data tar file path. (default: %(default)s)")
parser.add_argument(
    "--out_manifest_path",
    default='./local.train.manifest',
    type=str,
    help="Output manifest file path. (default: %(default)s)")
args = parser.parse_args()
def split_data(in_manifest, tar_path, out_manifest):
    # The trainer id and total trainer count are provided by PaddleCloud
    # through these two files on each node.
    with open("/trainer_id", "r") as f:
        trainer_id = int(f.readline()[:-1])
    with open("/trainer_count", "r") as f:
        trainer_count = int(f.readline()[:-1])

    tar_path = os.path.abspath(tar_path)
    result = []
    # Assign manifest lines round-robin: this node keeps every line whose
    # index equals its trainer id modulo the trainer count, and rewrites
    # the audio path to point into the uploaded tar file.
    for index, json_line in enumerate(open(in_manifest)):
        if (index % trainer_count) == trainer_id:
            json_data = json.loads(json_line)
            json_data['audio_filepath'] = "tar:%s#%s" % (
                tar_path, json_data['audio_filepath'])
            result.append("%s\n" % json.dumps(json_data))
    with open(out_manifest, 'w') as manifest:
        manifest.writelines(result)
if __name__ == '__main__':
    split_data(args.in_manifest_path, args.data_tar_path,
               args.out_manifest_path)
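
Next, `upload_data.py`, which tars the sound files, rewrites the manifests, and copies everything to the PaddleCloud filesystem: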
"""This tool is used for preparing data for DeepSpeech2 trainning on paddle cloud.
Steps:
1. Read original manifest and get the local path of sound files.
2. Tar all local sound files into one tar file.
3. Modify original manifest to remove the local path information.
Finally, we will get a tar file and a manifest with sound file name, duration
and text.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import json
import os
import tarfile
import sys
import argparse
import shutil
sys.path.append('../')
from data_utils.utils import read_manifest
from subprocess import call
TRAIN_TAR = "cloud.train.tar"
TRAIN_MANIFEST = "cloud.train.manifest"
TEST_TAR = "cloud.test.tar"
TEST_MANIFEST = "cloud.test.manifest"
VOCAB_FILE = "vocab.txt"
MEAN_STD_FILE = "mean_std.npz"
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
    "--train_manifest_path",
    default="../datasets/manifest.train",
    type=str,
    help="Manifest file of train data. (default: %(default)s)")
parser.add_argument(
    "--test_manifest_path",
    default="../datasets/manifest.test",
    type=str,
    help="Manifest file of test data. (default: %(default)s)")
parser.add_argument(
    "--vocab_file",
    default="../datasets/vocab/eng_vocab.txt",
    type=str,
    help="Vocab file to be uploaded to paddlecloud. (default: %(default)s)")
parser.add_argument(
    "--mean_std_file",
    default="../mean_std.npz",
    type=str,
    help="mean_std file to be uploaded to paddlecloud. (default: %(default)s)")
parser.add_argument(
    "--cloud_data_path",
    required=True,
    type=str,
    help="Destination path on paddlecloud.")
parser.add_argument(
    "--local_tmp_path",
    default="./tmp/",
    type=str,
    help="Local directory for storing temporary data. (default: %(default)s)")
args = parser.parse_args()
def pack_data(manifest_path, out_tar_path, out_manifest_path):
    '''1. According to the manifest, tar the sound files into out_tar_path.
    2. Generate a new manifest for the output tar file.
    '''
    out_tar = tarfile.open(out_tar_path, 'w')
    manifest = read_manifest(manifest_path)
    results = []
    for json_data in manifest:
        sound_file = json_data['audio_filepath']
        filename = os.path.basename(sound_file)
        # Store each sound file flat in the tar and keep only its basename
        # in the manifest, so entries no longer depend on local paths.
        out_tar.add(sound_file, arcname=filename)
        json_data['audio_filepath'] = filename
        results.append("%s\n" % json.dumps(json_data))
    with open(out_manifest_path, 'w') as out_manifest:
        out_manifest.writelines(results)
    out_tar.close()
def pcloud_cp(src, dst):
    """Copy src from the local filesystem to dst in the PaddleCloud
    filesystem.
    """
    ret = call(['paddlecloud', 'cp', src, dst])
    return ret


def pcloud_exist(path):
    """Check whether a file or directory exists in the PaddleCloud
    filesystem, via the exit code of `paddlecloud ls`.
    """
    ret = call(['paddlecloud', 'ls', path])
    return ret
if __name__ == '__main__':
    cloud_train_manifest = os.path.join(args.cloud_data_path, TRAIN_MANIFEST)
    cloud_train_tar = os.path.join(args.cloud_data_path, TRAIN_TAR)
    cloud_test_manifest = os.path.join(args.cloud_data_path, TEST_MANIFEST)
    cloud_test_tar = os.path.join(args.cloud_data_path, TEST_TAR)
    cloud_vocab_file = os.path.join(args.cloud_data_path, VOCAB_FILE)
    cloud_mean_file = os.path.join(args.cloud_data_path, MEAN_STD_FILE)

    local_train_manifest = os.path.join(args.local_tmp_path, TRAIN_MANIFEST)
    local_train_tar = os.path.join(args.local_tmp_path, TRAIN_TAR)
    local_test_manifest = os.path.join(args.local_tmp_path, TEST_MANIFEST)
    local_test_tar = os.path.join(args.local_tmp_path, TEST_TAR)

    if os.path.exists(args.local_tmp_path):
        shutil.rmtree(args.local_tmp_path)
    os.makedirs(args.local_tmp_path)

    # train data
    if args.train_manifest_path != "":
        ret = pcloud_exist(cloud_train_manifest)
        if ret != 0:  # upload only if not already on PaddleCloud
            pack_data(args.train_manifest_path, local_train_tar,
                      local_train_manifest)
            pcloud_cp(local_train_manifest, cloud_train_manifest)
            pcloud_cp(local_train_tar, cloud_train_tar)

    # test data
    if args.test_manifest_path != "":
        ret = pcloud_exist(cloud_test_manifest)
        if ret != 0:
            pack_data(args.test_manifest_path, local_test_tar,
                      local_test_manifest)
            pcloud_cp(local_test_manifest, cloud_test_manifest)
            pcloud_cp(local_test_tar, cloud_test_tar)

    # vocab file
    if args.vocab_file != "":
        ret = pcloud_exist(cloud_vocab_file)
        if ret != 0:
            pcloud_cp(args.vocab_file, cloud_vocab_file)

    # mean_std file
    if args.mean_std_file != "":
        ret = pcloud_exist(cloud_mean_file)
        if ret != 0:
            pcloud_cp(args.mean_std_file, cloud_mean_file)

    shutil.rmtree(args.local_tmp_path)
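
Finally, excerpts of the accompanying change to `DataGenerator` in `data_utils`, which teaches the data reader to pull audio directly out of the uploaded tar files: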
@@ -6,15 +6,22 @@ from __future__ import division
from __future__ import print_function
import random
import tarfile
import multiprocessing
import numpy as np
import paddle.v2 as paddle
from threading import local
from data_utils import utils
from data_utils.augmentor.augmentation import AugmentationPipeline
from data_utils.featurizer.speech_featurizer import SpeechFeaturizer
from data_utils.speech import SpeechSegment
from data_utils.normalizer import FeatureNormalizer
# thread-local cache of opened tar files and their member info
local_data = local()
local_data.tar2info = {}
local_data.tar2object = {}
class DataGenerator(object):
"""
@@ -87,7 +94,7 @@ class DataGenerator(object):
"""Load, augment, featurize and normalize for speech data.
:param filename: Audio filepath
:type filename: basestring
:type filename: basestring | file
:param transcript: Transcription text.
:type transcript: basestring
:return: Tuple of audio feature tensor and list of token ids for
@@ -215,6 +222,38 @@ class DataGenerator(object):
"""
return self._speech_featurizer.vocab_list
    def _parse_tar(self, file):
        """Parse a tar file and return the tarfile object together with
        a dict mapping member names to their tarinfo objects.
        """
        result = {}
        f = tarfile.open(file)
        for tarinfo in f.getmembers():
            result[tarinfo.name] = tarinfo
        return f, result
    def _get_file_object(self, file):
        """Get a file object by file path.

        If the path starts with "tar:", return a file object extracted
        from the tar archive, caching the tarfile object and its member
        info for subsequent reads. Otherwise, open the path directly.
        """
        if file.startswith('tar:'):
            tarpath, filename = file.split(':', 1)[1].split('#', 1)
            # Each thread keeps its own cache, since tarfile objects
            # cannot be shared safely across reader threads.
            if 'tar2info' not in local_data.__dict__:
                local_data.tar2info = {}
            if 'tar2object' not in local_data.__dict__:
                local_data.tar2object = {}
            if tarpath not in local_data.tar2info:
                object, infoes = self._parse_tar(tarpath)
                local_data.tar2info[tarpath] = infoes
                local_data.tar2object[tarpath] = object
            return local_data.tar2object[tarpath].extractfile(
                local_data.tar2info[tarpath][filename])
        else:
            return open(file, 'r')
    def _instance_reader_creator(self, manifest):
        """
        Instance reader creator. Create a callable function to produce
@@ -229,7 +268,8 @@ class DataGenerator(object):
                yield instance
        def mapper(instance):
            return self.process_utterance(
                self._get_file_object(instance["audio_filepath"]),
                instance["text"])
        return paddle.reader.xmap_readers(
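
For illustration, a minimal sketch of how the `tar:<tar_path>#<member>` convention written by `split_data.py` can be resolved back to audio bytes, mirroring `_get_file_object` above (the paths here are hypothetical):
```
import tarfile

# Hypothetical manifest entry as rewritten by split_data.py.
audio_filepath = "tar:/tmp/cloud.train.tar#1272-128104-0000.flac"

# Strip the "tar:" scheme, then separate the tar path from the member name.
tar_path, member_name = audio_filepath.split(':', 1)[1].split('#', 1)
with tarfile.open(tar_path) as tar:
    audio_bytes = tar.extractfile(member_name).read()
```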