提交 1fe54416 编写于 作者: D dongdaxiang

move fs.cc and shell.cc into paddle/fluid/framework/io

test=develop
上级 53fbab5d
......@@ -23,11 +23,9 @@ endfunction()
add_subdirectory(ir)
add_subdirectory(details)
<<<<<<< HEAD
add_subdirectory(fleet)
=======
add_subdirectory(common)
>>>>>>> add fs_local_open example
add_subdirectory(io)
#ddim lib
proto_library(framework_proto SRCS framework.proto)
proto_library(async_executor_proto SRCS data_feed.proto)
......
......@@ -18,8 +18,6 @@ limitations under the License. */
#include "google/protobuf/text_format.h"
#include "gflags/gflags.h"
#include "paddle/fluid/framework/common/fs.h"
#include "paddle/fluid/framework/common/shell.h"
#include "paddle/fluid/framework/data_feed_factory.h"
#include "paddle/fluid/framework/executor_thread_worker.h"
#include "paddle/fluid/framework/feed_fetch_method.h"
......
......@@ -12,15 +12,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/framework/data_feed.h"
#include <stdio_ext.h>
#include "gflags/gflags.h"
#include "google/protobuf/io/zero_copy_stream_impl.h"
#include "google/protobuf/message.h"
#include "google/protobuf/text_format.h"
#include "common/fs.h"
#include "common/shell.h"
#include "gflags/gflags.h"
#include "paddle/fluid/framework/data_feed.h"
#include "io/fs.h"
#include "io/shell.h"
#include "paddle/fluid/framework/feed_fetch_method.h"
#include "paddle/fluid/framework/feed_fetch_type.h"
......@@ -94,24 +93,8 @@ void PrivateQueueDataFeed<T>::SetQueueSize(int queue_size) {
template <typename T>
bool PrivateQueueDataFeed<T>::Start() {
CheckSetFileList();
std::string filename;
if (PickOneFile(&filename)) {
int err_no = 0;
std::string pipeline_cmd = "cat";
std::string path =
"/home/users/dongdaxiang/pslib_ctr/local/data_mod/part-00012";
fp_ = fs_open_read(path, &err_no, pipeline_cmd);
__fsetlocking(&*fp_, FSETLOCKING_BYCALLER);
thread_local LineFileReader reader;
while (reader.getline(&*(fp_.get()))) {
LOG(ERROR) << "read a line";
}
read_thread_ = std::thread(&PrivateQueueDataFeed::ReadThread, this);
read_thread_.detach();
}
queue_->Close();
read_thread_ = std::thread(&PrivateQueueDataFeed::ReadThread, this);
read_thread_.detach();
finish_start_ = true;
return true;
......@@ -119,10 +102,18 @@ bool PrivateQueueDataFeed<T>::Start() {
template <typename T>
void PrivateQueueDataFeed<T>::ReadThread() {
T instance;
while (ParseOneInstanceFromPipe(&instance)) {
queue_->Send(instance);
std::string filename;
while (PickOneFile(&filename)) {
int err_no = 0;
fp_ = fs_open_read(filename, &err_no, pipe_command_);
__fsetlocking(&*fp_, FSETLOCKING_BYCALLER);
thread_local string::LineFileReader reader;
T instance;
while (ParseOneInstanceFromPipe(&instance)) {
queue_->Send(instance);
}
}
queue_->Close();
}
template <typename T>
......@@ -177,15 +168,26 @@ void MultiSlotDataFeed::Init(
}
}
feed_vec_.resize(use_slots_.size());
pipe_command_ = data_feed_desc.pipe_command();
finish_init_ = true;
}
void MultiSlotDataFeed::ReadThread() {
LOG(ERROR) << "Haha";
std::vector<MultiSlotType> instance;
while (ParseOneInstanceFromPipe(&instance)) {
queue_->Send(instance);
std::string filename;
while (PickOneFile(&filename)) {
int err_no = 0;
fp_ = fs_open_read(filename, &err_no, pipe_command_);
__fsetlocking(&*fp_, FSETLOCKING_BYCALLER);
thread_local string::LineFileReader reader;
std::vector<MultiSlotType> instance;
int ins_num = 0;
while (ParseOneInstanceFromPipe(&instance)) {
ins_num++;
queue_->Send(instance);
}
LOG(ERROR) << "filename: " << filename << " inst num: " << ins_num;
}
queue_->Close();
}
bool MultiSlotDataFeed::CheckFile(const char* filename) {
......@@ -301,26 +303,32 @@ bool MultiSlotDataFeed::CheckFile(const char* filename) {
bool MultiSlotDataFeed::ParseOneInstanceFromPipe(
std::vector<MultiSlotType>* instance) {
LOG(ERROR) << "hehe";
thread_local LineFileReader reader;
thread_local string::LineFileReader reader;
/*
while (reader.getline(&*(fp_.get()))) {
/*
*/
/*
const char* str = reader.get();
std::string line = std::string(str);
LOG(ERROR) << line;
*/
*/
/*
LOG(ERROR) << "read a line";
}
return true;
/*
if (!reader.getline(fp_.get())) {
*/
if (!reader.getline(&*(fp_.get()))) {
return false;
} else {
// std::string& line = reader_.get();
// const char* str = line.c_str();
int use_slots_num = use_slots_.size();
instance->resize(use_slots_num);
const char* str = reader.get();
std::string line = std::string(str);
LOG(ERROR) << line;
// LOG(ERROR) << line;
char* endptr = const_cast<char*>(str);
int pos = 0;
for (size_t i = 0; i < use_slots_index_.size(); ++i) {
......@@ -355,7 +363,6 @@ bool MultiSlotDataFeed::ParseOneInstanceFromPipe(
}
return true;
}
*/
}
bool MultiSlotDataFeed::ParseOneInstance(std::vector<MultiSlotType>* instance) {
......
......@@ -21,12 +21,12 @@ limitations under the License. */
#include <thread> // NOLINT
#include <vector>
#include "paddle/fluid/framework/common/ps_string.h"
#include "paddle/fluid/framework/data_feed.pb.h"
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/reader.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/operators/reader/blocking_queue.h"
#include "paddle/fluid/string/string_helper.h"
namespace paddle {
namespace framework {
......@@ -115,6 +115,7 @@ class DataFeed {
bool finish_init_;
static bool finish_set_filelist_;
bool finish_start_;
std::string pipe_command_;
};
// PrivateQueueDataFeed is the base virtual class for ohther DataFeeds.
......@@ -154,7 +155,7 @@ class PrivateQueueDataFeed : public DataFeed {
std::ifstream file_;
std::shared_ptr<FILE> fp_;
size_t queue_size_;
LineFileReader reader_;
string::LineFileReader reader_;
// The queue for store parsed data
std::unique_ptr<paddle::operators::reader::BlockingQueue<T>> queue_;
};
......
......@@ -27,4 +27,5 @@ message DataFeedDesc {
optional string name = 1;
optional int32 batch_size = 2 [ default = 32 ];
optional MultiSlotDesc multi_slot_desc = 3;
optional string pipe_command = 4;
}
......@@ -13,15 +13,12 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/framework/executor_thread_worker.h"
#include <stdio_ext.h>
#include <algorithm>
#include "google/protobuf/io/zero_copy_stream_impl.h"
#include "google/protobuf/message.h"
#include "google/protobuf/text_format.h"
#include "gflags/gflags.h"
#include "paddle/fluid/framework/common/fs.h"
#include "paddle/fluid/framework/common/shell.h"
#include "paddle/fluid/framework/feed_fetch_method.h"
#include "paddle/fluid/framework/feed_fetch_type.h"
#include "paddle/fluid/framework/lod_rank_table.h"
......@@ -247,8 +244,7 @@ void ExecutorThreadWorker::TrainFilesWithTimer() {
platform::SetNumThreads(1);
SetDevice();
thread_reader_->Start();
exit(0);
/*
std::vector<double> op_total_time;
std::vector<std::string> op_name;
for (auto& op : ops_) {
......@@ -292,7 +288,6 @@ void ExecutorThreadWorker::TrainFilesWithTimer() {
}
timeline.Start();
}
*/
}
void ExecutorThreadWorker::TrainFiles() {
......
......@@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/framework/common/fs.h"
#include "paddle/fluid/framework/io/fs.h"
namespace paddle {
namespace framework {
......@@ -25,10 +25,11 @@ static void fs_add_read_converter_internal(std::string& path, // NOLINT
}
if (!is_pipe) {
path = format_string("( %s ) < \"%s\"", converter.c_str(), path.c_str());
path = string::format_string("( %s ) < \"%s\"", converter.c_str(),
path.c_str());
is_pipe = true;
} else {
path = format_string("%s | %s", path.c_str(), converter.c_str());
path = string::format_string("%s | %s", path.c_str(), converter.c_str());
}
}
......@@ -40,10 +41,11 @@ static void fs_add_write_converter_internal(std::string& path, // NOLINT
}
if (!is_pipe) {
path = format_string("( %s ) > \"%s\"", converter.c_str(), path.c_str());
path = string::format_string("( %s ) > \"%s\"", converter.c_str(),
path.c_str());
is_pipe = true;
} else {
path = format_string("%s | %s", converter.c_str(), path.c_str());
path = string::format_string("%s | %s", converter.c_str(), path.c_str());
}
}
......@@ -108,7 +110,8 @@ std::shared_ptr<FILE> localfs_open_read(std::string path,
std::shared_ptr<FILE> localfs_open_write(std::string path,
const std::string& converter) {
shell_execute(format_string("mkdir -p $(dirname \"%s\")", path.c_str()));
shell_execute(
string::format_string("mkdir -p $(dirname \"%s\")", path.c_str()));
bool is_pipe = false;
......@@ -134,7 +137,7 @@ void localfs_remove(const std::string& path) {
return;
}
shell_execute(format_string("rm -rf %s", path.c_str()));
shell_execute(string::format_string("rm -rf %s", path.c_str()));
}
std::vector<std::string> localfs_list(const std::string& path) {
......@@ -144,9 +147,10 @@ std::vector<std::string> localfs_list(const std::string& path) {
std::shared_ptr<FILE> pipe;
int err_no = 0;
pipe = shell_popen(format_string("find %s -type f -maxdepth 1", path.c_str()),
"r", &err_no);
LineFileReader reader;
pipe = shell_popen(
string::format_string("find %s -type f -maxdepth 1", path.c_str()), "r",
&err_no);
string::LineFileReader reader;
std::vector<std::string> list;
while (reader.getline(&*pipe)) {
......@@ -161,21 +165,22 @@ std::string localfs_tail(const std::string& path) {
return "";
}
return shell_get_command_output(format_string("tail -1 %s ", path.c_str()));
return shell_get_command_output(
string::format_string("tail -1 %s ", path.c_str()));
}
bool localfs_exists(const std::string& path) {
std::string test_f = shell_get_command_output(
format_string("[ -f %s ] ; echo $?", path.c_str()));
string::format_string("[ -f %s ] ; echo $?", path.c_str()));
if (trim_spaces(test_f) == "0") {
if (string::trim_spaces(test_f) == "0") {
return true;
}
std::string test_d = shell_get_command_output(
format_string("[ -d %s ] ; echo $?", path.c_str()));
string::format_string("[ -d %s ] ; echo $?", path.c_str()));
if (trim_spaces(test_d) == "0") {
if (string::trim_spaces(test_d) == "0") {
return true;
}
......@@ -187,7 +192,7 @@ void localfs_mkdir(const std::string& path) {
return;
}
shell_execute(format_string("mkdir -p %s", path.c_str()));
shell_execute(string::format_string("mkdir -p %s", path.c_str()));
}
static size_t& hdfs_buffer_size_internal() {
......@@ -211,11 +216,11 @@ void hdfs_set_command(const std::string& x) { hdfs_command_internal() = x; }
std::shared_ptr<FILE> hdfs_open_read(std::string path, int* err_no,
const std::string& converter) {
if (fs_end_with_internal(path, ".gz")) {
path =
format_string("%s -text \"%s\"", hdfs_command().c_str(), path.c_str());
path = string::format_string("%s -text \"%s\"", hdfs_command().c_str(),
path.c_str());
} else {
path =
format_string("%s -cat \"%s\"", hdfs_command().c_str(), path.c_str());
path = string::format_string("%s -cat \"%s\"", hdfs_command().c_str(),
path.c_str());
}
bool is_pipe = true;
......@@ -225,8 +230,8 @@ std::shared_ptr<FILE> hdfs_open_read(std::string path, int* err_no,
std::shared_ptr<FILE> hdfs_open_write(std::string path, int* err_no,
const std::string& converter) {
path =
format_string("%s -put - \"%s\"", hdfs_command().c_str(), path.c_str());
path = string::format_string("%s -put - \"%s\"", hdfs_command().c_str(),
path.c_str());
bool is_pipe = true;
if (fs_end_with_internal(path, ".gz\"")) {
......@@ -242,8 +247,8 @@ void hdfs_remove(const std::string& path) {
return;
}
shell_execute(format_string("%s -rmr %s &>/dev/null; true",
hdfs_command().c_str(), path.c_str()));
shell_execute(string::format_string("%s -rmr %s &>/dev/null; true",
hdfs_command().c_str(), path.c_str()));
}
std::vector<std::string> hdfs_list(const std::string& path) {
......@@ -261,14 +266,15 @@ std::vector<std::string> hdfs_list(const std::string& path) {
do {
err_no = 0;
std::shared_ptr<FILE> pipe;
pipe = shell_popen(format_string("%s -ls %s | ( grep ^- ; [ $? != 2 ] )",
hdfs_command().c_str(), path.c_str()),
"r", &err_no);
LineFileReader reader;
pipe = shell_popen(
string::format_string("%s -ls %s | ( grep ^- ; [ $? != 2 ] )",
hdfs_command().c_str(), path.c_str()),
"r", &err_no);
string::LineFileReader reader;
list.clear();
while (reader.getline(&*pipe)) {
std::vector<std::string> line = split_string(reader.get());
std::vector<std::string> line = string::split_string(reader.get());
if (line.size() != 8) {
continue;
}
......@@ -283,15 +289,15 @@ std::string hdfs_tail(const std::string& path) {
return "";
}
return shell_get_command_output(format_string(
return shell_get_command_output(string::format_string(
"%s -text %s | tail -1 ", hdfs_command().c_str(), path.c_str()));
}
bool hdfs_exists(const std::string& path) {
std::string test = shell_get_command_output(format_string(
std::string test = shell_get_command_output(string::format_string(
"%s -test -e %s ; echo $?", hdfs_command().c_str(), path.c_str()));
if (trim_spaces(test) == "0") {
if (string::trim_spaces(test) == "0") {
return true;
}
......@@ -303,8 +309,8 @@ void hdfs_mkdir(const std::string& path) {
return;
}
shell_execute(format_string("%s -mkdir %s; true", hdfs_command().c_str(),
path.c_str()));
shell_execute(string::format_string("%s -mkdir %s; true",
hdfs_command().c_str(), path.c_str()));
}
int fs_select_internal(const std::string& path) {
......@@ -445,5 +451,5 @@ void fs_mkdir(const std::string& path) {
LOG(FATAL) << "Not supported";
}
}
} // namespace framework
} // namespace paddle
} // end namespace framework
} // end namespace paddle
......@@ -18,8 +18,8 @@
#include <string>
#include <vector>
#include "glog/logging.h"
#include "paddle/fluid/framework/common/ps_string.h"
#include "paddle/fluid/framework/common/shell.h"
#include "paddle/fluid/framework/io/shell.h"
#include "paddle/fluid/string/string_helper.h"
namespace paddle {
namespace framework {
......
......@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/framework/common/shell.h"
#include "paddle/fluid/framework/io/shell.h"
namespace paddle {
namespace framework {
......@@ -282,7 +282,7 @@ std::string shell_get_command_output(const std::string& cmd) {
do {
err_no = 0;
std::shared_ptr<FILE> pipe = shell_popen(cmd, "r", &err_no);
LineFileReader reader;
string::LineFileReader reader;
if (reader.getdelim(&*pipe, 0)) {
pipe = nullptr;
......@@ -294,5 +294,5 @@ std::string shell_get_command_output(const std::string& cmd) {
return "";
}
} // namespace framework
} // namespace paddle
} // end namespace framework
} // end namespace paddle
......@@ -23,7 +23,7 @@
#include <string>
#include <utility>
#include "glog/logging.h"
#include "paddle/fluid/framework/common/ps_string.h"
#include "paddle/fluid/string/string_helper.h"
namespace paddle {
namespace framework {
......
......@@ -23,7 +23,7 @@
#include "glog/logging.h"
namespace paddle {
namespace framework {
namespace string {
inline size_t count_spaces(const char* s) {
size_t count = 0;
......@@ -234,5 +234,5 @@ class LineFileReader {
size_t _buf_size = 0;
size_t _length = 0;
};
} // end namespace framework
} // end namespace string
} // end namespace paddle
......@@ -114,6 +114,24 @@ class DataFeedDesc(object):
self.proto_desc.multi_slot_desc.slots[self.__name_to_index[
name]].is_dense = True
def set_pipe_command(self, pipe_command):
"""
Pipeline command will be set with this function. In IO runtime,
pipeline command will be executed given user provided input raw
files.
Example:
>>> data_feed = fluid.DataFeedDesc('data.proto')
>>> data_feed.set_pipe_command('awk -F '\t' '{print $2}'')
Args:
pipe_command: a command string of shell command
Note:
Default is cat, i.e., cat user's input file list to data feed
"""
self.proto_desc.pipe_command = pipe_command
def set_use_slots(self, use_slots_name):
"""
Set if a specific slot will be used for training. A dataset shall
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册