/* DatasetContainer
 * Holds the samples of a single data source and drives their asynchronous loading.
 */
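/*
 * Illustrative usage sketch (documentation only; the real driver is the
 * surrounding trainer code, and the variable names and polling loop below are
 * hypothetical):
 *
 *   DatasetContainer container;
 *   container.initialize(dataset_config, trainer_context);
 *   container.pre_detect_data(epoch_id);  // detect files, start the download thread
 *   while (container.epoch_data_status(epoch_id) != DatasetStatus::Ready) {
 *       sleep(10);                         // wait for the background load to finish
 *   }
 *   auto channel = container.fetch(epoch_id);
 */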
#include <unistd.h>
#include <map>
#include <string>
#include <vector>
#include <memory>
#include <thread>
#include <yaml-cpp/yaml.h>
#include "paddle/fluid/framework/io/shell.h"
#include "paddle/fluid/string/string_helper.h"
#include "paddle/fluid/train/custom_trainer/feed/trainer_context.h"
#include "paddle/fluid/train/custom_trainer/feed/accessor/epoch_accessor.h"
#include "paddle/fluid/train/custom_trainer/feed/dataset/dataset_container.h"

namespace paddle {
namespace custom_trainer {
namespace feed {

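// Configuration keys consumed by initialize(); the values shown are
// illustrative assumptions, not taken from a real deployment:
//
//   data_reader: <registered DataReader subclass>
//   prefetch_num: 2                           # epochs of sample data to prefetch
//   root_path: "<data_root_1> <data_root_2>"  # space-separated data roots
//   data_spit_interval: 300                   # seconds covered by one data split
//   data_path_formater: "<time format>"       # passed to format_timestamp()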
int DatasetContainer::initialize(
        const YAML::Node& config, std::shared_ptr<TrainerContext> context) {
    _dataset_config = config;
    _trainer_context = context.get();
    // Number of epochs of sample data to prefetch
    _prefetch_num = config["prefetch_num"].as<int>();
    _dataset_list.resize(_prefetch_num);
    for (int i = 0; i < _prefetch_num; ++i) {
        _dataset_list[i].reset(new DatasetInfo);
    }

    _data_root_paths = paddle::string::split_string(
        config["root_path"].as<std::string>(), " ");
    _data_split_interval = config["data_spit_interval"].as<int>();
    _data_path_formater = config["data_path_formater"].as<std::string>();
    std::string data_reader_class = config["data_reader"].as<std::string>();
    DataReader* data_reader = CREATE_CLASS(DataReader, data_reader_class);
    _data_reader.reset(data_reader);
    return _data_reader->initialize(config, context);
}   

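// Maps a timestamp to its slot in the prefetch ring buffer: consecutive epoch
// windows rotate through the _prefetch_num DatasetInfo entries.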
std::shared_ptr<DatasetInfo> DatasetContainer::dataset(uint64_t timestamp) {
    auto* epoch_accessor = _trainer_context->epoch_accessor.get();
    auto data_idx = timestamp / epoch_accessor->epoch_time_interval();
    return _dataset_list[data_idx % _prefetch_num];
}

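// Detects data for up to _prefetch_num upcoming epochs starting at epoch_id:
// records the file list of each epoch whose data is already present and marks
// its slot as Detected. The asynchronous download thread is started on the
// first call.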
void DatasetContainer::pre_detect_data(uint64_t epoch_id) {
    int status = 0;
    auto* epoch_accessor = _trainer_context->epoch_accessor.get();
    time_t timestamp = epoch_accessor->epoch_timestamp(epoch_id);
    if (timestamp % epoch_accessor->epoch_time_interval() != 0) {
        LOG(FATAL) << "timestamp:" << timestamp << " does not match interval:" << epoch_accessor->epoch_time_interval();
        return;
    }
    if (_downloader_thread == nullptr) {
        _downloader_thread.reset(new std::thread([this, timestamp](){
            async_download_data(timestamp);
        }));
    }
    for (int detect_idx = 0; detect_idx < _prefetch_num; ++detect_idx) {
        if (DatasetStatus::Empty != data_status(timestamp)) {
            // Already detected; advance to the next epoch window.
            timestamp += epoch_accessor->epoch_time_interval();
            continue;
        }
        size_t data_num = data_num_for_train(timestamp, epoch_accessor->epoch_time_interval(), _data_split_interval);
        uint64_t data_timestamp = timestamp % _data_split_interval == 0 ? timestamp : (timestamp / _data_split_interval + 1) * _data_split_interval;
        std::vector<std::string> data_path_list;
        for (size_t i = 0; i < _data_root_paths.size() && status == 0; ++i) {
            for (size_t j = 0; j < data_num && status == 0; ++j) {
                std::string path_suffix = format_timestamp(data_timestamp + j * _data_split_interval, _data_path_formater);
                std::string data_dir = _data_root_paths[i] + "/" + path_suffix;
                status = read_data_list(data_dir, data_path_list);
            }
        }
        if (status == 0) {
            auto dataset_info = dataset(timestamp);
            dataset_info->timestamp = timestamp;
            dataset_info->file_path_list = std::move(data_path_list);
            dataset_info->status = DatasetStatus::Detected;
        }
        timestamp += epoch_accessor->epoch_time_interval();
    }
    return;
}

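// Collective call across WORKER nodes: the master node checks that data_dir is
// ready and broadcasts its file list; each worker then keeps its strided share
// (every worker_num-th file, offset by its rank) in data_list.
// Returns 0 on success, -1 if the data is not ready yet.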
int DatasetContainer::read_data_list(const std::string& data_dir, std::vector<std::string>& data_list) {
    auto* environment = _trainer_context->environment.get();
    
    // Check whether the data is ready
    int data_status = -1;
    if (environment->is_master_node(EnvironmentRole::WORKER)) {
        if (_data_reader->is_data_ready(data_dir)) {
            data_status = 0;
        }
    }
    paddle::framework::BinaryArchive ar;
    ar << data_status; 
    environment->bcast(ar, 0, EnvironmentRole::WORKER);
    ar >> data_status;
    if (data_status != 0) {
        return -1;
    } 
    
    // Read the file list
    ar.Clear();
    std::vector<std::string> data_path_list;
    if (environment->is_master_node(EnvironmentRole::WORKER)) {
        data_path_list = _data_reader->data_file_list(data_dir);
        ar << data_path_list;
    }
    environment->bcast(ar, 0, EnvironmentRole::WORKER);
    ar >> data_path_list;
    auto worker_id = environment->rank_id(EnvironmentRole::WORKER);
    auto worker_num = environment->node_num(EnvironmentRole::WORKER); 
    for (size_t i = worker_id; i < data_path_list.size(); i += worker_num) {
        data_list.push_back(data_path_list[i]);
    }
    environment->barrier(EnvironmentRole::WORKER);
    return 0;
}

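// Returns the status of the slot assigned to epoch_id; a slot that currently
// holds data for a different timestamp is reported as Empty.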
DatasetStatus DatasetContainer::epoch_data_status(uint64_t epoch_id) {
    auto* epoch_accessor = _trainer_context->epoch_accessor.get();
    time_t timestamp = epoch_accessor->epoch_timestamp(epoch_id);
    return data_status(timestamp);
}

DatasetStatus DatasetContainer::data_status(uint64_t timestamp) {
    auto dataset_info = dataset(timestamp);
    if (dataset_info->timestamp != timestamp) {
        return DatasetStatus::Empty;
    }
    return dataset_info->status;
}
     
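// Returns the prefetched data channel for epoch_id, or a default-constructed
// (empty) channel if the data has not reached the Ready state yet.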
paddle::framework::Channel<DataItem> DatasetContainer::fetch(uint64_t epoch_id) {
    paddle::framework::Channel<DataItem> result;
    auto* epoch_accessor = _trainer_context->epoch_accessor.get();
    time_t timestamp = epoch_accessor->epoch_timestamp(epoch_id);
    if (data_status(timestamp) != DatasetStatus::Ready) {
        return result;
    }
    auto dataset_info = dataset(timestamp);
    return dataset_info->data_channel;
}  

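// Background thread body: for each epoch window starting at start_timestamp,
// wait until pre_detect_data has marked the slot Detected, read all of its
// files into the slot's data channel (retrying on failure), mark the slot
// Ready, and advance to the next window until _stop_download is set.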
void DatasetContainer::async_download_data(uint64_t start_timestamp) {
    auto* epoch_accessor = _trainer_context->epoch_accessor.get();
    if (start_timestamp % epoch_accessor->epoch_time_interval() != 0) {
        LOG(FATAL) << "timestamp:" << start_timestamp << " does not match interval:" << epoch_accessor->epoch_time_interval();
        return;
    }
    while (!_stop_download) {
        auto dataset_info = dataset(start_timestamp);
        while (data_status(start_timestamp) != DatasetStatus::Detected) {
            sleep(30);
        }
        const auto& file_list = dataset_info->file_path_list;
        dataset_info->data_channel->Clear();
        while (_data_reader->read_all(file_list, dataset_info->data_channel) != 0) {
            dataset_info->data_channel->Clear();
            VLOG(0) << "timestamp:" << start_timestamp << " data read failed, retry";
            sleep(30); 
        }
        // All files for this window have been loaded into the channel; publish it.
        dataset_info->status = DatasetStatus::Ready;
        start_timestamp += epoch_accessor->epoch_time_interval();
    }
}

} // namespace feed
} // namespace custom_trainer
} // namespace paddle