diff --git a/deep_speech_2/cloud/README.md b/deep_speech_2/cloud/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..8e7e49f9ea75a56d84431f80a87565fbec62bbb2
--- /dev/null
+++ b/deep_speech_2/cloud/README.md
@@ -0,0 +1,81 @@
+# Run DS2 on PaddleCloud
+
+>Note:
+>Make sure the [PaddleCloud client](https://github.com/PaddlePaddle/cloud/blob/develop/doc/usage_cn.md#%E4%B8%8B%E8%BD%BD%E5%B9%B6%E9%85%8D%E7%BD%AEpaddlecloud) has been installed and that the current directory is `models/deep_speech_2/cloud/`.
+
+## Step-1 Configure data set
+
+Configure the input data and output paths in pcloud_submit.sh:
+
+- `TRAIN_MANIFEST`: Absolute path of the training data manifest file in the local filesystem. This file has the following format:
+
+```
+{"audio_filepath": "/home/disk1/LibriSpeech/dev-clean/1272/128104/1272-128104-0000.flac", "duration": 5.855, "text": "mister quilter is the ..."}
+{"audio_filepath": "/home/disk1/LibriSpeech/dev-clean/1272/128104/1272-128104-0001.flac", "duration": 4.815, "text": "nor is mister ..."}
+```
+
+- `TEST_MANIFEST`: Absolute path of the test data manifest file in the local filesystem. It has the same format as `TRAIN_MANIFEST`.
+- `VOCAB_FILE`: Absolute path of the vocabulary file in the local filesystem.
+- `MEAN_STD_FILE`: Absolute path of the normalizer's statistics file in the local filesystem.
+- `CLOUD_DATA_DIR`: Absolute path in the PaddleCloud filesystem. The local training data will be uploaded to this directory.
+- `CLOUD_MODEL_DIR`: Absolute path in the PaddleCloud filesystem. The PaddleCloud trainer will save models to this directory.
+
+>Note: Uploading is skipped if the target file already exists in `CLOUD_DATA_DIR`.
+
+## Step-2 Configure computation resource
+
+Configure the computation resources in pcloud_submit.sh:
+
+```
+# Configure computation resource and submit job to PaddleCloud
+ paddlecloud submit \
+ -image wanghaoshuang/pcloud_ds2:latest \
+ -jobname ${JOB_NAME} \
+ -cpu 4 \
+ -gpu 4 \
+ -memory 10Gi \
+ -parallelism 1 \
+ -pscpu 1 \
+ -pservers 1 \
+ -psmemory 10Gi \
+ -passes 1 \
+ -entry "sh pcloud_train.sh ${CLOUD_DATA_DIR} ${CLOUD_MODEL_DIR}" \
+ ${DS2_PATH}
+```
+For more information, please refer to [PaddleCloud](https://github.com/PaddlePaddle/cloud/blob/develop/doc/usage_cn.md#提交任务).
+
+## Step-3 Configure algorithm options
+Configure the algorithm options in pcloud_train.sh:
+```
+python train.py \
+--use_gpu=1 \
+--trainer_count=4 \
+--batch_size=256 \
+--mean_std_filepath=$MEAN_STD_FILE \
+--train_manifest_path='./local.train.manifest' \
+--dev_manifest_path='./local.test.manifest' \
+--vocab_filepath=$VOCAB_PATH \
+--output_model_dir=${MODEL_PATH}
+```
+You can get more information about the algorithm options with the following command:
+```
+cd ..
+python train.py --help
+```
+
+## Step-4 Submit job
+```
+$ sh pcloud_submit.sh
+```
+
+## Step-5 Get logs
+```
+$ paddlecloud logs -n 10000 deepspeech20170727130129
+```
+For more information, please refer to the [PaddleCloud client](https://github.com/PaddlePaddle/cloud/blob/develop/doc/usage_cn.md#下载并配置paddlecloud) or get help with the following command:
+```
+paddlecloud --help
+```
diff --git a/deep_speech_2/cloud/pcloud_submit.sh b/deep_speech_2/cloud/pcloud_submit.sh
new file mode 100644
index 0000000000000000000000000000000000000000..5ecb011bc80e9008ba796177712b57d0869610b4
--- /dev/null
+++ b/deep_speech_2/cloud/pcloud_submit.sh
@@ -0,0 +1,43 @@
+# Configure the input data in the local filesystem
+TRAIN_MANIFEST="/home/work/demo/ds2/pcloud/models/deep_speech_2/datasets/manifest.dev"
+TEST_MANIFEST="/home/work/demo/ds2/pcloud/models/deep_speech_2/datasets/manifest.dev"
+VOCAB_FILE="/home/work/demo/ds2/pcloud/models/deep_speech_2/datasets/vocab/eng_vocab.txt"
+MEAN_STD_FILE="/home/work/demo/ds2/pcloud/models/deep_speech_2/mean_std.npz"
+
+# Configure the output paths in the PaddleCloud filesystem
+CLOUD_DATA_DIR="/pfs/dlnel/home/demo/deepspeech2/data"
+CLOUD_MODEL_DIR="/pfs/dlnel/home/demo/deepspeech2/model"
+
+# Pack and upload the local data to the PaddleCloud filesystem
+python upload_data.py \
+--train_manifest_path=${TRAIN_MANIFEST} \
+--test_manifest_path=${TEST_MANIFEST} \
+--vocab_file=${VOCAB_FILE} \
+--mean_std_file=${MEAN_STD_FILE} \
+--cloud_data_path=${CLOUD_DATA_DIR}
+if [ $? -ne 0 ]
+then
+    echo "Upload data failed!"
+    exit 1
+fi
+
+JOB_NAME=deepspeech`date +%Y%m%d%H%M%S`
+DS2_PATH=${PWD%/*}
+cp -f pcloud_train.sh ${DS2_PATH}
+
+# Configure computation resources and submit the job to PaddleCloud
+paddlecloud submit \
+-image bootstrapper:5000/wanghaoshuang/pcloud_ds2:latest \
+-jobname ${JOB_NAME} \
+-cpu 4 \
+-gpu 4 \
+-memory 10Gi \
+-parallelism 2 \
+-pscpu 1 \
+-pservers 1 \
+-psmemory 10Gi \
+-passes 1 \
+-entry "sh pcloud_train.sh ${CLOUD_DATA_DIR} ${CLOUD_MODEL_DIR}" \
+${DS2_PATH}
+
+rm ${DS2_PATH}/pcloud_train.sh
diff --git a/deep_speech_2/cloud/pcloud_train.sh b/deep_speech_2/cloud/pcloud_train.sh
new file mode 100644
index 0000000000000000000000000000000000000000..b9a50360ac0ff512706cd82a18ccd5b58f880e7b
--- /dev/null
+++ b/deep_speech_2/cloud/pcloud_train.sh
@@ -0,0 +1,28 @@
+DATA_PATH=$1
+MODEL_PATH=$2
+TRAIN_MANI=${DATA_PATH}/cloud.train.manifest
+DEV_MANI=${DATA_PATH}/cloud.test.manifest
+TRAIN_TAR=${DATA_PATH}/cloud.train.tar
+DEV_TAR=${DATA_PATH}/cloud.test.tar
+VOCAB_PATH=${DATA_PATH}/vocab.txt
+MEAN_STD_FILE=${DATA_PATH}/mean_std.npz
+
+# Split the training data for each PaddleCloud node
+python ./cloud/split_data.py \
+--in_manifest_path=$TRAIN_MANI \
+--data_tar_path=$TRAIN_TAR \
+--out_manifest_path='./local.train.manifest'
+
+# Split the dev data for each PaddleCloud node
+python ./cloud/split_data.py \
+--in_manifest_path=$DEV_MANI \
+--data_tar_path=$DEV_TAR \
+--out_manifest_path='./local.test.manifest'
+
+python train.py \
+--use_gpu=1 \
+--mean_std_filepath=$MEAN_STD_FILE \
+--train_manifest_path='./local.train.manifest' \
+--dev_manifest_path='./local.test.manifest' \
+--vocab_filepath=$VOCAB_PATH \
+--output_model_dir=${MODEL_PATH}
diff --git a/deep_speech_2/cloud/split_data.py b/deep_speech_2/cloud/split_data.py
new file mode 100644
index 0000000000000000000000000000000000000000..6b0754a80cbb9420f5aa0dce60e30ac0c166d5fa
--- /dev/null
+++ b/deep_speech_2/cloud/split_data.py
@@ -0,0 +1,52 @@
+"""Split the data for the current PaddleCloud node according to the total
+trainer count and the current trainer id.
+A trainer here is one instance in the Kubernetes cluster.
+This script is supposed to be run on PaddleCloud.
+"""
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+import os
+import json
+import argparse
+
+parser = argparse.ArgumentParser(description=__doc__)
+parser.add_argument(
+    "--in_manifest_path",
+    default='./cloud.train.manifest',
+    type=str,
+    help="Input manifest path. (default: %(default)s)")
+parser.add_argument(
+    "--data_tar_path",
+    default='./cloud.train.tar',
+    type=str,
+    help="Data tar file path. (default: %(default)s)")
+parser.add_argument(
+    "--out_manifest_path",
+    default='./local.train.manifest',
+    type=str,
+    help="Output manifest file path. (default: %(default)s)")
+args = parser.parse_args()
+
+
+def split_data(in_manifest, tar_path, out_manifest):
+    # The trainer id and total trainer count are provided by PaddleCloud.
+    with open("/trainer_id", "r") as f:
+        trainer_id = int(f.readline()[:-1])
+    with open("/trainer_count", "r") as f:
+        trainer_count = int(f.readline()[:-1])
+
+    tar_path = os.path.abspath(tar_path)
+    result = []
+    # Keep the lines assigned to this trainer and rewrite audio paths into the tar.
+    for index, json_line in enumerate(open(in_manifest)):
+        if (index % trainer_count) == trainer_id:
+            json_data = json.loads(json_line)
+            json_data['audio_filepath'] = "tar:%s#%s" % (
+                tar_path, json_data['audio_filepath'])
+            result.append("%s\n" % json.dumps(json_data))
+    with open(out_manifest, 'w') as manifest:
+        manifest.writelines(result)
+
+
+if __name__ == '__main__':
+    split_data(args.in_manifest_path, args.data_tar_path,
+               args.out_manifest_path)
diff --git a/deep_speech_2/cloud/upload_data.py b/deep_speech_2/cloud/upload_data.py
new file mode 100644
index 0000000000000000000000000000000000000000..3336f722b478658a2751f3a0ac3a4a65ded7fa98
--- /dev/null
+++ b/deep_speech_2/cloud/upload_data.py
@@ -0,0 +1,147 @@
+"""This tool prepares data for DeepSpeech2 training on PaddleCloud.
+
+Steps:
+1. Read the original manifest and get the local paths of the sound files.
+2. Tar all local sound files into one tar file.
+3. Modify the original manifest to remove the local path information.
+
+The result is a tar file and a manifest that contains only the sound file
+names, durations and texts.
+"""
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+import json
+import os
+import tarfile
+import sys
+import argparse
+import shutil
+sys.path.append('../')
+from data_utils.utils import read_manifest
+from subprocess import call
+
+TRAIN_TAR = "cloud.train.tar"
+TRAIN_MANIFEST = "cloud.train.manifest"
+TEST_TAR = "cloud.test.tar"
+TEST_MANIFEST = "cloud.test.manifest"
+VOCAB_FILE = "vocab.txt"
+MEAN_STD_FILE = "mean_std.npz"
+
+parser = argparse.ArgumentParser(description=__doc__)
+parser.add_argument(
+    "--train_manifest_path",
+    default="../datasets/manifest.train",
+    type=str,
+    help="Manifest file of train data. (default: %(default)s)")
+parser.add_argument(
+    "--test_manifest_path",
+    default="../datasets/manifest.test",
+    type=str,
+    help="Manifest file of test data. (default: %(default)s)")
+parser.add_argument(
+    "--vocab_file",
+    default="../datasets/vocab/eng_vocab.txt",
+    type=str,
+    help="Vocab file to be uploaded to paddlecloud. (default: %(default)s)")
+parser.add_argument(
+    "--mean_std_file",
+    default="../mean_std.npz",
+    type=str,
+    help="mean_std file to be uploaded to paddlecloud. (default: %(default)s)")
+parser.add_argument(
+    "--cloud_data_path",
+    required=True,
+    type=str,
(default: %(default)s)") +args = parser.parse_args() + +parser.add_argument( + "--local_tmp_path", + default="./tmp/", + type=str, + help="Local directory for storing temporary data. (default: %(default)s)") +args = parser.parse_args() + + +def pack_data(manifest_path, out_tar_path, out_manifest_path): + '''1. According to the manifest, tar sound files into out_tar_path + 2. Generate a new manifest for output tar file + ''' + out_tar = tarfile.open(out_tar_path, 'w') + manifest = read_manifest(manifest_path) + results = [] + for json_data in manifest: + sound_file = json_data['audio_filepath'] + filename = os.path.basename(sound_file) + out_tar.add(sound_file, arcname=filename) + json_data['audio_filepath'] = filename + results.append("%s\n" % json.dumps(json_data)) + with open(out_manifest_path, 'w') as out_manifest: + out_manifest.writelines(results) + out_manifest.close() + out_tar.close() + + +def pcloud_cp(src, dst): + """Copy src from local filesytem to dst in PaddleCloud filesystem. + """ + ret = call(['paddlecloud', 'cp', src, dst]) + return ret + + +def pcloud_exist(path): + """Check if file or directory exists in PaddleCloud filesystem. + """ + ret = call(['paddlecloud', 'ls', path]) + return ret + + +if __name__ == '__main__': + cloud_train_manifest = os.path.join(args.cloud_data_path, TRAIN_MANIFEST) + cloud_train_tar = os.path.join(args.cloud_data_path, TRAIN_TAR) + cloud_test_manifest = os.path.join(args.cloud_data_path, TEST_MANIFEST) + cloud_test_tar = os.path.join(args.cloud_data_path, TEST_TAR) + cloud_vocab_file = os.path.join(args.cloud_data_path, VOCAB_FILE) + cloud_mean_file = os.path.join(args.cloud_data_path, MEAN_STD_FILE) + + local_train_manifest = os.path.join(args.local_tmp_path, TRAIN_MANIFEST) + local_train_tar = os.path.join(args.local_tmp_path, TRAIN_TAR) + local_test_manifest = os.path.join(args.local_tmp_path, TEST_MANIFEST) + local_test_tar = os.path.join(args.local_tmp_path, TEST_TAR) + + if os.path.exists(args.local_tmp_path): + shutil.rmtree(args.local_tmp_path) + os.makedirs(args.local_tmp_path) + + # train data + if args.train_manifest_path != "": + ret = pcloud_exist(cloud_train_manifest) + if ret != 0: + pack_data(args.train_manifest_path, local_train_tar, + local_train_manifest) + pcloud_cp(local_train_manifest, cloud_train_manifest) + pcloud_cp(local_train_tar, cloud_train_tar) + + # test data + if args.test_manifest_path != "": + ret = pcloud_exist(cloud_test_manifest) + if ret != 0: + pack_data(args.test_manifest_path, local_test_tar, + local_test_manifest) + pcloud_cp(local_test_manifest, cloud_test_manifest) + pcloud_cp(local_test_tar, cloud_test_tar) + + # vocab file + if args.vocab_file != "": + ret = pcloud_exist(cloud_vocab_file) + if ret != 0: + pcloud_cp(args.vocab_file, cloud_vocab_file) + + # mean_std file + if args.mean_std_file != "": + ret = pcloud_exist(cloud_mean_file) + if ret != 0: + pcloud_cp(args.mean_std_file, cloud_mean_file) + + shutil.rmtree(args.local_tmp_path) diff --git a/deep_speech_2/data_utils/data.py b/deep_speech_2/data_utils/data.py index 14b02f993df53a3731864143ff11b87f7d52217b..f404b4fa7bd658484c20d26de7accd42a3bfacbc 100644 --- a/deep_speech_2/data_utils/data.py +++ b/deep_speech_2/data_utils/data.py @@ -6,15 +6,22 @@ from __future__ import division from __future__ import print_function import random -import numpy as np +import tarfile import multiprocessing +import numpy as np import paddle.v2 as paddle +from threading import local from data_utils import utils from data_utils.augmentor.augmentation import 
 from data_utils.augmentor.augmentation import AugmentationPipeline
 from data_utils.featurizer.speech_featurizer import SpeechFeaturizer
 from data_utils.speech import SpeechSegment
 from data_utils.normalizer import FeatureNormalizer
 
+# Thread-local cache of opened tar files and their member info.
+local_data = local()
+local_data.tar2info = {}
+local_data.tar2object = {}
+
 
 class DataGenerator(object):
     """
@@ -46,7 +53,7 @@ class DataGenerator(object):
     :param specgram_type: Specgram feature type. Options: 'linear'.
     :type specgram_type: str
     :param use_dB_normalization: Whether to normalize the audio to -20 dB
-                             before extracting the features.
+                                 before extracting the features.
     :type use_dB_normalization: bool
     :param num_threads: Number of CPU threads for processing data.
     :type num_threads: int
@@ -87,7 +94,7 @@ class DataGenerator(object):
         """Load, augment, featurize and normalize for speech data.
 
         :param filename: Audio filepath
-        :type filename: basestring
+        :type filename: basestring | file
         :param transcript: Transcription text.
         :type transcript: basestring
         :return: Tuple of audio feature tensor and list of token ids for
@@ -215,6 +222,38 @@ class DataGenerator(object):
         """
         return self._speech_featurizer.vocab_list
 
+    def _parse_tar(self, file):
+        """Parse a tar file to get a tarfile object and a dict mapping
+        member names to tarinfo objects.
+        """
+        result = {}
+        f = tarfile.open(file)
+        for tarinfo in f.getmembers():
+            result[tarinfo.name] = tarinfo
+        return f, result
+
+    def _get_file_object(self, file):
+        """Get a file object by file path.
+
+        If the path starts with 'tar:', the requested member is read from
+        the tar archive, and the tarfile object and member info are cached
+        for later requests. Otherwise the path is opened as a regular file.
+        """
+        if file.startswith('tar:'):
+            tarpath, filename = file.split(':', 1)[1].split('#', 1)
+            if 'tar2info' not in local_data.__dict__:
+                local_data.tar2info = {}
+            if 'tar2object' not in local_data.__dict__:
+                local_data.tar2object = {}
+            if tarpath not in local_data.tar2info:
+                tar_object, tar_infos = self._parse_tar(tarpath)
+                local_data.tar2info[tarpath] = tar_infos
+                local_data.tar2object[tarpath] = tar_object
+            return local_data.tar2object[tarpath].extractfile(
+                local_data.tar2info[tarpath][filename])
+        else:
+            return open(file, 'r')
+
     def _instance_reader_creator(self, manifest):
         """
         Instance reader creator. Create a callable function to produce
@@ -229,8 +268,9 @@ class DataGenerator(object):
                 yield instance
 
         def mapper(instance):
-            return self.process_utterance(instance["audio_filepath"],
-                                          instance["text"])
+            return self.process_utterance(
+                self._get_file_object(instance["audio_filepath"]),
+                instance["text"])
 
         return paddle.reader.xmap_readers(
             mapper, reader, self._num_threads, 1024, order=True)
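
A note for reviewers on the `tar:` path convention that ties these files together: `upload_data.py` packs the sound files into a tar and strips local paths from the manifest, `split_data.py` rewrites each entry kept on a node as `tar:<tar_path>#<file_name>`, and `DataGenerator._get_file_object` resolves such entries back into readable file objects. The standalone sketch below (not part of the patch; the file names, paths, and the `get_file_object` helper are invented for illustration) walks through that round trip using only the standard `tarfile` and `json` modules.

```
import json
import os
import tarfile
import tempfile

# 1. Pack a fake "sound file" into a tar, as upload_data.pack_data() does.
tmp_dir = tempfile.mkdtemp()
sound_path = os.path.join(tmp_dir, "utt-0001.wav")
with open(sound_path, "wb") as f:
    f.write(b"fake audio bytes")
tar_path = os.path.join(tmp_dir, "cloud.train.tar")
with tarfile.open(tar_path, "w") as tar:
    tar.add(sound_path, arcname="utt-0001.wav")

# 2. Rewrite the manifest entry, as split_data.split_data() does on each node.
entry = {"audio_filepath": "utt-0001.wav", "duration": 1.0, "text": "hello"}
entry["audio_filepath"] = "tar:%s#%s" % (os.path.abspath(tar_path),
                                         entry["audio_filepath"])
manifest_line = json.dumps(entry)


# 3. Resolve the entry back to a readable file object, mirroring what
#    DataGenerator._get_file_object() does (get_file_object is a made-up helper).
def get_file_object(filepath):
    if filepath.startswith("tar:"):
        tar_file, member = filepath.split(":", 1)[1].split("#", 1)
        tar = tarfile.open(tar_file)
        return tar.extractfile(tar.getmember(member))
    return open(filepath, "rb")


print(get_file_object(json.loads(manifest_line)["audio_filepath"]).read())
# -> b'fake audio bytes'
```

Packing many small sound files into a single tar and addressing members through this scheme presumably avoids shipping thousands of individual files to the PaddleCloud filesystem, at the cost of the per-thread tar caching logic added to `DataGenerator`.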