提交 e5325c84 编写于 作者: S seiriosPlus

fix paddle errors and unused files

上级 669efb98
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/distributed_ops/split_byref_op.h"
#include "paddle/fluid/operators/split_op.h"
namespace paddle {
namespace operators {
using framework::Tensor;
class SplitByrefOp : public framework::OperatorWithKernel {
public:
using framework::OperatorWithKernel::OperatorWithKernel;
void InferShape(framework::InferShapeContext *ctx) const override {
PADDLE_ENFORCE(ctx->HasInput("X"),
"Input(X) of SplitOp should not be null.");
PADDLE_ENFORCE_GE(ctx->Outputs("Out").size(), 1UL,
"Outputs(Out) of SplitOp should not be empty.");
auto in_dims = ctx->GetInputDim("X");
auto outs_names = ctx->Outputs("Out");
size_t num = static_cast<size_t>(ctx->Attrs().Get<int>("num"));
auto sections = ctx->Attrs().Get<std::vector<int>>("sections");
const size_t outs_number = outs_names.size();
std::vector<framework::DDim> outs_dims;
outs_dims.reserve(outs_number);
if (num > 0) {
int64_t in_axis_dim = 0;
if (ctx->IsRuntime()) {
in_axis_dim = in_dims[0];
}
PADDLE_ENFORCE_EQ(in_axis_dim % num, 0,
"tensor split does not result"
" in an equal division");
size_t out_axis_dim = in_axis_dim / num;
for (size_t i = 0; i < outs_number; ++i) {
auto dim = in_dims;
dim[0] = out_axis_dim;
outs_dims.push_back(dim);
}
} else if (sections.size() > 0) {
PADDLE_ENFORCE_EQ(sections.size(), outs_number,
"tensor split sections size"
"should be equal to output size.");
for (size_t i = 0; i < outs_number; ++i) {
auto dim = in_dims;
dim[0] = sections[i];
outs_dims.push_back(dim);
}
}
ctx->SetOutputsDim("Out", outs_dims);
}
};
class SplitByrefOpMaker : public framework::OpProtoAndCheckerMaker {
public:
void Make() override {
AddInput("X", "(Tensor) Input tensor of the split operator.");
AddOutput("Out", "(Tensor) Output tensors of the split operator.")
.AsDuplicable();
AddComment(R"DOC(
SplitByref operator
Split source tensor to sevaral tensors by axis 0. No copy in this operator
is performed, output tensor shares the same blocks of memory.
)DOC");
AddAttr<std::vector<int>>("sections",
"(vector<int>) "
"the length of each output along the "
"specified axis.")
.SetDefault(std::vector<int>{});
AddAttr<int>("num",
"(int, default 0)"
"Number of sub-tensors. This must evenly divide "
"Input.dims()[axis]")
.SetDefault(0);
}
};
} // namespace operators
} // namespace paddle
namespace ops = paddle::operators;
// NOTE: concat op default axis must be 0!
USE_CPU_ONLY_OP(concat);
REGISTER_OPERATOR(split_byref, ops::SplitByrefOp, ops::SplitByrefOpMaker,
ops::SplitGradMaker<paddle::framework::OpDesc>,
ops::SplitGradMaker<paddle::imperative::OpBase>);
REGISTER_OP_CPU_KERNEL(
split_byref, ops::SplitByrefOpKernel<paddle::platform::CPUPlace, float>);
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/distributed_ops/split_byref_op.h"
namespace ops = paddle::operators;
REGISTER_OP_CUDA_KERNEL(
split_byref,
ops::SplitByrefOpKernel<paddle::platform::CUDADeviceContext, float>);
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <vector>
#include "paddle/fluid/framework/op_registry.h"
namespace paddle {
namespace operators {
template <typename DeviceContext, typename T>
class SplitByrefOpKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext& ctx) const override {
auto* in = ctx.Input<framework::Tensor>("X");
auto outs = ctx.MultiOutput<framework::Tensor>("Out");
auto place = ctx.GetPlace();
size_t row_offset = 0;
for (size_t i = 0; i < outs.size(); ++i) {
// NOTE: no need to call mutable_data here to allocate memory.
auto* out = outs[i];
VLOG(3) << "spliting by ref: " << row_offset << " " << out->dims()[0];
*out = in->Slice(row_offset, row_offset + out->dims()[0]);
row_offset += out->dims()[0];
}
}
};
} // namespace operators
} // namespace paddle
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/operators/distributed_ops/split_ids_op.h"
#include <memory>
namespace paddle {
namespace operators {
class SplitIdsOpMaker : public framework::OpProtoAndCheckerMaker {
public:
void Make() override {
AddInput("Ids", "(LoDTensor) the input ids with shape{batch_num, 1}")
.AsDuplicable();
AddOutput("Out", "(LoDTensors) The outputs of the input Ids.")
.AsDuplicable();
AddComment(R"DOC(
Split a LoDTensor of Ids into multi LoDTensors, the number is pserver's number
Example:
Input:
X = [[1,2,3,4,5,6],[2,3]]
Out(3 output):
if compress is True:
out0 = [3, 3, 6]
out1 = [1, 4]
out2 = [2, 2, 5]
else:
out0 = [3, 6]
out1 = [1, 4]
out2 = [2, 5]
)DOC");
}
};
class SplitIdsOp : public framework::OperatorWithKernel {
public:
using framework::OperatorWithKernel::OperatorWithKernel;
void InferShape(framework::InferShapeContext *ctx) const override {
PADDLE_ENFORCE(ctx->HasInputs("Ids"), "SplitIdsOp must have input Ids.");
PADDLE_ENFORCE(ctx->HasOutputs("Out"), "SplitIdsOp must have output Out.");
auto ids_var_type = ctx->GetInputsVarType("Ids").front();
auto ids_dims = ctx->GetInputsDim("Ids");
if (ids_var_type == framework::proto::VarType::LOD_TENSOR) {
PADDLE_ENFORCE_EQ(ids_dims[0].size(), 2);
}
}
protected:
framework::OpKernelType GetExpectedKernelType(
const framework::ExecutionContext &ctx) const override {
return framework::OpKernelType(
OperatorWithKernel::IndicateVarDataType(ctx, "Ids"), ctx.GetPlace());
}
};
class SplitIdsOpInferVarType : public framework::VarTypeInference {
public:
void operator()(framework::InferVarTypeContext *ctx) const override {
auto input_type = ctx->GetInputType("Ids");
ctx->SetOutputType("Out", input_type, framework::ALL_ELEMENTS);
}
};
} // namespace operators
} // namespace paddle
namespace ops = paddle::operators;
REGISTER_OPERATOR(split_ids, ops::SplitIdsOp, ops::SplitIdsOpMaker,
ops::SplitIdsOpInferVarType);
REGISTER_OP_CPU_KERNEL(
split_ids, ops::SplitIdsOpKernel<paddle::platform::CPUPlace, int64_t>,
ops::SplitIdsOpKernel<paddle::platform::CPUPlace, float>);
/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <iterator>
#include <set>
#include <unordered_map>
#include <vector>
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/operators/math/selected_rows_functor.h"
namespace paddle {
namespace operators {
template <typename DeviceContext, typename T>
class SplitIdsOpKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext &ctx) const override {
auto place = ctx.GetPlace();
if (!platform::is_cpu_place(place)) {
PADDLE_THROW("SplitIds do not support GPU kernel");
}
const auto ids_vars = ctx.MultiInputVar("Ids");
PADDLE_ENFORCE_GT(ids_vars.size(), 0, "The number of Ids should > 0");
auto *ids_var = ids_vars[0];
if (ids_var->IsType<framework::LoDTensor>()) {
int batch_size = 0;
const auto ids_tensors = ctx.MultiInput<framework::LoDTensor>("Ids");
for (size_t i = 0; i < ids_tensors.size(); ++i) {
batch_size += ids_tensors[i]->dims()[0];
}
VLOG(4) << "Get Total BatchSize is: " << batch_size;
std::vector<T> all_ids(batch_size);
int offset = 0;
for (size_t i = 0; i < ids_tensors.size(); ++i) {
const auto *ids = ids_tensors[i];
std::memcpy(all_ids.data() + offset, ids->data<T>(),
ids->numel() * sizeof(T));
offset += ids->numel();
}
std::set<T> st(all_ids.begin(), all_ids.end());
all_ids.assign(st.begin(), st.end());
auto outs = ctx.MultiOutput<framework::LoDTensor>("Out");
const size_t shard_num = outs.size();
std::vector<std::vector<T>> out_ids;
out_ids.resize(outs.size());
// split id by their shard_num.
for (size_t i = 0; i < all_ids.size(); ++i) {
T id = all_ids[i];
size_t shard_id = static_cast<size_t>(id) % shard_num;
out_ids[shard_id].push_back(id);
}
// create tensor for each shard and send to parameter server
for (size_t i = 0; i < out_ids.size(); ++i) {
auto *shard_t = outs[i];
std::vector<T> ids = out_ids[i];
auto *shard_data = shard_t->mutable_data<T>(
framework::make_ddim({static_cast<int64_t>(ids.size()), 1}), place);
for (size_t i = 0; i < ids.size(); ++i) {
shard_data[i] = ids[i];
}
}
} else if (ids_var->IsType<framework::SelectedRows>()) {
const auto *ids_selected_rows = ctx.Input<framework::SelectedRows>("Ids");
auto &ids_dims = ids_selected_rows->value().dims();
PADDLE_ENFORCE_EQ(ids_dims[0],
static_cast<int64_t>(ids_selected_rows->rows().size()),
"");
const T *ids_data = ids_selected_rows->value().data<T>();
const auto &ids_rows = ids_selected_rows->rows();
auto outs = ctx.MultiOutput<framework::SelectedRows>("Out");
const size_t shard_num = outs.size();
for (auto &out : outs) {
out->mutable_rows()->clear();
}
// get rows for outputs
std::unordered_map<int64_t, size_t> id_to_index;
for (size_t i = 0; i < ids_rows.size(); ++i) {
id_to_index[ids_rows[i]] = i;
size_t shard_id = static_cast<size_t>(ids_rows[i]) % shard_num;
outs[shard_id]->mutable_rows()->push_back(ids_rows[i]);
}
int64_t row_width = ids_dims[1];
for (auto &out : outs) {
out->set_height(ids_selected_rows->height());
framework::DDim ddim = framework::make_ddim(
{static_cast<int64_t>(out->rows().size()), row_width});
T *output = out->mutable_value()->mutable_data<T>(ddim, place);
for (int64_t i = 0; i < ddim[0]; ++i) {
memcpy(output + i * row_width,
ids_data + id_to_index[out->rows()[i]] * row_width,
row_width * sizeof(T));
}
}
} else {
PADDLE_THROW(
"% should be LoDTensor or SelectedRows, but the received type is %s",
ctx.InputNames("Ids")[0], framework::ToTypeName(ids_var->Type()));
}
}
};
} // namespace operators
} // namespace paddle
...@@ -25,8 +25,6 @@ from .quantize import * ...@@ -25,8 +25,6 @@ from .quantize import *
from . import reader from . import reader
from .reader import * from .reader import *
from . import slim from . import slim
from . import utils
from .utils import *
from . import extend_optimizer from . import extend_optimizer
from .extend_optimizer import * from .extend_optimizer import *
from . import model_stat from . import model_stat
...@@ -42,7 +40,6 @@ __all__ += memory_usage_calc.__all__ ...@@ -42,7 +40,6 @@ __all__ += memory_usage_calc.__all__
__all__ += op_frequence.__all__ __all__ += op_frequence.__all__
__all__ += quantize.__all__ __all__ += quantize.__all__
__all__ += reader.__all__ __all__ += reader.__all__
__all__ += utils.__all__
__all__ += extend_optimizer.__all__ __all__ += extend_optimizer.__all__
__all__ += ['mixed_precision'] __all__ += ['mixed_precision']
__all__ += layers.__all__ __all__ += layers.__all__
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserve.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from . import lookup_table_utils
from .lookup_table_utils import *
from . import hdfs_utils
from .hdfs_utils import *
__all__ = []
__all__ += lookup_table_utils.__all__
__all__ += hdfs_utils.__all__
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""hdfs_utils.py will move to fluid/incubate/fleet/utils/hdfs.py"""
import os
import sys
import subprocess
import multiprocessing
from datetime import datetime
import re
import copy
import errno
import logging
from paddle.fluid.log_helper import get_logger
__all__ = ["HDFSClient", "multi_download", "multi_upload"]
_logger = get_logger(
__name__, logging.INFO, fmt='%(asctime)s-%(levelname)s: %(message)s')
class HDFSClient(object):
"""
A tool of HDFS
Args:
hadoop_home (string): hadoop_home
configs (dict): hadoop config, it is a dict, please contain \
key "fs.default.name" and "hadoop.job.ugi"
Can be a float value
Examples:
hadoop_home = "/home/client/hadoop-client/hadoop/"
configs = {
"fs.default.name": "hdfs://xxx.hadoop.com:54310",
"hadoop.job.ugi": "hello,hello123"
}
client = HDFSClient(hadoop_home, configs)
client.ls("/user/com/train-25")
files = client.lsr("/user/com/train-25/models")
"""
def __init__(self, hadoop_home, configs):
self.pre_commands = []
hadoop_bin = '%s/bin/hadoop' % hadoop_home
self.pre_commands.append(hadoop_bin)
dfs = 'fs'
self.pre_commands.append(dfs)
for k, v in configs.items():
config_command = '-D%s=%s' % (k, v)
self.pre_commands.append(config_command)
def __run_hdfs_cmd(self, commands, retry_times=5):
whole_commands = copy.deepcopy(self.pre_commands)
whole_commands.extend(commands)
print('Running system command: {0}'.format(' '.join(whole_commands)))
ret_code = 0
ret_out = None
ret_err = None
whole_commands = " ".join(whole_commands)
for x in range(retry_times + 1):
proc = subprocess.Popen(
whole_commands,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True)
(output, errors) = proc.communicate()
ret_code, ret_out, ret_err = proc.returncode, output, errors
if ret_code:
_logger.warn(
'Times: %d, Error running command: %s. Return code: %d, Error: %s'
% (x, ' '.join(whole_commands), proc.returncode, errors))
else:
break
return ret_code, ret_out, ret_err
def upload(self, hdfs_path, local_path, overwrite=False, retry_times=5):
"""
upload the local file to hdfs
Args:
hdfs_path(str): the hdfs file path
local_path(str): the local file path
overwrite(bool|None): will overwrite the file on HDFS or not
retry_times(int|5): retry times
Returns:
True or False
"""
assert hdfs_path is not None
assert local_path is not None and os.path.exists(local_path)
if os.path.isdir(local_path):
_logger.warn(
"The Local path: {} is dir and I will support it later, return".
format(local_path))
return False
base = os.path.basename(local_path)
if not self.is_exist(hdfs_path):
self.makedirs(hdfs_path)
else:
if self.is_exist(os.path.join(hdfs_path, base)):
if overwrite:
_logger.error(
"The HDFS path: {} is exist and overwrite is True, delete it".
format(hdfs_path))
self.delete(hdfs_path)
else:
_logger.error(
"The HDFS path: {} is exist and overwrite is False, return".
format(hdfs_path))
return False
put_commands = ["-put", local_path, hdfs_path]
returncode, output, errors = self.__run_hdfs_cmd(put_commands,
retry_times)
if returncode:
_logger.error("Put local path: {} to HDFS path: {} failed".format(
local_path, hdfs_path))
return False
else:
_logger.info("Put local path: {} to HDFS path: {} successfully".
format(local_path, hdfs_path))
return True
def download(self, hdfs_path, local_path, overwrite=False, unzip=False):
"""
download file from HDFS
Args:
hdfs_path(str): the hdfs file path
local_path(str): the local file path
overwrite(bool|None): will overwrite the file on HDFS or not
unzip(bool|False): if the download file is compressed by zip, unzip it or not.
Returns:
True or False
"""
_logger.info('Downloading %r to %r.', hdfs_path, local_path)
_logger.info('Download of %s to %r complete.', hdfs_path, local_path)
if not self.is_exist(hdfs_path):
print("HDFS path: {} do not exist".format(hdfs_path))
return False
if self.is_dir(hdfs_path):
_logger.error(
"The HDFS path: {} is dir and I will support it later, return".
format(hdfs_path))
if os.path.exists(local_path):
base = os.path.basename(hdfs_path)
local_file = os.path.join(local_path, base)
if os.path.exists(local_file):
if overwrite:
os.remove(local_file)
else:
_logger.error(
"The Local path: {} is exist and overwrite is False, return".
format(local_file))
return False
self.make_local_dirs(local_path)
download_commands = ["-get", hdfs_path, local_path]
returncode, output, errors = self.__run_hdfs_cmd(download_commands)
if returncode:
_logger.error("Get local path: {} from HDFS path: {} failed".format(
local_path, hdfs_path))
return False
else:
_logger.info("Get local path: {} from HDFS path: {} successfully".
format(local_path, hdfs_path))
return True
def is_exist(self, hdfs_path=None):
"""
whether the remote HDFS path exists
Args:
hdfs_path(str): the hdfs file path
Returns:
True or False
"""
exist_cmd = ['-test', '-e', hdfs_path]
returncode, output, errors = self.__run_hdfs_cmd(
exist_cmd, retry_times=1)
if returncode:
_logger.error("HDFS is_exist HDFS path: {} failed".format(
hdfs_path))
return False
else:
_logger.info("HDFS is_exist HDFS path: {} successfully".format(
hdfs_path))
return True
def is_dir(self, hdfs_path=None):
"""
whether the remote HDFS path is directory
Args:
hdfs_path(str): the hdfs file path
Returns:
True or False
"""
if not self.is_exist(hdfs_path):
return False
dir_cmd = ['-test', '-d', hdfs_path]
returncode, output, errors = self.__run_hdfs_cmd(dir_cmd, retry_times=1)
if returncode:
_logger.error("HDFS path: {} failed is not a directory".format(
hdfs_path))
return False
else:
_logger.info("HDFS path: {} successfully is a directory".format(
hdfs_path))
return True
def delete(self, hdfs_path):
"""
Remove a file or directory from HDFS.
whether the remote HDFS path exists
Args:
hdfs_path: HDFS path.
Returns:
True or False
This function returns `True` if the deletion was successful and `False` if
no file or directory previously existed at `hdfs_path`.
"""
_logger.info('Deleting %r.', hdfs_path)
if not self.is_exist(hdfs_path):
_logger.warn("HDFS path: {} do not exist".format(hdfs_path))
return True
if self.is_dir(hdfs_path):
del_cmd = ['-rmr', hdfs_path]
else:
del_cmd = ['-rm', hdfs_path]
returncode, output, errors = self.__run_hdfs_cmd(del_cmd, retry_times=0)
if returncode:
_logger.error("HDFS path: {} delete files failure".format(
hdfs_path))
return False
else:
_logger.info("HDFS path: {} delete files successfully".format(
hdfs_path))
return True
def rename(self, hdfs_src_path, hdfs_dst_path, overwrite=False):
"""
Move a file or folder on HDFS.
Args:
hdfs_path(str): HDFS path.
overwrite(bool|False): If the path already exists and overwrite is False, will return False.
Returns:
True or False
"""
assert hdfs_src_path is not None
assert hdfs_dst_path is not None
if not self.is_exist(hdfs_src_path):
_logger.info("HDFS path do not exist: {}".format(hdfs_src_path))
if self.is_exist(hdfs_dst_path) and not overwrite:
_logger.error("HDFS path is exist: {} and overwrite=False".format(
hdfs_dst_path))
rename_command = ['-mv', hdfs_src_path, hdfs_dst_path]
returncode, output, errors = self.__run_hdfs_cmd(
rename_command, retry_times=1)
if returncode:
_logger.error("HDFS rename path: {} to {} failed".format(
hdfs_src_path, hdfs_dst_path))
return False
else:
_logger.info("HDFS rename path: {} to {} successfully".format(
hdfs_src_path, hdfs_dst_path))
return True
@staticmethod
def make_local_dirs(local_path):
"""
create a directory local, is same to mkdir
Args:
local_path: local path that wants to create a directory.
"""
try:
os.makedirs(local_path)
except OSError as e:
if e.errno != errno.EEXIST:
raise
def makedirs(self, hdfs_path):
"""
Create a remote directory, recursively if necessary.
Args:
hdfs_path(str): Remote path. Intermediate directories will be created appropriately.
Returns:
True or False
"""
_logger.info('Creating directories to %r.', hdfs_path)
assert hdfs_path is not None
if self.is_exist(hdfs_path):
_logger.error("HDFS path is exist: {}".format(hdfs_path))
return
mkdirs_commands = ['-mkdir', hdfs_path]
returncode, output, errors = self.__run_hdfs_cmd(
mkdirs_commands, retry_times=1)
if returncode:
_logger.error("HDFS mkdir path: {} failed".format(hdfs_path))
return False
else:
_logger.error("HDFS mkdir path: {} successfully".format(hdfs_path))
return True
def ls(self, hdfs_path):
"""
ls directory contents about HDFS hdfs_path
Args:
hdfs_path(str): Remote HDFS path will be ls.
Returns:
List: a contents list about hdfs_path.
"""
assert hdfs_path is not None
if not self.is_exist(hdfs_path):
return []
ls_commands = ['-ls', hdfs_path]
returncode, output, errors = self.__run_hdfs_cmd(
ls_commands, retry_times=1)
if returncode:
_logger.error("HDFS list path: {} failed".format(hdfs_path))
return []
else:
_logger.info("HDFS list path: {} successfully".format(hdfs_path))
ret_lines = []
regex = re.compile('\s+')
out_lines = output.strip().split("\n")
for line in out_lines:
re_line = regex.split(line)
if len(re_line) == 8:
ret_lines.append(re_line[7])
return ret_lines
def lsr(self, hdfs_path, only_file=True, sort=True):
"""
list directory contents about HDFS hdfs_path recursively
Args:
hdfs_path(str): Remote HDFS path.
only_file(bool|True): will discard folders.
sort(bool|True): will be sorted by create time.
Returns:
List: a contents list about hdfs_path.
"""
def sort_by_time(v1, v2):
v1_time = datetime.strptime(v1[1], '%Y-%m-%d %H:%M')
v2_time = datetime.strptime(v2[1], '%Y-%m-%d %H:%M')
return v1_time > v2_time
assert hdfs_path is not None
if not self.is_exist(hdfs_path):
return []
ls_commands = ['-lsr', hdfs_path]
returncode, output, errors = self.__run_hdfs_cmd(
ls_commands, retry_times=1)
if returncode:
_logger.error("HDFS list all files: {} failed".format(hdfs_path))
return []
else:
_logger.info("HDFS list all files: {} successfully".format(
hdfs_path))
lines = []
regex = re.compile('\s+')
out_lines = output.strip().split("\n")
for line in out_lines:
re_line = regex.split(line)
if len(re_line) == 8:
if only_file and re_line[0][0] == "d":
continue
else:
lines.append(
(re_line[7], re_line[5] + " " + re_line[6]))
if sort:
sorted(lines, cmp=sort_by_time)
ret_lines = [ret[0] for ret in lines]
return ret_lines
def multi_download(client,
hdfs_path,
local_path,
trainer_id,
trainers,
multi_processes=5):
"""
Download files from HDFS using multi process.
Args:
client(HDFSClient): instance of HDFSClient
hdfs_path(str): path on hdfs
local_path(str): path on local
trainer_id(int): current trainer id
trainers(int): all trainers number
multi_processes(int|5): the download data process at the same time, default=5
Returns:
List:
Download files in local folder.
"""
def __subprocess_download(datas):
for data in datas:
re_path = os.path.relpath(os.path.dirname(data), hdfs_path)
if re_path == os.curdir:
sub_local_re_path = local_path
else:
sub_local_re_path = os.path.join(local_path, re_path)
client.download(data, sub_local_re_path)
assert isinstance(client, HDFSClient)
client.make_local_dirs(local_path)
_logger.info("Make local dir {} successfully".format(local_path))
all_need_download = client.lsr(hdfs_path, sort=True)
need_download = all_need_download[trainer_id::trainers]
_logger.info("Get {} files From all {} files need to be download from {}".
format(len(need_download), len(all_need_download), hdfs_path))
_logger.info("Start {} multi process to download datas".format(
multi_processes))
procs = []
for i in range(multi_processes):
process_datas = need_download[i::multi_processes]
p = multiprocessing.Process(
target=__subprocess_download, args=(process_datas, ))
procs.append(p)
p.start()
# complete the processes
for proc in procs:
proc.join()
_logger.info("Finish {} multi process to download datas".format(
multi_processes))
local_downloads = []
for data in need_download:
data_name = os.path.basename(data)
re_path = os.path.relpath(os.path.dirname(data), hdfs_path)
if re_path == os.curdir:
local_re_path = os.path.join(local_path, data_name)
else:
local_re_path = os.path.join(local_path, re_path, data_name)
local_downloads.append(local_re_path)
return local_downloads
def getfilelist(path):
rlist = []
for dir, folder, file in os.walk(path):
for i in file:
t = os.path.join(dir, i)
rlist.append(t)
for r in rlist:
print(r)
def multi_upload(client,
hdfs_path,
local_path,
multi_processes=5,
overwrite=False,
sync=True):
"""
Upload files to HDFS using multi process.
Args:
client(HDFSClient): instance of HDFSClient
hdfs_path(str): path on hdfs
local_path(str): path on local
multi_processes(int|5): the upload data process at the same time, default=5
overwrite(bool|False): will overwrite file on HDFS or not
sync(bool|True): upload files sync or not.
Returns:
None
"""
def __subprocess_upload(datas):
for data in datas:
re_path = os.path.relpath(os.path.dirname(data), local_path)
hdfs_re_path = os.path.join(hdfs_path, re_path)
client.upload(hdfs_re_path, data, overwrite, retry_times=5)
def get_local_files(path):
rlist = []
if not os.path.isdir(path):
return rlist
for dirname, folder, files in os.walk(path):
for i in files:
t = os.path.join(dirname, i)
rlist.append(t)
return rlist
assert isinstance(client, HDFSClient)
all_files = get_local_files(local_path)
if not all_files:
_logger.info("there are nothing need to upload, exit")
return
_logger.info("Start {} multi process to upload datas".format(
multi_processes))
procs = []
for i in range(multi_processes):
process_datas = all_files[i::multi_processes]
p = multiprocessing.Process(
target=__subprocess_upload, args=(process_datas, ))
procs.append(p)
p.start()
# complete the processes
for proc in procs:
proc.join()
_logger.info("Finish {} multi process to upload datas".format(
multi_processes))
if __name__ == "__main__":
hadoop_home = "/home/client/hadoop-client/hadoop/"
configs = {
"fs.default.name": "hdfs://xxx.hadoop.com:54310",
"hadoop.job.ugi": "hello,hello123"
}
client = HDFSClient(hadoop_home, configs)
client.ls("/user/com/train-25")
files = client.lsr("/user/com/train-25/models")
downloads = multi_download(
client,
"/user/com/train-25/model",
"/home/xx/data1",
1,
5,
100,
multi_processes=5)
multi_upload(client, "/user/com/train-25/model", "/home/xx/data1")
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""lookup_table_utils.py will move to fluid/incubate/fleet/utils/lookup_table.py"""
from __future__ import print_function
import os
import time
import logging
import paddle
from paddle.fluid import core
from paddle.fluid import io
from paddle.fluid import Program
from paddle.fluid.log_helper import get_logger
__all__ = [
"load_persistables_for_increment", "load_persistables_for_inference",
"convert_dist_to_sparse_program"
]
_logger = get_logger(
'lookup_table_utils',
logging.INFO,
fmt='%(asctime)s-%(levelname)s: %(message)s')
model_filename = "__model__"
lookup_table_dir = "__lookup_table__"
def __insert_lookup_sparse_table_op(main_program, idx, ids, w, out):
main_program.global_block()._insert_op(
index=idx,
type="lookup_sparse_table",
inputs={"Ids": [ids],
"W": [w]},
outputs={"Out": [out]},
attrs={
"is_distributed": False,
"is_sparse": True,
"grad_inplace": False
})
def __get_prefetch_op_tuples(main_program):
# current lookup tables op is split_ids->prefetch->merge_ids
prefetch_op_tuples = None
op_types = [op.type for op in main_program.global_block().ops]
for i in range(len(op_types)):
if op_types[i] == "prefetch":
if op_types[i - 1] == "split_ids" and op_types[i +
1] == "merge_ids":
split_ids_op_id = i - 1
split_ids_inputs = main_program.global_block().ops[i - 1].input(
"Ids")
prefetch_op_inputs = main_program.global_block().ops[i].input(
"X")
prefetch_op_outputs = main_program.global_block().ops[i].output(
"Out")
merge_ids_outputs = main_program.global_block().ops[
i + 1].output("Out")
need_delete_vars = []
need_delete_vars.extend(prefetch_op_inputs)
need_delete_vars.extend(prefetch_op_outputs)
prefetch_op_tuples = (split_ids_op_id, split_ids_inputs,
merge_ids_outputs, need_delete_vars)
break
return prefetch_op_tuples
def convert_dist_to_sparse_program(program):
"""
WARNING: this function will only be used for distributed training with distributed lookup table.
when we train model with distributed lookup table but want to do the local inference, we can use
this function to convert the train program with distributed lookup table to sparse lookup table.
Args:
program(Program): the program must be the trainer program, which will be get by the distribute transpiler.
Returns:
program: The `program` is a Program, it's the program replace distributed lookup table to sparse lookup table.
"""
if not program._distributed_lookup_table:
_logger.warn(
"There are no distributed lookup tables need to be converted")
return
# create table param and grad var in pserver program
origin_emb_var = "{}.origin".format(program._distributed_lookup_table)
emb_var = program._distributed_lookup_table
program.global_block()._rename_var(emb_var, origin_emb_var)
origin_param_var = program.global_block().vars[origin_emb_var]
param_var = program.global_block().create_var(
name=emb_var,
shape=origin_param_var.shape,
dtype=origin_param_var.dtype,
type=core.VarDesc.VarType.SELECTED_ROWS,
persistable=True)
# parameter must be selected rows
param_var.desc.set_type(core.VarDesc.VarType.SELECTED_ROWS)
program._sync_with_cpp()
prefetch_op_tuples = __get_prefetch_op_tuples(program)
split_ids_id = prefetch_op_tuples[0]
for idx in range(split_ids_id + 2, split_ids_id - 1, -1):
program.global_block()._remove_op(idx)
program.desc.flush()
in_out_pairs = zip(prefetch_op_tuples[1], prefetch_op_tuples[2])
for in_out_pair in in_out_pairs:
idx = split_ids_id
ids = program.global_block().vars[in_out_pair[0]]
out = program.global_block().vars[in_out_pair[1]]
__insert_lookup_sparse_table_op(program, idx, ids, param_var, out)
program.desc.flush()
return program
def load_persistables_for_increment(dirname, executor, program,
lookup_table_var, lookup_table_var_path):
"""
WARNING: this function will only be used for distributed training with distributed lookup table.
for increment training, the pserver will not only load dense variables,
but also load the suitable lookup table var. Because of sliced lookup table
var with HASH, we must load the correct sliced var.
Args:
dirname(str): The directory path
executor(Executor): The executor to run for loading inference model.
program(Program): The parameter server program, which will run on Pserver.
lookup_table_var: the distributed lookup tables var name.
lookup_table_var_path: the the distributed lookup tables var location.
Returns:
None
"""
def _load_persistable_vars(executor, dirname, need_load_vars):
load_prog = Program()
load_block = load_prog.global_block()
need_delete_vars = []
for param in need_load_vars:
origin_var = param.origin
slice_var = param.slice
is_slice = param.is_slice
offset = param.offset
if is_slice:
origin = load_block.create_var(
name="{}.load".format(origin_var.name),
type=origin_var.type,
shape=origin_var.shape,
dtype=origin_var.dtype,
persistable=True)
load_block.append_op(
type='load',
inputs={},
outputs={'Out': [origin]},
attrs={
'file_path': os.path.join(dirname, origin_var.name)
})
slice = load_block.create_var(
name=slice_var.name,
type=slice_var.type,
shape=slice_var.shape,
dtype=slice_var.dtype,
persistable=True)
dim1_flatten = reduce(lambda x, y: x * y, slice.shape[1:])
start = int(offset / dim1_flatten)
end = int(offset / dim1_flatten + slice.shape[0])
load_block.append_op(
type="slice",
inputs={'Input': origin},
outputs={'Out': slice},
attrs={'axes': [0],
'starts': [start],
'ends': [end]})
need_delete_vars.append(origin)
else:
origin = load_block.create_var(
name="{}".format(origin_var.name),
type=origin_var.type,
shape=origin_var.shape,
dtype=origin_var.dtype,
persistable=True)
load_block.append_op(
type='load',
inputs={},
outputs={'Out': [origin]},
attrs={
'file_path': os.path.join(dirname, origin_var.name)
})
load_block.append_op(
type='delete_var',
inputs={'X': need_delete_vars}, )
executor.run(load_prog)
def __load_lookup_table_vars(executor, main_program, lookup_table_var,
lookup_table_var_path):
emb_var = main_program.global_block().var(lookup_table_var)
load_program = Program()
load_block = load_program.global_block()
load_block.append_op(
type='load',
inputs={},
outputs={'Out': [emb_var]},
attrs={'file_path': lookup_table_var_path})
executor.run(load_program)
if not os.path.isdir(dirname):
raise ValueError("There is no directory named '%s'", dirname)
if not os.path.exists(lookup_table_var_path):
raise ValueError("There is no file named '%s'", lookup_table_var_path)
if not isinstance(program, Program):
raise ValueError("program must be an instance of fluid.Program")
_logger.info("Start Load Sparse Program With "
"Distributed Lookup Table Vars from {}, time = {}".format(
dirname, time.ctime()))
need_load_vars = program._parameters_on_pservers.get_distributed_vars_by_ep(
program._ps_endpoint)
_load_persistable_vars(executor, dirname, need_load_vars)
__load_lookup_table_vars(executor, program, lookup_table_var,
lookup_table_var_path)
_logger.info("Finish Load Sparse Program With "
"Distributed Lookup Table Vars from {}, time = {}".format(
dirname, time.ctime()))
def load_persistables_for_inference(dirname, executor, program,
lookup_table_var_name):
"""
WARNING: this function will only be used for inference with distributed lookup table.
Inference with distributed lookup table is a little funky, this function will load distributed
lookup table vars into sparse var, can be used in local inference mode.
Args:
dirname(str): The directory path
executor(Executor): The executor to run for loading inference model.
program(Program): The parameter server program, which will run on Pserver.
lookup_table_var_name: the distributed lookup tables var name.
Returns:
None
"""
def _load_persistable_vars(executor, dirname, program, lookup_table_vars):
def _is_checkpoint_var(exclude_fluid_vars=None):
"""
the checkpoint will not save or load all the variables.
var type is FEED_MINIBATCH/FETCH_LIST/RAW or var name ends with @GRAD are discarded.
: param var(Variable)
"""
if exclude_fluid_vars is None:
exclude_fluid_vars = []
def is_valid(var):
if var.desc.type() == core.VarDesc.VarType.FEED_MINIBATCH or \
var.desc.type() == core.VarDesc.VarType.FETCH_LIST or \
var.desc.type() == core.VarDesc.VarType.RAW:
return False
# @GRAD are named for gradient variables, checkpoint will not save it.
if "@GRAD" in var.name:
return False
# .trainer_ are named for distribute train variables, checkpoint will not save it.
if ".trainer_" in var.name:
return False
# .block is named for distribute train variables, checkpoint will not save it.
if ".block" in var.name:
return False
if "tmp_" in var.name:
return False
if var.name in exclude_fluid_vars:
return False
return var.persistable
return is_valid
io.load_vars(
executor,
dirname=dirname,
main_program=program,
predicate=_is_checkpoint_var(lookup_table_vars),
filename=None)
def _load_lookup_table_vars(executor, dirname, main_program,
lookup_table_vars):
if not os.path.isdir(dirname):
raise ValueError("There is no directory named '%s'", dirname)
lookup_table_dirname = os.path.join(dirname, lookup_table_dir)
emb_var_name = lookup_table_vars[0]
emb_var = main_program.global_block().var(emb_var_name)
emb_files = []
for emb_name in os.listdir(lookup_table_dirname):
if emb_var_name in emb_name:
emb_files.append(emb_name)
convert_program = Program()
global_block = convert_program.global_block()
emb_var = global_block.create_var(
name=emb_var.name,
shape=emb_var.shape,
dtype=emb_var.dtype,
type=core.VarDesc.VarType.SELECTED_ROWS,
persistable=True)
emb_var.desc.set_type(core.VarDesc.VarType.SELECTED_ROWS)
sums = []
for i, emb_file in enumerate(emb_files):
var_name = "{}_{}".format(emb_var.name, i)
param_var = global_block.create_var(
name=var_name,
shape=emb_var.shape,
dtype=emb_var.dtype,
type=core.VarDesc.VarType.SELECTED_ROWS,
persistable=True)
param_var.desc.set_type(core.VarDesc.VarType.SELECTED_ROWS)
global_block.append_op(
type='load',
inputs={},
outputs={'Out': [param_var]},
attrs={
'file_path': os.path.join(lookup_table_dirname, var_name)
})
sums.append(param_var)
global_block.append_op(
type='merge_sparse_lookup_table',
inputs={"X": sums},
outputs={'Out': emb_var},
attrs={})
global_block.append_op(
type='save',
inputs={"X": [emb_var]},
outputs={},
attrs={
'file_path': os.path.join(lookup_table_dirname, emb_var.name)
})
global_block.append_op(type='delete_var', inputs={'X': sums})
executor.run(convert_program)
if not os.path.isdir(dirname):
raise ValueError("There is no directory named '%s'", dirname)
if program:
if not isinstance(program, Program):
raise ValueError("program must be an instance of fluid.Program")
else:
local_model = os.path.join(dirname, model_filename)
with open(local_model, "rb") as f:
program_desc_str = f.read()
program = Program.parse_from_string(program_desc_str)
if not core._is_program_version_supported(program._version()):
raise ValueError("Unsupported program version: %d\n" %
program._version())
_logger.info("Start Load Sparse Program With "
"Distributed Lookup Table Vars from {}, time = {}".format(
dirname, time.ctime()))
_load_persistable_vars(executor, dirname, program, [lookup_table_var_name])
_load_lookup_table_vars(executor, dirname, program, [lookup_table_var_name])
_logger.info("Finish Load Sparse Program With "
"Distributed Lookup Table Vars from {}, time = {}".format(
dirname, time.ctime()))
return program
def get_inference_model(main_program, feeded_var_names, target_vars):
"""
Prune the given `main_program` to build a new program especially for inference with distributed lookup table ,
and then add `feeded_vars` and `target_vars` in this program.
Args:
main_program(Program|None): The original program, which will be pruned to
build the inference model. If is set None,
the default main program will be used.
Default: None.
feeded_var_names(list[str]): Names of variables that need to be fed data
during inference.
target_vars(list[Variable]): Variables from which we can get inference
results.
Returns:
program(Program)
Raises:
ValueError: If `feed_var_names` is not a list of basestring.
ValueError: If `target_vars` is not a list of Variable.
"""
def prepend_feed_ops(inference_program,
feed_target_names,
feed_holder_name='feed'):
if len(feed_target_names) == 0:
return
global_block = inference_program.global_block()
feed_var = global_block.create_var(
name=feed_holder_name,
type=core.VarDesc.VarType.FEED_MINIBATCH,
persistable=True)
for i, name in enumerate(feed_target_names):
out = global_block.var(name)
global_block._prepend_op(
type='feed',
inputs={'X': [feed_var]},
outputs={'Out': [out]},
attrs={'col': i})
def append_fetch_ops(inference_program,
fetch_target_names,
fetch_holder_name='fetch'):
global_block = inference_program.global_block()
fetch_var = global_block.create_var(
name=fetch_holder_name,
type=core.VarDesc.VarType.FETCH_LIST,
persistable=True)
for i, name in enumerate(fetch_target_names):
global_block.append_op(
type='fetch',
inputs={'X': [name]},
outputs={'Out': [fetch_var]},
attrs={'col': i})
origin_program = main_program.clone()
main_program = main_program.clone()
global_block = main_program.global_block()
need_to_remove_op_index = []
for i, op in enumerate(global_block.ops):
op.desc.set_is_target(False)
if op.type == "feed" or op.type == "fetch":
need_to_remove_op_index.append(i)
for index in need_to_remove_op_index[::-1]:
global_block._remove_op(index)
main_program.desc.flush()
main_program = main_program._prune(targets=target_vars)
main_program = main_program._inference_optimize(prune_read_op=True)
fetch_var_names = [v.name for v in target_vars]
prepend_feed_ops(main_program, feeded_var_names)
append_fetch_ops(main_program, fetch_var_names)
return main_program
...@@ -98,7 +98,6 @@ if(WIN32) ...@@ -98,7 +98,6 @@ if(WIN32)
LIST(REMOVE_ITEM TEST_OPS test_debugger) LIST(REMOVE_ITEM TEST_OPS test_debugger)
list(REMOVE_ITEM TEST_OPS test_fake_init_op) list(REMOVE_ITEM TEST_OPS test_fake_init_op)
list(REMOVE_ITEM TEST_OPS test_merge_ids_op) list(REMOVE_ITEM TEST_OPS test_merge_ids_op)
list(REMOVE_ITEM TEST_OPS test_split_ids_op)
LIST(REMOVE_ITEM TEST_OPS test_ref_by_trainer_id_op) LIST(REMOVE_ITEM TEST_OPS test_ref_by_trainer_id_op)
endif() endif()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册