diff --git a/paddle/fluid/operators/distributed_ops/split_byref_op.cc b/paddle/fluid/operators/distributed_ops/split_byref_op.cc
deleted file mode 100644
index 79d9c272e8b4e5241ec5f5d79a706fd5376788d9..0000000000000000000000000000000000000000
--- a/paddle/fluid/operators/distributed_ops/split_byref_op.cc
+++ /dev/null
@@ -1,103 +0,0 @@
-/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License. */
-
-#include "paddle/fluid/operators/distributed_ops/split_byref_op.h"
-#include "paddle/fluid/operators/split_op.h"
-
-namespace paddle {
-namespace operators {
-using framework::Tensor;
-
-class SplitByrefOp : public framework::OperatorWithKernel {
- public:
-  using framework::OperatorWithKernel::OperatorWithKernel;
-
-  void InferShape(framework::InferShapeContext *ctx) const override {
-    PADDLE_ENFORCE(ctx->HasInput("X"),
-                   "Input(X) of SplitOp should not be null.");
-    PADDLE_ENFORCE_GE(ctx->Outputs("Out").size(), 1UL,
-                      "Outputs(Out) of SplitOp should not be empty.");
-    auto in_dims = ctx->GetInputDim("X");
-    auto outs_names = ctx->Outputs("Out");
-    size_t num = static_cast<size_t>(ctx->Attrs().Get<int>("num"));
-    auto sections = ctx->Attrs().Get<std::vector<int>>("sections");
-    const size_t outs_number = outs_names.size();
-    std::vector<framework::DDim> outs_dims;
-    outs_dims.reserve(outs_number);
-
-    if (num > 0) {
-      int64_t in_axis_dim = 0;
-      if (ctx->IsRuntime()) {
-        in_axis_dim = in_dims[0];
-      }
-      PADDLE_ENFORCE_EQ(in_axis_dim % num, 0,
-                        "tensor split does not result"
-                        " in an equal division");
-      size_t out_axis_dim = in_axis_dim / num;
-      for (size_t i = 0; i < outs_number; ++i) {
-        auto dim = in_dims;
-        dim[0] = out_axis_dim;
-        outs_dims.push_back(dim);
-      }
-    } else if (sections.size() > 0) {
-      PADDLE_ENFORCE_EQ(sections.size(), outs_number,
-                        "tensor split sections size "
-                        "should be equal to output size.");
-      for (size_t i = 0; i < outs_number; ++i) {
-        auto dim = in_dims;
-        dim[0] = sections[i];
-        outs_dims.push_back(dim);
-      }
-    }
-    ctx->SetOutputsDim("Out", outs_dims);
-  }
-};
-
-class SplitByrefOpMaker : public framework::OpProtoAndCheckerMaker {
- public:
-  void Make() override {
-    AddInput("X", "(Tensor) Input tensor of the split operator.");
-    AddOutput("Out", "(Tensor) Output tensors of the split operator.")
-        .AsDuplicable();
-    AddComment(R"DOC(
-SplitByref operator
-
-Split the source tensor into several tensors along axis 0. No copy is
-performed by this operator; the output tensors share the same blocks of
-memory as the input.
-)DOC");
-    AddAttr<std::vector<int>>("sections",
-                              "(vector<int>) "
-                              "the length of each output along the "
-                              "specified axis.")
-        .SetDefault(std::vector<int>{});
-    AddAttr<int>("num",
-                 "(int, default 0) "
-                 "Number of sub-tensors. This must evenly divide "
-                 "Input.dims()[axis]")
-        .SetDefault(0);
-  }
-};
-
-}  // namespace operators
-}  // namespace paddle
-
-namespace ops = paddle::operators;
-// NOTE: concat op default axis must be 0!
-USE_CPU_ONLY_OP(concat);
-
-REGISTER_OPERATOR(split_byref, ops::SplitByrefOp, ops::SplitByrefOpMaker,
-                  ops::SplitGradMaker<paddle::framework::OpDesc>,
-                  ops::SplitGradMaker<paddle::imperative::OpBase>);
-REGISTER_OP_CPU_KERNEL(
-    split_byref,
-    ops::SplitByrefOpKernel<paddle::platform::CPUPlace, float>);
diff --git a/paddle/fluid/operators/distributed_ops/split_byref_op.cu.cc b/paddle/fluid/operators/distributed_ops/split_byref_op.cu.cc
deleted file mode 100644
index 056659c3ea61f6233a6dda56ca1e272e72770d4a..0000000000000000000000000000000000000000
--- a/paddle/fluid/operators/distributed_ops/split_byref_op.cu.cc
+++ /dev/null
@@ -1,19 +0,0 @@
-/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License. */
-
-#include "paddle/fluid/operators/distributed_ops/split_byref_op.h"
-namespace ops = paddle::operators;
-REGISTER_OP_CUDA_KERNEL(
-    split_byref,
-    ops::SplitByrefOpKernel<paddle::platform::CUDADeviceContext, float>);
diff --git a/paddle/fluid/operators/distributed_ops/split_byref_op.h b/paddle/fluid/operators/distributed_ops/split_byref_op.h
deleted file mode 100644
index fedd7218dd6cc9481e94a92a3820cafbe4157bd0..0000000000000000000000000000000000000000
--- a/paddle/fluid/operators/distributed_ops/split_byref_op.h
+++ /dev/null
@@ -1,43 +0,0 @@
-/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License. */
-
-#pragma once
-
-#include <vector>
-#include "paddle/fluid/framework/op_registry.h"
-
-namespace paddle {
-namespace operators {
-
-template <typename DeviceContext, typename T>
-class SplitByrefOpKernel : public framework::OpKernel<T> {
- public:
-  void Compute(const framework::ExecutionContext& ctx) const override {
-    auto* in = ctx.Input<framework::Tensor>("X");
-    auto outs = ctx.MultiOutput<framework::LoDTensor>("Out");
-    auto place = ctx.GetPlace();
-
-    size_t row_offset = 0;
-    for (size_t i = 0; i < outs.size(); ++i) {
-      // NOTE: no need to call mutable_data here to allocate memory.
-      auto* out = outs[i];
-      VLOG(3) << "splitting by ref: " << row_offset << " " << out->dims()[0];
-      *out = in->Slice(row_offset, row_offset + out->dims()[0]);
-      row_offset += out->dims()[0];
-    }
-  }
-};
-
-}  // namespace operators
-}  // namespace paddle
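
Reviewer note: the `SplitByrefOpKernel` removed above relies on `Tensor::Slice` returning a view rather than a copy. A minimal sketch of that zero-copy contract in plain NumPy (an analogy only, not Paddle API):

```python
import numpy as np

def split_byref(x, section_rows):
    """Split x along axis 0 into views that alias x's memory (no copy),
    mirroring in->Slice(row_offset, row_offset + rows) in the C++ kernel."""
    outs, row_offset = [], 0
    for rows in section_rows:
        out = x[row_offset:row_offset + rows]  # a view, not a copy
        assert out.base is x                   # shares the same memory block
        outs.append(out)
        row_offset += rows
    return outs

x = np.arange(12.0).reshape(6, 2)
a, b = split_byref(x, [2, 4])
x[0, 0] = 42.0
print(a[0, 0])  # 42.0 -- the output sees the mutation because no copy was made
```
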
diff --git a/paddle/fluid/operators/distributed_ops/split_ids_op.cc b/paddle/fluid/operators/distributed_ops/split_ids_op.cc
deleted file mode 100644
index df9681c315c67299189c203591f533efb93489a5..0000000000000000000000000000000000000000
--- a/paddle/fluid/operators/distributed_ops/split_ids_op.cc
+++ /dev/null
@@ -1,90 +0,0 @@
-/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License. */
-
-#include "paddle/fluid/operators/distributed_ops/split_ids_op.h"
-
-#include <memory>
-
-namespace paddle {
-namespace operators {
-
-class SplitIdsOpMaker : public framework::OpProtoAndCheckerMaker {
- public:
-  void Make() override {
-    AddInput("Ids", "(LoDTensor) the input ids with shape {batch_num, 1}")
-        .AsDuplicable();
-
-    AddOutput("Out", "(LoDTensors) The outputs of the input Ids.")
-        .AsDuplicable();
-
-    AddComment(R"DOC(
-Split a LoDTensor of Ids into multiple LoDTensors; the number of outputs
-equals the number of pservers.
-Example:
-  Input:
-    X = [[1,2,3,4,5,6],[2,3]]
-
-  Out(3 output):
-    if compress is True:
-      out0 = [3, 3, 6]
-      out1 = [1, 4]
-      out2 = [2, 2, 5]
-    else:
-      out0 = [3, 6]
-      out1 = [1, 4]
-      out2 = [2, 5]
-)DOC");
-  }
-};
-
-class SplitIdsOp : public framework::OperatorWithKernel {
- public:
-  using framework::OperatorWithKernel::OperatorWithKernel;
-
-  void InferShape(framework::InferShapeContext *ctx) const override {
-    PADDLE_ENFORCE(ctx->HasInputs("Ids"), "SplitIdsOp must have input Ids.");
-    PADDLE_ENFORCE(ctx->HasOutputs("Out"), "SplitIdsOp must have output Out.");
-
-    auto ids_var_type = ctx->GetInputsVarType("Ids").front();
-    auto ids_dims = ctx->GetInputsDim("Ids");
-    if (ids_var_type == framework::proto::VarType::LOD_TENSOR) {
-      PADDLE_ENFORCE_EQ(ids_dims[0].size(), 2);
-    }
-  }
-
- protected:
-  framework::OpKernelType GetExpectedKernelType(
-      const framework::ExecutionContext &ctx) const override {
-    return framework::OpKernelType(
-        OperatorWithKernel::IndicateVarDataType(ctx, "Ids"), ctx.GetPlace());
-  }
-};
-
-class SplitIdsOpInferVarType : public framework::VarTypeInference {
- public:
-  void operator()(framework::InferVarTypeContext *ctx) const override {
-    auto input_type = ctx->GetInputType("Ids");
-    ctx->SetOutputType("Out", input_type, framework::ALL_ELEMENTS);
-  }
-};
-
-}  // namespace operators
-}  // namespace paddle
-
-namespace ops = paddle::operators;
-REGISTER_OPERATOR(split_ids, ops::SplitIdsOp, ops::SplitIdsOpMaker,
-                  ops::SplitIdsOpInferVarType);
-
-REGISTER_OP_CPU_KERNEL(
-    split_ids, ops::SplitIdsOpKernel<paddle::platform::CPUPlace, int64_t>,
-    ops::SplitIdsOpKernel<paddle::platform::CPUPlace, float>);
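
The sharding rule documented in the op comment above (`shard_id = id % shard_num`) can be checked with a few lines of plain Python. The helper below only illustrates the op's contract; note the CPU kernel always deduplicates via a `std::set`, which corresponds to the `compress is False` case of the doc:

```python
def split_ids(ids, shard_num, compress=False):
    """Shard ids by id % shard_num, matching the SplitIds op comment.
    With compress=False, duplicate ids are removed first (as the CPU kernel
    does with a std::set); with compress=True duplicates are kept."""
    if not compress:
        ids = set(ids)  # deduplicate, like the std::set in the kernel
    outs = [[] for _ in range(shard_num)]
    for i in sorted(ids):
        outs[i % shard_num].append(i)
    return outs

flat = [1, 2, 3, 4, 5, 6, 2, 3]   # X = [[1,2,3,4,5,6],[2,3]] flattened
print(split_ids(flat, 3))         # [[3, 6], [1, 4], [2, 5]]
print(split_ids(flat, 3, True))   # [[3, 3, 6], [1, 4], [2, 2, 5]]
```
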
diff --git a/paddle/fluid/operators/distributed_ops/split_ids_op.h b/paddle/fluid/operators/distributed_ops/split_ids_op.h
deleted file mode 100644
index 8a75dd8062359a4e2fdbbeffb24c3b92d71b87bd..0000000000000000000000000000000000000000
--- a/paddle/fluid/operators/distributed_ops/split_ids_op.h
+++ /dev/null
@@ -1,125 +0,0 @@
-/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License. */
-
-#pragma once
-
-#include <cstring>
-#include <iterator>
-#include <set>
-#include <unordered_map>
-#include <vector>
-#include "paddle/fluid/framework/op_registry.h"
-#include "paddle/fluid/operators/math/selected_rows_functor.h"
-
-namespace paddle {
-namespace operators {
-
-template <typename DeviceContext, typename T>
-class SplitIdsOpKernel : public framework::OpKernel<T> {
- public:
-  void Compute(const framework::ExecutionContext &ctx) const override {
-    auto place = ctx.GetPlace();
-    if (!platform::is_cpu_place(place)) {
-      PADDLE_THROW("SplitIds does not support a GPU kernel");
-    }
-
-    const auto ids_vars = ctx.MultiInputVar("Ids");
-
-    PADDLE_ENFORCE_GT(ids_vars.size(), 0, "The number of Ids should > 0");
-    auto *ids_var = ids_vars[0];
-
-    if (ids_var->IsType<framework::LoDTensor>()) {
-      int batch_size = 0;
-      const auto ids_tensors = ctx.MultiInput<framework::LoDTensor>("Ids");
-      for (size_t i = 0; i < ids_tensors.size(); ++i) {
-        batch_size += ids_tensors[i]->dims()[0];
-      }
-      VLOG(4) << "Get Total BatchSize is: " << batch_size;
-
-      std::vector<T> all_ids(batch_size);
-      int offset = 0;
-      for (size_t i = 0; i < ids_tensors.size(); ++i) {
-        const auto *ids = ids_tensors[i];
-        std::memcpy(all_ids.data() + offset, ids->data<T>(),
-                    ids->numel() * sizeof(T));
-        offset += ids->numel();
-      }
-
-      std::set<T> st(all_ids.begin(), all_ids.end());
-      all_ids.assign(st.begin(), st.end());
-
-      auto outs = ctx.MultiOutput<framework::LoDTensor>("Out");
-      const size_t shard_num = outs.size();
-      std::vector<std::vector<T>> out_ids;
-      out_ids.resize(outs.size());
-
-      // split ids by their shard_num.
-      for (size_t i = 0; i < all_ids.size(); ++i) {
-        T id = all_ids[i];
-        size_t shard_id = static_cast<size_t>(id) % shard_num;
-        out_ids[shard_id].push_back(id);
-      }
-
-      // create a tensor for each shard and send it to the parameter server
-      for (size_t i = 0; i < out_ids.size(); ++i) {
-        auto *shard_t = outs[i];
-        std::vector<T> ids = out_ids[i];
-        auto *shard_data = shard_t->mutable_data<T>(
-            framework::make_ddim({static_cast<int64_t>(ids.size()), 1}),
-            place);
-        for (size_t j = 0; j < ids.size(); ++j) {
-          shard_data[j] = ids[j];
-        }
-      }
-    } else if (ids_var->IsType<framework::SelectedRows>()) {
-      const auto *ids_selected_rows = ctx.Input<framework::SelectedRows>("Ids");
-      auto &ids_dims = ids_selected_rows->value().dims();
-      PADDLE_ENFORCE_EQ(ids_dims[0],
-                        static_cast<int64_t>(ids_selected_rows->rows().size()),
-                        "");
-      const T *ids_data = ids_selected_rows->value().data<T>();
-      const auto &ids_rows = ids_selected_rows->rows();
-      auto outs = ctx.MultiOutput<framework::SelectedRows>("Out");
-      const size_t shard_num = outs.size();
-      for (auto &out : outs) {
-        out->mutable_rows()->clear();
-      }
-      // get rows for outputs
-      std::unordered_map<int64_t, size_t> id_to_index;
-      for (size_t i = 0; i < ids_rows.size(); ++i) {
-        id_to_index[ids_rows[i]] = i;
-        size_t shard_id = static_cast<size_t>(ids_rows[i]) % shard_num;
-        outs[shard_id]->mutable_rows()->push_back(ids_rows[i]);
-      }
-
-      int64_t row_width = ids_dims[1];
-      for (auto &out : outs) {
-        out->set_height(ids_selected_rows->height());
-        framework::DDim ddim = framework::make_ddim(
-            {static_cast<int64_t>(out->rows().size()), row_width});
-        T *output = out->mutable_value()->mutable_data<T>(ddim, place);
-        for (int64_t i = 0; i < ddim[0]; ++i) {
-          memcpy(output + i * row_width,
-                 ids_data + id_to_index[out->rows()[i]] * row_width,
-                 row_width * sizeof(T));
-        }
-      }
-    } else {
-      PADDLE_THROW(
-          "%s should be LoDTensor or SelectedRows, but the received type is %s",
-          ctx.InputNames("Ids")[0], framework::ToTypeName(ids_var->Type()));
-    }
-  }
-};
-
-}  // namespace operators
-}  // namespace paddle
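
The `SelectedRows` branch of the deleted kernel re-shards whole embedding rows instead of scalar ids: a row id picks its shard with the same modulo rule, and `row_width` values are copied per row. A rough NumPy sketch of that bookkeeping (names invented for illustration):

```python
import numpy as np

def split_selected_rows(rows, value, shard_num):
    """rows: list of int64 row ids; value: [len(rows), row_width] ndarray.
    Returns one (rows_k, value_k) pair per shard, copying each row to shard
    id % shard_num, like the SelectedRows branch of SplitIdsOpKernel."""
    id_to_index = {r: i for i, r in enumerate(rows)}
    out_rows = [[] for _ in range(shard_num)]
    for r in rows:
        out_rows[r % shard_num].append(r)
    outs = []
    for shard in out_rows:
        out_value = (np.stack([value[id_to_index[r]] for r in shard])
                     if shard else np.empty((0, value.shape[1])))
        outs.append((shard, out_value))
    return outs

rows = [7, 2, 4, 9]
value = np.arange(8.0).reshape(4, 2)
for shard_rows, shard_value in split_selected_rows(rows, value, 3):
    print(shard_rows, shard_value.tolist())
# shard 0: [9], shard 1: [7, 4], shard 2: [2]
```
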
diff --git a/python/paddle/fluid/contrib/__init__.py b/python/paddle/fluid/contrib/__init__.py
index 5ae06cb1a0fb1a75226824545834b6ddc9676a5e..1ea6fa26c025a34fa96237b0ae1e0c607357090f 100644
--- a/python/paddle/fluid/contrib/__init__.py
+++ b/python/paddle/fluid/contrib/__init__.py
@@ -25,8 +25,6 @@ from .quantize import *
 from . import reader
 from .reader import *
 from . import slim
-from . import utils
-from .utils import *
 from . import extend_optimizer
 from .extend_optimizer import *
 from . import model_stat
@@ -42,7 +40,6 @@ __all__ += memory_usage_calc.__all__
 __all__ += op_frequence.__all__
 __all__ += quantize.__all__
 __all__ += reader.__all__
-__all__ += utils.__all__
 __all__ += extend_optimizer.__all__
 __all__ += ['mixed_precision']
 __all__ += layers.__all__
diff --git a/python/paddle/fluid/contrib/utils/__init__.py b/python/paddle/fluid/contrib/utils/__init__.py
deleted file mode 100644
index 1c1c2fb22709189ca03dc543ca551257c8031c1a..0000000000000000000000000000000000000000
--- a/python/paddle/fluid/contrib/utils/__init__.py
+++ /dev/null
@@ -1,23 +0,0 @@
-# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from __future__ import print_function
-from . import lookup_table_utils
-from .lookup_table_utils import *
-from . import hdfs_utils
-from .hdfs_utils import *
-
-__all__ = []
-__all__ += lookup_table_utils.__all__
-__all__ += hdfs_utils.__all__
diff --git a/python/paddle/fluid/contrib/utils/hdfs_utils.py b/python/paddle/fluid/contrib/utils/hdfs_utils.py
deleted file mode 100644
index 2de4f82bd14559a99581c5716523b2a78c2d7998..0000000000000000000000000000000000000000
--- a/python/paddle/fluid/contrib/utils/hdfs_utils.py
+++ /dev/null
@@ -1,603 +0,0 @@
-# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""hdfs_utils.py will move to fluid/incubate/fleet/utils/hdfs.py""" - -import os -import sys -import subprocess -import multiprocessing -from datetime import datetime - -import re -import copy -import errno - -import logging -from paddle.fluid.log_helper import get_logger - -__all__ = ["HDFSClient", "multi_download", "multi_upload"] - -_logger = get_logger( - __name__, logging.INFO, fmt='%(asctime)s-%(levelname)s: %(message)s') - - -class HDFSClient(object): - """ - A tool of HDFS - - Args: - hadoop_home (string): hadoop_home - configs (dict): hadoop config, it is a dict, please contain \ - key "fs.default.name" and "hadoop.job.ugi" - Can be a float value - Examples: - hadoop_home = "/home/client/hadoop-client/hadoop/" - - configs = { - "fs.default.name": "hdfs://xxx.hadoop.com:54310", - "hadoop.job.ugi": "hello,hello123" - } - - client = HDFSClient(hadoop_home, configs) - - client.ls("/user/com/train-25") - files = client.lsr("/user/com/train-25/models") - """ - - def __init__(self, hadoop_home, configs): - self.pre_commands = [] - hadoop_bin = '%s/bin/hadoop' % hadoop_home - self.pre_commands.append(hadoop_bin) - dfs = 'fs' - self.pre_commands.append(dfs) - - for k, v in configs.items(): - config_command = '-D%s=%s' % (k, v) - self.pre_commands.append(config_command) - - def __run_hdfs_cmd(self, commands, retry_times=5): - whole_commands = copy.deepcopy(self.pre_commands) - whole_commands.extend(commands) - - print('Running system command: {0}'.format(' '.join(whole_commands))) - - ret_code = 0 - ret_out = None - ret_err = None - whole_commands = " ".join(whole_commands) - for x in range(retry_times + 1): - proc = subprocess.Popen( - whole_commands, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - shell=True) - (output, errors) = proc.communicate() - ret_code, ret_out, ret_err = proc.returncode, output, errors - if ret_code: - _logger.warn( - 'Times: %d, Error running command: %s. Return code: %d, Error: %s' - % (x, ' '.join(whole_commands), proc.returncode, errors)) - else: - break - return ret_code, ret_out, ret_err - - def upload(self, hdfs_path, local_path, overwrite=False, retry_times=5): - """ - upload the local file to hdfs - - Args: - hdfs_path(str): the hdfs file path - local_path(str): the local file path - overwrite(bool|None): will overwrite the file on HDFS or not - retry_times(int|5): retry times - - Returns: - True or False - """ - assert hdfs_path is not None - assert local_path is not None and os.path.exists(local_path) - - if os.path.isdir(local_path): - _logger.warn( - "The Local path: {} is dir and I will support it later, return". - format(local_path)) - return False - - base = os.path.basename(local_path) - if not self.is_exist(hdfs_path): - self.makedirs(hdfs_path) - else: - if self.is_exist(os.path.join(hdfs_path, base)): - if overwrite: - _logger.error( - "The HDFS path: {} is exist and overwrite is True, delete it". - format(hdfs_path)) - self.delete(hdfs_path) - else: - _logger.error( - "The HDFS path: {} is exist and overwrite is False, return". - format(hdfs_path)) - return False - - put_commands = ["-put", local_path, hdfs_path] - returncode, output, errors = self.__run_hdfs_cmd(put_commands, - retry_times) - if returncode: - _logger.error("Put local path: {} to HDFS path: {} failed".format( - local_path, hdfs_path)) - return False - else: - _logger.info("Put local path: {} to HDFS path: {} successfully". 
-
-    def upload(self, hdfs_path, local_path, overwrite=False, retry_times=5):
-        """
-        upload a local file to HDFS
-
-        Args:
-            hdfs_path(str): the hdfs file path
-            local_path(str): the local file path
-            overwrite(bool|False): whether to overwrite the file on HDFS
-            retry_times(int|5): retry times
-
-        Returns:
-            True or False
-        """
-        assert hdfs_path is not None
-        assert local_path is not None and os.path.exists(local_path)
-
-        if os.path.isdir(local_path):
-            _logger.warn(
-                "The local path: {} is a dir, which is not supported yet; return".
-                format(local_path))
-            return False
-
-        base = os.path.basename(local_path)
-        if not self.is_exist(hdfs_path):
-            self.makedirs(hdfs_path)
-        else:
-            if self.is_exist(os.path.join(hdfs_path, base)):
-                if overwrite:
-                    _logger.error(
-                        "The HDFS path: {} exists and overwrite=True, delete it".
-                        format(hdfs_path))
-                    self.delete(hdfs_path)
-                else:
-                    _logger.error(
-                        "The HDFS path: {} exists and overwrite=False, return".
-                        format(hdfs_path))
-                    return False
-
-        put_commands = ["-put", local_path, hdfs_path]
-        returncode, output, errors = self.__run_hdfs_cmd(put_commands,
-                                                         retry_times)
-        if returncode:
-            _logger.error("Put local path: {} to HDFS path: {} failed".format(
-                local_path, hdfs_path))
-            return False
-        else:
-            _logger.info("Put local path: {} to HDFS path: {} successfully".
-                         format(local_path, hdfs_path))
-            return True
-
-    def download(self, hdfs_path, local_path, overwrite=False, unzip=False):
-        """
-        download a file from HDFS
-
-        Args:
-            hdfs_path(str): the hdfs file path
-            local_path(str): the local file path
-            overwrite(bool|False): whether to overwrite the local file if it exists
-            unzip(bool|False): if the downloaded file is compressed by zip, whether to unzip it
-
-        Returns:
-            True or False
-        """
-        _logger.info('Downloading %r to %r.', hdfs_path, local_path)
-
-        if not self.is_exist(hdfs_path):
-            print("HDFS path: {} does not exist".format(hdfs_path))
-            return False
-        if self.is_dir(hdfs_path):
-            _logger.error(
-                "The HDFS path: {} is a dir, which is not supported yet; return".
-                format(hdfs_path))
-
-        if os.path.exists(local_path):
-            base = os.path.basename(hdfs_path)
-            local_file = os.path.join(local_path, base)
-            if os.path.exists(local_file):
-                if overwrite:
-                    os.remove(local_file)
-                else:
-                    _logger.error(
-                        "The local path: {} exists and overwrite=False, return".
-                        format(local_file))
-                    return False
-
-        self.make_local_dirs(local_path)
-
-        download_commands = ["-get", hdfs_path, local_path]
-        returncode, output, errors = self.__run_hdfs_cmd(download_commands)
-        if returncode:
-            _logger.error("Get local path: {} from HDFS path: {} failed".
-                          format(local_path, hdfs_path))
-            return False
-        else:
-            _logger.info("Get local path: {} from HDFS path: {} successfully".
-                         format(local_path, hdfs_path))
-            return True
-
-    def is_exist(self, hdfs_path=None):
-        """
-        whether the remote HDFS path exists
-
-        Args:
-            hdfs_path(str): the hdfs file path
-
-        Returns:
-            True or False
-        """
-        exist_cmd = ['-test', '-e', hdfs_path]
-        returncode, output, errors = self.__run_hdfs_cmd(
-            exist_cmd, retry_times=1)
-
-        if returncode:
-            _logger.error("HDFS is_exist check on path: {} failed".format(
-                hdfs_path))
-            return False
-        else:
-            _logger.info("HDFS is_exist check on path: {} succeeded".format(
-                hdfs_path))
-            return True
-
-    def is_dir(self, hdfs_path=None):
-        """
-        whether the remote HDFS path is a directory
-
-        Args:
-            hdfs_path(str): the hdfs file path
-
-        Returns:
-            True or False
-        """
-
-        if not self.is_exist(hdfs_path):
-            return False
-
-        dir_cmd = ['-test', '-d', hdfs_path]
-        returncode, output, errors = self.__run_hdfs_cmd(dir_cmd, retry_times=1)
-
-        if returncode:
-            _logger.error("HDFS path: {} is not a directory".format(
-                hdfs_path))
-            return False
-        else:
-            _logger.info("HDFS path: {} is a directory".format(
-                hdfs_path))
-            return True
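
`is_exist` and `is_dir` both lean on `hadoop fs -test`, which reports only through its exit code (no output). A hedged standalone equivalent, assuming `hadoop` is on PATH:

```python
import subprocess

def hdfs_test(flag, path):
    """Return True when `hadoop fs -test -<flag> <path>` exits 0.
    flag 'e' checks existence, 'd' checks for a directory."""
    return subprocess.call(['hadoop', 'fs', '-test', '-%s' % flag, path]) == 0

# client.is_exist("/user/com/train-25")  ~  hdfs_test('e', "/user/com/train-25")
# client.is_dir("/user/com/train-25")    ~  hdfs_test('d', "/user/com/train-25")
```
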
- """ - _logger.info('Deleting %r.', hdfs_path) - - if not self.is_exist(hdfs_path): - _logger.warn("HDFS path: {} do not exist".format(hdfs_path)) - return True - - if self.is_dir(hdfs_path): - del_cmd = ['-rmr', hdfs_path] - else: - del_cmd = ['-rm', hdfs_path] - - returncode, output, errors = self.__run_hdfs_cmd(del_cmd, retry_times=0) - - if returncode: - _logger.error("HDFS path: {} delete files failure".format( - hdfs_path)) - return False - else: - _logger.info("HDFS path: {} delete files successfully".format( - hdfs_path)) - return True - - def rename(self, hdfs_src_path, hdfs_dst_path, overwrite=False): - """ - Move a file or folder on HDFS. - - Args: - hdfs_path(str): HDFS path. - overwrite(bool|False): If the path already exists and overwrite is False, will return False. - - Returns: - True or False - """ - assert hdfs_src_path is not None - assert hdfs_dst_path is not None - - if not self.is_exist(hdfs_src_path): - _logger.info("HDFS path do not exist: {}".format(hdfs_src_path)) - if self.is_exist(hdfs_dst_path) and not overwrite: - _logger.error("HDFS path is exist: {} and overwrite=False".format( - hdfs_dst_path)) - - rename_command = ['-mv', hdfs_src_path, hdfs_dst_path] - returncode, output, errors = self.__run_hdfs_cmd( - rename_command, retry_times=1) - - if returncode: - _logger.error("HDFS rename path: {} to {} failed".format( - hdfs_src_path, hdfs_dst_path)) - return False - else: - _logger.info("HDFS rename path: {} to {} successfully".format( - hdfs_src_path, hdfs_dst_path)) - return True - - @staticmethod - def make_local_dirs(local_path): - """ - create a directory local, is same to mkdir - Args: - local_path: local path that wants to create a directory. - """ - try: - os.makedirs(local_path) - except OSError as e: - if e.errno != errno.EEXIST: - raise - - def makedirs(self, hdfs_path): - """ - Create a remote directory, recursively if necessary. - - Args: - hdfs_path(str): Remote path. Intermediate directories will be created appropriately. - - Returns: - True or False - """ - _logger.info('Creating directories to %r.', hdfs_path) - assert hdfs_path is not None - - if self.is_exist(hdfs_path): - _logger.error("HDFS path is exist: {}".format(hdfs_path)) - return - - mkdirs_commands = ['-mkdir', hdfs_path] - returncode, output, errors = self.__run_hdfs_cmd( - mkdirs_commands, retry_times=1) - - if returncode: - _logger.error("HDFS mkdir path: {} failed".format(hdfs_path)) - return False - else: - _logger.error("HDFS mkdir path: {} successfully".format(hdfs_path)) - return True - - def ls(self, hdfs_path): - """ - ls directory contents about HDFS hdfs_path - - Args: - hdfs_path(str): Remote HDFS path will be ls. - - Returns: - List: a contents list about hdfs_path. - """ - assert hdfs_path is not None - - if not self.is_exist(hdfs_path): - return [] - - ls_commands = ['-ls', hdfs_path] - returncode, output, errors = self.__run_hdfs_cmd( - ls_commands, retry_times=1) - - if returncode: - _logger.error("HDFS list path: {} failed".format(hdfs_path)) - return [] - else: - _logger.info("HDFS list path: {} successfully".format(hdfs_path)) - - ret_lines = [] - regex = re.compile('\s+') - out_lines = output.strip().split("\n") - for line in out_lines: - re_line = regex.split(line) - if len(re_line) == 8: - ret_lines.append(re_line[7]) - return ret_lines - - def lsr(self, hdfs_path, only_file=True, sort=True): - """ - list directory contents about HDFS hdfs_path recursively - - Args: - hdfs_path(str): Remote HDFS path. - only_file(bool|True): will discard folders. 
-
-    def lsr(self, hdfs_path, only_file=True, sort=True):
-        """
-        list directory contents of HDFS hdfs_path recursively
-
-        Args:
-            hdfs_path(str): Remote HDFS path.
-            only_file(bool|True): whether to discard folders.
-            sort(bool|True): whether to sort by modification time.
-
-        Returns:
-            List: a contents list of hdfs_path.
-        """
-
-        assert hdfs_path is not None
-
-        if not self.is_exist(hdfs_path):
-            return []
-
-        ls_commands = ['-lsr', hdfs_path]
-        returncode, output, errors = self.__run_hdfs_cmd(
-            ls_commands, retry_times=1)
-
-        if returncode:
-            _logger.error("HDFS list all files: {} failed".format(hdfs_path))
-            return []
-        else:
-            _logger.info("HDFS list all files: {} successfully".format(
-                hdfs_path))
-            lines = []
-            regex = re.compile(r'\s+')
-            out_lines = output.strip().split("\n")
-            for line in out_lines:
-                re_line = regex.split(line)
-                if len(re_line) == 8:
-                    if only_file and re_line[0][0] == "d":
-                        continue
-                    else:
-                        lines.append(
-                            (re_line[7], re_line[5] + " " + re_line[6]))
-            if sort:
-                # sort by modification time; the old Python 2 cmp-based call
-                # discarded its result, so sort in place with a key instead.
-                lines.sort(
-                    key=lambda t: datetime.strptime(t[1], '%Y-%m-%d %H:%M'))
-            ret_lines = [ret[0] for ret in lines]
-            return ret_lines
-
-
-def multi_download(client,
-                   hdfs_path,
-                   local_path,
-                   trainer_id,
-                   trainers,
-                   multi_processes=5):
-    """
-    Download files from HDFS using multiple processes.
-
-    Args:
-        client(HDFSClient): instance of HDFSClient
-        hdfs_path(str): path on hdfs
-        local_path(str): path on local
-        trainer_id(int): current trainer id
-        trainers(int): total number of trainers
-        multi_processes(int|5): number of download processes running at the same time, default=5
-
-    Returns:
-        List: downloaded files in the local folder.
-    """
-
-    def __subprocess_download(datas):
-        for data in datas:
-            re_path = os.path.relpath(os.path.dirname(data), hdfs_path)
-            if re_path == os.curdir:
-                sub_local_re_path = local_path
-            else:
-                sub_local_re_path = os.path.join(local_path, re_path)
-            client.download(data, sub_local_re_path)
-
-    assert isinstance(client, HDFSClient)
-
-    client.make_local_dirs(local_path)
-    _logger.info("Make local dir {} successfully".format(local_path))
-
-    all_need_download = client.lsr(hdfs_path, sort=True)
-    need_download = all_need_download[trainer_id::trainers]
-    _logger.info("Get {} files from all {} files that need to be downloaded from {}".
-                 format(len(need_download), len(all_need_download), hdfs_path))
-
-    _logger.info("Start {} processes to download files".format(
-        multi_processes))
-    procs = []
-    for i in range(multi_processes):
-        process_datas = need_download[i::multi_processes]
-        p = multiprocessing.Process(
-            target=__subprocess_download, args=(process_datas, ))
-        procs.append(p)
-        p.start()
-
-    # wait for the processes to complete
-    for proc in procs:
-        proc.join()
-
-    _logger.info("Finish {} processes downloading files".format(
-        multi_processes))
-
-    local_downloads = []
-    for data in need_download:
-        data_name = os.path.basename(data)
-        re_path = os.path.relpath(os.path.dirname(data), hdfs_path)
-        if re_path == os.curdir:
-            local_re_path = os.path.join(local_path, data_name)
-        else:
-            local_re_path = os.path.join(local_path, re_path, data_name)
-        local_downloads.append(local_re_path)
-
-    return local_downloads
-
-
-def getfilelist(path):
-    rlist = []
-    for dirname, folder, files in os.walk(path):
-        for i in files:
-            t = os.path.join(dirname, i)
-            rlist.append(t)
-    for r in rlist:
-        print(r)
-    return rlist
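
Both `multi_download` above and `multi_upload` below partition work with the same stride-slicing trick: a trainer takes `files[trainer_id::trainers]`, and each worker process then takes every `multi_processes`-th item of that share. The slices are disjoint and complete, which a few lines verify:

```python
files = ["part-%05d" % i for i in range(10)]
trainers, multi_processes = 2, 3

mine = files[1::trainers]            # trainer 1's share, as in multi_download
shards = [mine[i::multi_processes] for i in range(multi_processes)]

print(mine)    # ['part-00001', 'part-00003', 'part-00005', 'part-00007', 'part-00009']
print(shards)  # [['part-00001', 'part-00007'], ['part-00003', 'part-00009'], ['part-00005']]
assert sorted(sum(shards, [])) == sorted(mine)   # disjoint and complete
```
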
-
-
-def multi_upload(client,
-                 hdfs_path,
-                 local_path,
-                 multi_processes=5,
-                 overwrite=False,
-                 sync=True):
-    """
-    Upload files to HDFS using multiple processes.
-
-    Args:
-        client(HDFSClient): instance of HDFSClient
-        hdfs_path(str): path on hdfs
-        local_path(str): path on local
-        multi_processes(int|5): number of upload processes running at the same time, default=5
-        overwrite(bool|False): whether to overwrite files on HDFS
-        sync(bool|True): whether to upload files synchronously (currently unused).
-
-    Returns:
-        None
-    """
-
-    def __subprocess_upload(datas):
-        for data in datas:
-            re_path = os.path.relpath(os.path.dirname(data), local_path)
-            hdfs_re_path = os.path.join(hdfs_path, re_path)
-            client.upload(hdfs_re_path, data, overwrite, retry_times=5)
-
-    def get_local_files(path):
-        rlist = []
-
-        if not os.path.isdir(path):
-            return rlist
-
-        for dirname, folder, files in os.walk(path):
-            for i in files:
-                t = os.path.join(dirname, i)
-                rlist.append(t)
-        return rlist
-
-    assert isinstance(client, HDFSClient)
-
-    all_files = get_local_files(local_path)
-    if not all_files:
-        _logger.info("there is nothing to upload, exit")
-        return
-    _logger.info("Start {} processes to upload files".format(
-        multi_processes))
-    procs = []
-    for i in range(multi_processes):
-        process_datas = all_files[i::multi_processes]
-        p = multiprocessing.Process(
-            target=__subprocess_upload, args=(process_datas, ))
-        procs.append(p)
-        p.start()
-
-    # wait for the processes to complete
-    for proc in procs:
-        proc.join()
-
-    _logger.info("Finish {} processes uploading files".format(
-        multi_processes))
-
-
-if __name__ == "__main__":
-    hadoop_home = "/home/client/hadoop-client/hadoop/"
-
-    configs = {
-        "fs.default.name": "hdfs://xxx.hadoop.com:54310",
-        "hadoop.job.ugi": "hello,hello123"
-    }
-
-    client = HDFSClient(hadoop_home, configs)
-
-    client.ls("/user/com/train-25")
-    files = client.lsr("/user/com/train-25/models")
-
-    # NOTE: the old demo passed an extra positional argument here, which
-    # made this call raise a TypeError; it has been dropped.
-    downloads = multi_download(
-        client,
-        "/user/com/train-25/model",
-        "/home/xx/data1",
-        1,
-        5,
-        multi_processes=5)
-
-    multi_upload(client, "/user/com/train-25/model", "/home/xx/data1")
diff --git a/python/paddle/fluid/contrib/utils/lookup_table_utils.py b/python/paddle/fluid/contrib/utils/lookup_table_utils.py
deleted file mode 100644
index 7d30de565e7a41b02cbf37893f561283eef29b3a..0000000000000000000000000000000000000000
--- a/python/paddle/fluid/contrib/utils/lookup_table_utils.py
+++ /dev/null
@@ -1,496 +0,0 @@
-# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""lookup_table_utils.py will move to fluid/incubate/fleet/utils/lookup_table.py""" - -from __future__ import print_function - -import os -import time -import logging - -import paddle -from paddle.fluid import core -from paddle.fluid import io -from paddle.fluid import Program -from paddle.fluid.log_helper import get_logger - -__all__ = [ - "load_persistables_for_increment", "load_persistables_for_inference", - "convert_dist_to_sparse_program" -] - -_logger = get_logger( - 'lookup_table_utils', - logging.INFO, - fmt='%(asctime)s-%(levelname)s: %(message)s') - -model_filename = "__model__" -lookup_table_dir = "__lookup_table__" - - -def __insert_lookup_sparse_table_op(main_program, idx, ids, w, out): - main_program.global_block()._insert_op( - index=idx, - type="lookup_sparse_table", - inputs={"Ids": [ids], - "W": [w]}, - outputs={"Out": [out]}, - attrs={ - "is_distributed": False, - "is_sparse": True, - "grad_inplace": False - }) - - -def __get_prefetch_op_tuples(main_program): - # current lookup tables op is split_ids->prefetch->merge_ids - prefetch_op_tuples = None - op_types = [op.type for op in main_program.global_block().ops] - - for i in range(len(op_types)): - if op_types[i] == "prefetch": - if op_types[i - 1] == "split_ids" and op_types[i + - 1] == "merge_ids": - split_ids_op_id = i - 1 - split_ids_inputs = main_program.global_block().ops[i - 1].input( - "Ids") - prefetch_op_inputs = main_program.global_block().ops[i].input( - "X") - prefetch_op_outputs = main_program.global_block().ops[i].output( - "Out") - merge_ids_outputs = main_program.global_block().ops[ - i + 1].output("Out") - - need_delete_vars = [] - need_delete_vars.extend(prefetch_op_inputs) - need_delete_vars.extend(prefetch_op_outputs) - - prefetch_op_tuples = (split_ids_op_id, split_ids_inputs, - merge_ids_outputs, need_delete_vars) - break - return prefetch_op_tuples - - -def convert_dist_to_sparse_program(program): - """ - WARNING: this function will only be used for distributed training with distributed lookup table. - when we train model with distributed lookup table but want to do the local inference, we can use - this function to convert the train program with distributed lookup table to sparse lookup table. - - Args: - program(Program): the program must be the trainer program, which will be get by the distribute transpiler. - Returns: - program: The `program` is a Program, it's the program replace distributed lookup table to sparse lookup table. 
- """ - if not program._distributed_lookup_table: - _logger.warn( - "There are no distributed lookup tables need to be converted") - return - - # create table param and grad var in pserver program - origin_emb_var = "{}.origin".format(program._distributed_lookup_table) - emb_var = program._distributed_lookup_table - program.global_block()._rename_var(emb_var, origin_emb_var) - origin_param_var = program.global_block().vars[origin_emb_var] - - param_var = program.global_block().create_var( - name=emb_var, - shape=origin_param_var.shape, - dtype=origin_param_var.dtype, - type=core.VarDesc.VarType.SELECTED_ROWS, - persistable=True) - # parameter must be selected rows - param_var.desc.set_type(core.VarDesc.VarType.SELECTED_ROWS) - program._sync_with_cpp() - - prefetch_op_tuples = __get_prefetch_op_tuples(program) - - split_ids_id = prefetch_op_tuples[0] - - for idx in range(split_ids_id + 2, split_ids_id - 1, -1): - program.global_block()._remove_op(idx) - program.desc.flush() - - in_out_pairs = zip(prefetch_op_tuples[1], prefetch_op_tuples[2]) - - for in_out_pair in in_out_pairs: - idx = split_ids_id - ids = program.global_block().vars[in_out_pair[0]] - out = program.global_block().vars[in_out_pair[1]] - __insert_lookup_sparse_table_op(program, idx, ids, param_var, out) - program.desc.flush() - return program - - -def load_persistables_for_increment(dirname, executor, program, - lookup_table_var, lookup_table_var_path): - """ - WARNING: this function will only be used for distributed training with distributed lookup table. - for increment training, the pserver will not only load dense variables, - but also load the suitable lookup table var. Because of sliced lookup table - var with HASH, we must load the correct sliced var. - - Args: - dirname(str): The directory path - executor(Executor): The executor to run for loading inference model. - program(Program): The parameter server program, which will run on Pserver. - lookup_table_var: the distributed lookup tables var name. - lookup_table_var_path: the the distributed lookup tables var location. 
-
-
-def load_persistables_for_increment(dirname, executor, program,
-                                    lookup_table_var, lookup_table_var_path):
-    """
-    WARNING: this function will only be used for distributed training with distributed lookup table.
-    For incremental training, the pserver will load not only the dense variables
-    but also the suitable lookup table var. Because the lookup table var is
-    sliced by hash, the correct slice must be loaded.
-
-    Args:
-        dirname(str): The directory path
-        executor(Executor): The executor to run for loading inference model.
-        program(Program): The parameter server program, which will run on Pserver.
-        lookup_table_var: the distributed lookup table var name.
-        lookup_table_var_path: the distributed lookup table var location.
-
-    Returns:
-        None
-    """
-
-    def _load_persistable_vars(executor, dirname, need_load_vars):
-        load_prog = Program()
-        load_block = load_prog.global_block()
-        need_delete_vars = []
-
-        for param in need_load_vars:
-            origin_var = param.origin
-            slice_var = param.slice
-            is_slice = param.is_slice
-            offset = param.offset
-
-            if is_slice:
-                origin = load_block.create_var(
-                    name="{}.load".format(origin_var.name),
-                    type=origin_var.type,
-                    shape=origin_var.shape,
-                    dtype=origin_var.dtype,
-                    persistable=True)
-
-                load_block.append_op(
-                    type='load',
-                    inputs={},
-                    outputs={'Out': [origin]},
-                    attrs={
-                        'file_path': os.path.join(dirname, origin_var.name)
-                    })
-
-                slice = load_block.create_var(
-                    name=slice_var.name,
-                    type=slice_var.type,
-                    shape=slice_var.shape,
-                    dtype=slice_var.dtype,
-                    persistable=True)
-
-                dim1_flatten = reduce(lambda x, y: x * y, slice.shape[1:])
-                start = int(offset / dim1_flatten)
-                end = int(offset / dim1_flatten + slice.shape[0])
-
-                load_block.append_op(
-                    type="slice",
-                    inputs={'Input': origin},
-                    outputs={'Out': slice},
-                    attrs={'axes': [0],
-                           'starts': [start],
-                           'ends': [end]})
-
-                need_delete_vars.append(origin)
-            else:
-                origin = load_block.create_var(
-                    name="{}".format(origin_var.name),
-                    type=origin_var.type,
-                    shape=origin_var.shape,
-                    dtype=origin_var.dtype,
-                    persistable=True)
-                load_block.append_op(
-                    type='load',
-                    inputs={},
-                    outputs={'Out': [origin]},
-                    attrs={
-                        'file_path': os.path.join(dirname, origin_var.name)
-                    })
-
-        load_block.append_op(
-            type='delete_var',
-            inputs={'X': need_delete_vars}, )
-
-        executor.run(load_prog)
-
-    def __load_lookup_table_vars(executor, main_program, lookup_table_var,
-                                 lookup_table_var_path):
-        emb_var = main_program.global_block().var(lookup_table_var)
-
-        load_program = Program()
-        load_block = load_program.global_block()
-        load_block.append_op(
-            type='load',
-            inputs={},
-            outputs={'Out': [emb_var]},
-            attrs={'file_path': lookup_table_var_path})
-        executor.run(load_program)
-
-    if not os.path.isdir(dirname):
-        raise ValueError("There is no directory named '%s'" % dirname)
-
-    if not os.path.exists(lookup_table_var_path):
-        raise ValueError("There is no file named '%s'" % lookup_table_var_path)
-
-    if not isinstance(program, Program):
-        raise ValueError("program must be an instance of fluid.Program")
-
-    _logger.info("Start Load Sparse Program With "
-                 "Distributed Lookup Table Vars from {}, time = {}".format(
-                     dirname, time.ctime()))
-
-    need_load_vars = program._parameters_on_pservers.get_distributed_vars_by_ep(
-        program._ps_endpoint)
-    _load_persistable_vars(executor, dirname, need_load_vars)
-    __load_lookup_table_vars(executor, program, lookup_table_var,
-                             lookup_table_var_path)
-
-    _logger.info("Finish Load Sparse Program With "
-                 "Distributed Lookup Table Vars from {}, time = {}".format(
-                     dirname, time.ctime()))
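
The slice bookkeeping in `_load_persistable_vars` deserves a worked example: `offset` counts elements from the start of the origin tensor, so the row range of a slice is recovered by dividing by the flattened size of one row. With invented shapes:

```python
from functools import reduce

origin_shape = (12, 8)       # full lookup-table var on disk (illustrative)
slice_shape = (4, 8)         # this pserver's piece
offset = 32                  # element offset of the piece in the origin var

dim1_flatten = reduce(lambda x, y: x * y, slice_shape[1:])  # 8 elements/row
start = offset // dim1_flatten                              # row 4
end = start + slice_shape[0]                                # row 8
print(start, end)  # 4 8 -> the slice op copies origin[4:8, :]
```
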
-
-
-def load_persistables_for_inference(dirname, executor, program,
-                                    lookup_table_var_name):
-    """
-    WARNING: this function will only be used for inference with distributed lookup table.
-    Inference with a distributed lookup table is a little tricky: this function loads the
-    distributed lookup table vars into one sparse var, which can then be used in local
-    inference mode.
-
-    Args:
-        dirname(str): The directory path
-        executor(Executor): The executor to run for loading inference model.
-        program(Program): The parameter server program, which will run on Pserver.
-        lookup_table_var_name: the distributed lookup table var name.
-
-    Returns:
-        program(Program): the program with loaded persistables.
-    """
-
-    def _load_persistable_vars(executor, dirname, program, lookup_table_vars):
-        def _is_checkpoint_var(exclude_fluid_vars=None):
-            """
-            the checkpoint will not save or load all the variables.
-            vars whose type is FEED_MINIBATCH/FETCH_LIST/RAW or whose name
-            ends with @GRAD are discarded.
-
-            :param exclude_fluid_vars(list): var names to exclude
-            """
-
-            if exclude_fluid_vars is None:
-                exclude_fluid_vars = []
-
-            def is_valid(var):
-                if var.desc.type() == core.VarDesc.VarType.FEED_MINIBATCH or \
-                        var.desc.type() == core.VarDesc.VarType.FETCH_LIST or \
-                        var.desc.type() == core.VarDesc.VarType.RAW:
-                    return False
-                # @GRAD names gradient variables; the checkpoint will not save them.
-                if "@GRAD" in var.name:
-                    return False
-                # .trainer_ names distributed-training variables; the checkpoint will not save them.
-                if ".trainer_" in var.name:
-                    return False
-
-                # .block names distributed-training variables; the checkpoint will not save them.
-                if ".block" in var.name:
-                    return False
-
-                if "tmp_" in var.name:
-                    return False
-
-                if var.name in exclude_fluid_vars:
-                    return False
-
-                return var.persistable
-
-            return is_valid
-
-        io.load_vars(
-            executor,
-            dirname=dirname,
-            main_program=program,
-            predicate=_is_checkpoint_var(lookup_table_vars),
-            filename=None)
-
-    def _load_lookup_table_vars(executor, dirname, main_program,
-                                lookup_table_vars):
-        if not os.path.isdir(dirname):
-            raise ValueError("There is no directory named '%s'" % dirname)
-
-        lookup_table_dirname = os.path.join(dirname, lookup_table_dir)
-
-        emb_var_name = lookup_table_vars[0]
-        emb_var = main_program.global_block().var(emb_var_name)
-
-        emb_files = []
-        for emb_name in os.listdir(lookup_table_dirname):
-            if emb_var_name in emb_name:
-                emb_files.append(emb_name)
-
-        convert_program = Program()
-        global_block = convert_program.global_block()
-
-        emb_var = global_block.create_var(
-            name=emb_var.name,
-            shape=emb_var.shape,
-            dtype=emb_var.dtype,
-            type=core.VarDesc.VarType.SELECTED_ROWS,
-            persistable=True)
-        emb_var.desc.set_type(core.VarDesc.VarType.SELECTED_ROWS)
-
-        sums = []
-
-        for i, emb_file in enumerate(emb_files):
-            var_name = "{}_{}".format(emb_var.name, i)
-            param_var = global_block.create_var(
-                name=var_name,
-                shape=emb_var.shape,
-                dtype=emb_var.dtype,
-                type=core.VarDesc.VarType.SELECTED_ROWS,
-                persistable=True)
-            param_var.desc.set_type(core.VarDesc.VarType.SELECTED_ROWS)
-            global_block.append_op(
-                type='load',
-                inputs={},
-                outputs={'Out': [param_var]},
-                attrs={
-                    'file_path': os.path.join(lookup_table_dirname, var_name)
-                })
-            sums.append(param_var)
-        global_block.append_op(
-            type='merge_sparse_lookup_table',
-            inputs={"X": sums},
-            outputs={'Out': emb_var},
-            attrs={})
-        global_block.append_op(
-            type='save',
-            inputs={"X": [emb_var]},
-            outputs={},
-            attrs={
-                'file_path': os.path.join(lookup_table_dirname, emb_var.name)
-            })
-        global_block.append_op(type='delete_var', inputs={'X': sums})
-        executor.run(convert_program)
-
-    if not os.path.isdir(dirname):
-        raise ValueError("There is no directory named '%s'" % dirname)
-
-    if program:
-        if not isinstance(program, Program):
-            raise ValueError("program must be an instance of fluid.Program")
-    else:
-        local_model = os.path.join(dirname, model_filename)
-
-        with open(local_model, "rb") as f:
-            program_desc_str = f.read()
-
-        program = Program.parse_from_string(program_desc_str)
-
-        if not core._is_program_version_supported(program._version()):
-            raise ValueError("Unsupported program version: %d\n" %
-                             program._version())
-
-    _logger.info("Start Load Sparse Program With "
-                 "Distributed Lookup Table Vars from {}, time = {}".format(
-                     dirname, time.ctime()))
-
-    _load_persistable_vars(executor, dirname, program, [lookup_table_var_name])
-    _load_lookup_table_vars(executor, dirname, program,
-                            [lookup_table_var_name])
-
-    _logger.info("Finish Load Sparse Program With "
-                 "Distributed Lookup Table Vars from {}, time = {}".format(
-                     dirname, time.ctime()))
-
-    return program
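
`_load_lookup_table_vars` reassembles one embedding from its per-pserver shard files (`name_0`, `name_1`, ...) by loading each shard as SelectedRows and merging them with `merge_sparse_lookup_table`. Conceptually, the merge is a row-wise union, e.g.:

```python
import numpy as np

# Each shard holds (rows, values) for a disjoint set of embedding rows;
# merge_sparse_lookup_table conceptually concatenates them (data invented).
shards = [
    ([0, 3], np.ones((2, 4)) * 1.0),   # rows held by pserver 0
    ([1, 4], np.ones((2, 4)) * 2.0),   # rows held by pserver 1
]
rows = sum((r for r, _ in shards), [])
values = np.concatenate([v for _, v in shards], axis=0)
print(rows)          # [0, 3, 1, 4]
print(values.shape)  # (4, 4)
```
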
"Distributed Lookup Table Vars from {}, time = {}".format( - dirname, time.ctime())) - - _load_persistable_vars(executor, dirname, program, [lookup_table_var_name]) - _load_lookup_table_vars(executor, dirname, program, [lookup_table_var_name]) - - _logger.info("Finish Load Sparse Program With " - "Distributed Lookup Table Vars from {}, time = {}".format( - dirname, time.ctime())) - - return program - - -def get_inference_model(main_program, feeded_var_names, target_vars): - """ - Prune the given `main_program` to build a new program especially for inference with distributed lookup table , - and then add `feeded_vars` and `target_vars` in this program. - - Args: - main_program(Program|None): The original program, which will be pruned to - build the inference model. If is set None, - the default main program will be used. - Default: None. - feeded_var_names(list[str]): Names of variables that need to be fed data - during inference. - target_vars(list[Variable]): Variables from which we can get inference - results. - Returns: - program(Program) - - Raises: - ValueError: If `feed_var_names` is not a list of basestring. - ValueError: If `target_vars` is not a list of Variable. - - """ - - def prepend_feed_ops(inference_program, - feed_target_names, - feed_holder_name='feed'): - if len(feed_target_names) == 0: - return - - global_block = inference_program.global_block() - - feed_var = global_block.create_var( - name=feed_holder_name, - type=core.VarDesc.VarType.FEED_MINIBATCH, - persistable=True) - - for i, name in enumerate(feed_target_names): - out = global_block.var(name) - global_block._prepend_op( - type='feed', - inputs={'X': [feed_var]}, - outputs={'Out': [out]}, - attrs={'col': i}) - - def append_fetch_ops(inference_program, - fetch_target_names, - fetch_holder_name='fetch'): - global_block = inference_program.global_block() - fetch_var = global_block.create_var( - name=fetch_holder_name, - type=core.VarDesc.VarType.FETCH_LIST, - persistable=True) - - for i, name in enumerate(fetch_target_names): - global_block.append_op( - type='fetch', - inputs={'X': [name]}, - outputs={'Out': [fetch_var]}, - attrs={'col': i}) - - origin_program = main_program.clone() - main_program = main_program.clone() - global_block = main_program.global_block() - - need_to_remove_op_index = [] - for i, op in enumerate(global_block.ops): - op.desc.set_is_target(False) - if op.type == "feed" or op.type == "fetch": - need_to_remove_op_index.append(i) - - for index in need_to_remove_op_index[::-1]: - global_block._remove_op(index) - - main_program.desc.flush() - - main_program = main_program._prune(targets=target_vars) - main_program = main_program._inference_optimize(prune_read_op=True) - - fetch_var_names = [v.name for v in target_vars] - - prepend_feed_ops(main_program, feeded_var_names) - append_fetch_ops(main_program, fetch_var_names) - - return main_program diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 8d236dca22f2266771a029b7cdbf7db21aefb1fe..59c93ef0f77091e712dc95726583f0dfeb35c214 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -98,7 +98,6 @@ if(WIN32) LIST(REMOVE_ITEM TEST_OPS test_debugger) list(REMOVE_ITEM TEST_OPS test_fake_init_op) list(REMOVE_ITEM TEST_OPS test_merge_ids_op) - list(REMOVE_ITEM TEST_OPS test_split_ids_op) LIST(REMOVE_ITEM TEST_OPS test_ref_by_trainer_id_op) endif()