提交 7b7b2fa7 编写于 作者: J Jamie Nisbet

remove column name id map from data buffer

clang format updates

self-review cleanups

cpplint fix

review updates

ci fixes
上级 9433ca64
...@@ -17,10 +17,8 @@ ...@@ -17,10 +17,8 @@
#define DATASET_ENGINE_DATA_BUFFER_H_ #define DATASET_ENGINE_DATA_BUFFER_H_
#include <iostream> #include <iostream>
#include <map>
#include <memory> #include <memory>
#include <string> #include <string>
#include <unordered_map>
#include <utility> #include <utility>
#include <vector> #include <vector>
#include "dataset/util/allocator.h" #include "dataset/util/allocator.h"
...@@ -106,14 +104,6 @@ class DataBuffer { ...@@ -106,14 +104,6 @@ class DataBuffer {
Status SliceOff(int64_t number_of_rows); Status SliceOff(int64_t number_of_rows);
// Return a mapping from col names to col id.
std::unordered_map<std::string, int32_t> column_name_map() const { return column_name_map_; }
// Update the column name to index mapping.
void set_column_name_map(const std::unordered_map<std::string, int32_t> &new_col_name_map) {
column_name_map_ = new_col_name_map;
}
// Replacing mTensorTable, the unique_ptr assignment will release the old TensorTable. // Replacing mTensorTable, the unique_ptr assignment will release the old TensorTable.
void set_tensor_table(std::unique_ptr<TensorQTable> new_table) { tensor_table_ = std::move(new_table); } void set_tensor_table(std::unique_ptr<TensorQTable> new_table) { tensor_table_ = std::move(new_table); }
...@@ -123,18 +113,10 @@ class DataBuffer { ...@@ -123,18 +113,10 @@ class DataBuffer {
void Shuffle() {} // does nothing right now. possibly remove later void Shuffle() {} // does nothing right now. possibly remove later
// ***** column_name_map_ manipulation methods *****
// Append Column to mColumnNameMap
Status AppendColumn(const std::string &name, const int32_t &old_id) const { // does nothing right now
return Status::OK();
}
protected: protected:
int32_t buffer_id_; // An id for the buffer. int32_t buffer_id_; // An id for the buffer.
std::unique_ptr<TensorQTable> tensor_table_; // A table (row major) of Tensors std::unique_ptr<TensorQTable> tensor_table_; // A table (row major) of Tensors
BufferFlags buffer_flags_; // bit mask for various buffer properties BufferFlags buffer_flags_; // bit mask for various buffer properties
std::unordered_map<std::string, int32_t> column_name_map_; // A mapping between column index to column name.
}; };
} // namespace dataset } // namespace dataset
} // namespace mindspore } // namespace mindspore
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
* limitations under the License. * limitations under the License.
*/ */
#include "dataset/engine/dataset_iterator.h" #include "dataset/engine/dataset_iterator.h"
#include <unordered_map>
#include <utility> #include <utility>
#include "dataset/core/data_type.h" #include "dataset/core/data_type.h"
#include "dataset/core/tensor.h" #include "dataset/core/tensor.h"
...@@ -26,7 +27,7 @@ ...@@ -26,7 +27,7 @@
namespace mindspore { namespace mindspore {
namespace dataset { namespace dataset {
// Constructor of the IteratorBase // Constructor of the IteratorBase
IteratorBase::IteratorBase() : curr_buffer_(nullptr), eof_handled_(false) {} IteratorBase::IteratorBase() : curr_buffer_(nullptr), eof_handled_(false), first_row_(true) {}
IteratorBase::~IteratorBase() = default; IteratorBase::~IteratorBase() = default;
...@@ -46,6 +47,19 @@ Status IteratorBase::GetNextAsMap(TensorMap *out_map) { ...@@ -46,6 +47,19 @@ Status IteratorBase::GetNextAsMap(TensorMap *out_map) {
return Status::OK(); return Status::OK();
} }
// The column name mapping is needed to be able to produce the tensor map output.
// The column name mapping comes from the source operator that is producing the data into the iterator.
// To avoid having to fetch this for every time, we'll take a local copy of the column name id mapping
// and save in the iterator. We only have to do this once. All subsequent iterations use the same mapping.
// Note: This can only be done after the first row has been produced, as this guarantees that the child has
// its column mapping set up.
if (first_row_) {
// Determine the column name map by calling the derived class method to retrieve the column
// name map
col_name_id_map_ = this->GetColumnNameMap();
first_row_ = false;
}
// Populate the out map from the row and return it // Populate the out map from the row and return it
for (auto colMap : col_name_id_map_) { for (auto colMap : col_name_id_map_) {
(*out_map)[colMap.first] = std::move(curr_row[colMap.second]); (*out_map)[colMap.first] = std::move(curr_row[colMap.second]);
...@@ -87,7 +101,6 @@ Status DatasetIterator::FetchNextTensorRow(TensorRow *out_row) { ...@@ -87,7 +101,6 @@ Status DatasetIterator::FetchNextTensorRow(TensorRow *out_row) {
// Check if we need to get a new DataBuffer to iterate. // Check if we need to get a new DataBuffer to iterate.
if (curr_buffer_ == nullptr || curr_buffer_->NumRows() == 0) { if (curr_buffer_ == nullptr || curr_buffer_->NumRows() == 0) {
col_name_id_map_.clear();
RETURN_IF_NOT_OK(root_->GetNextBuffer(&curr_buffer_)); RETURN_IF_NOT_OK(root_->GetNextBuffer(&curr_buffer_));
// Since GetNextBuffer was used rather than GetNextInput(), it means we need to manually // Since GetNextBuffer was used rather than GetNextInput(), it means we need to manually
...@@ -120,8 +133,6 @@ Status DatasetIterator::FetchNextTensorRow(TensorRow *out_row) { ...@@ -120,8 +133,6 @@ Status DatasetIterator::FetchNextTensorRow(TensorRow *out_row) {
curr_buffer_.reset(); // explicitly free the eof buffer curr_buffer_.reset(); // explicitly free the eof buffer
return Status::OK(); return Status::OK();
} }
col_name_id_map_ = curr_buffer_->column_name_map();
} }
// If we got this far, now it's time to pop that next row for return to caller // If we got this far, now it's time to pop that next row for return to caller
...@@ -157,6 +168,11 @@ Status DatasetIterator::GetOutputTypes(std::vector<DataType> *out_types) { ...@@ -157,6 +168,11 @@ Status DatasetIterator::GetOutputTypes(std::vector<DataType> *out_types) {
return Status::OK(); return Status::OK();
} }
// Getter
// @return The column name to id mapping, sourced from the root (output) operator of the tree.
// Invoked lazily by the iterator base after the first row fetch, so the root op has had a
// chance to assign its column name map before we read it.
std::unordered_map<std::string, int32_t> DatasetIterator::GetColumnNameMap() const {
  auto name_to_id = root_->column_name_id_map();
  return name_to_id;
}
// Constructor of the ChildIterator // Constructor of the ChildIterator
ChildIterator::ChildIterator(DatasetOp *current_op, int32_t worker_id, int32_t child_idx) ChildIterator::ChildIterator(DatasetOp *current_op, int32_t worker_id, int32_t child_idx)
: IteratorBase(), current_op_(current_op), child_idx_(child_idx), worker_id_(worker_id), end_epoch_(false) {} : IteratorBase(), current_op_(current_op), child_idx_(child_idx), worker_id_(worker_id), end_epoch_(false) {}
...@@ -177,7 +193,6 @@ Status ChildIterator::FetchNextTensorRow(TensorRow *out_row) { ...@@ -177,7 +193,6 @@ Status ChildIterator::FetchNextTensorRow(TensorRow *out_row) {
// Check if we need to get a new DataBuffer to iterate. // Check if we need to get a new DataBuffer to iterate.
if (curr_buffer_ == nullptr || curr_buffer_->NumRows() == 0) { if (curr_buffer_ == nullptr || curr_buffer_->NumRows() == 0) {
col_name_id_map_.clear();
RETURN_IF_NOT_OK(current_op_->GetNextInput(&curr_buffer_, worker_id_, child_idx_)); RETURN_IF_NOT_OK(current_op_->GetNextInput(&curr_buffer_, worker_id_, child_idx_));
// Unlike the DatasetIterator, this child iterator does not quit after eoe. // Unlike the DatasetIterator, this child iterator does not quit after eoe.
...@@ -194,8 +209,6 @@ Status ChildIterator::FetchNextTensorRow(TensorRow *out_row) { ...@@ -194,8 +209,6 @@ Status ChildIterator::FetchNextTensorRow(TensorRow *out_row) {
eof_handled_ = true; eof_handled_ = true;
return Status::OK(); return Status::OK();
} }
col_name_id_map_ = curr_buffer_->column_name_map();
} }
// If we got this far, now it's time to pop that next row for return to caller // If we got this far, now it's time to pop that next row for return to caller
...@@ -226,5 +239,10 @@ Status ChildIterator::Drain() { ...@@ -226,5 +239,10 @@ Status ChildIterator::Drain() {
} }
return Status::OK(); return Status::OK();
} }
// Getter
// @return The column name to id mapping, taken from the specific child operator this
// iterator consumes from (child_idx_ of current_op_).
std::unordered_map<std::string, int32_t> ChildIterator::GetColumnNameMap() const {
  auto name_to_id = current_op_->child(child_idx_)->column_name_id_map();
  return name_to_id;
}
} // namespace dataset } // namespace dataset
} // namespace mindspore } // namespace mindspore
...@@ -66,15 +66,13 @@ class IteratorBase { ...@@ -66,15 +66,13 @@ class IteratorBase {
// Getter // Getter
// @return The string to column id mapping. // @return The string to column id mapping.
std::unordered_map<std::string, int32_t> col_name_id_map() const { return col_name_id_map_; } virtual std::unordered_map<std::string, int32_t> GetColumnNameMap() const = 0;
protected: protected:
std::unique_ptr<DataBuffer> curr_buffer_; // holds the current buffer std::unique_ptr<DataBuffer> curr_buffer_; // holds the current buffer
bool eof_handled_; // T/F if this op got an eof
// The column name-id mapping for the current data buffer. bool first_row_; // internal tracking for first row case
std::unordered_map<std::string, int32_t> col_name_id_map_; std::unordered_map<std::string, int32_t> col_name_id_map_;
bool eof_handled_; // T/F if this op got an eof
}; };
// The DatasetIterator derived class is for fetching rows off the end/root of the execution tree. // The DatasetIterator derived class is for fetching rows off the end/root of the execution tree.
...@@ -104,6 +102,10 @@ class DatasetIterator : public IteratorBase { ...@@ -104,6 +102,10 @@ class DatasetIterator : public IteratorBase {
// @return Status - The error code return // @return Status - The error code return
Status GetOutputTypes(std::vector<DataType> *out_types); Status GetOutputTypes(std::vector<DataType> *out_types);
// Getter
// @return The string to column id mapping.
std::unordered_map<std::string, int32_t> GetColumnNameMap() const override;
private: private:
std::shared_ptr<DatasetOp> root_; // saves the root of the executionTree std::shared_ptr<DatasetOp> root_; // saves the root of the executionTree
TensorRow device_queue_row_; TensorRow device_queue_row_;
...@@ -134,6 +136,10 @@ class ChildIterator : public IteratorBase { ...@@ -134,6 +136,10 @@ class ChildIterator : public IteratorBase {
// @return Status - The error code return // @return Status - The error code return
Status Drain(); Status Drain();
// Getter
// @return The string to column id mapping.
std::unordered_map<std::string, int32_t> GetColumnNameMap() const override;
private: private:
DatasetOp *current_op_; // The parent operator. We consume from it's children. DatasetOp *current_op_; // The parent operator. We consume from it's children.
int32_t child_idx_; // The specific child this iterator will fetch from. int32_t child_idx_; // The specific child this iterator will fetch from.
......
...@@ -96,9 +96,8 @@ Status BarrierOp::operator()() { ...@@ -96,9 +96,8 @@ Status BarrierOp::operator()() {
if (!curr_table->empty()) { if (!curr_table->empty()) {
std::unique_ptr<DataBuffer> curr_buffer = std::make_unique<DataBuffer>(buffer_id_, DataBuffer::kDeBFlagNone); std::unique_ptr<DataBuffer> curr_buffer = std::make_unique<DataBuffer>(buffer_id_, DataBuffer::kDeBFlagNone);
curr_buffer->set_tensor_table(std::move(curr_table)); curr_buffer->set_tensor_table(std::move(curr_table));
curr_buffer->set_column_name_map(col_name_id_map_);
MS_LOG(DEBUG) << "Barrier operator finished one buffer, pushing, rows " << curr_buffer->NumRows() << ", cols " MS_LOG(DEBUG) << "Barrier operator finished one buffer, pushing, rows " << curr_buffer->NumRows() << ", cols "
<< curr_buffer->NumCols() << ", map " << col_name_id_map_.size() << "."; << curr_buffer->NumCols() << ", map " << column_name_id_map_.size() << ".";
RETURN_IF_NOT_OK(out_connector_->Add(0, std::move(curr_buffer))); RETURN_IF_NOT_OK(out_connector_->Add(0, std::move(curr_buffer)));
buffer_id_++; buffer_id_++;
} }
...@@ -144,9 +143,10 @@ Status BarrierOp::prepare(TensorQTable *const table) { ...@@ -144,9 +143,10 @@ Status BarrierOp::prepare(TensorQTable *const table) {
RETURN_IF_NOT_OK(blockCond()); RETURN_IF_NOT_OK(blockCond());
table->push_back(std::move(new_row)); table->push_back(std::move(new_row));
// At this point we have 1 row produced, we take the old column map id and use it in the new table
// Initializing col_name_id_map_ from the first data buffer. // Assign the column name id map
col_name_id_map_ = child_iterator_->col_name_id_map(); RETURN_IF_NOT_OK(DatasetOp::AssignColMapFromChild());
// the update code below shouldn't do anything bad if the column name already exists. // the update code below shouldn't do anything bad if the column name already exists.
return Status::OK(); return Status::OK();
} }
......
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
#include <memory> #include <memory>
#include <string> #include <string>
#include <unordered_map>
#include <utility> #include <utility>
#include <vector> #include <vector>
#include "dataset/core/tensor.h" #include "dataset/core/tensor.h"
...@@ -157,8 +156,6 @@ class BarrierOp : public PipelineOp { ...@@ -157,8 +156,6 @@ class BarrierOp : public PipelineOp {
int32_t rows_per_buffer_; int32_t rows_per_buffer_;
// buffer_id // buffer_id
int32_t buffer_id_; int32_t buffer_id_;
// local variable to keep track of the buffer information
std::unordered_map<std::string, int32_t> col_name_id_map_;
// iterator to pull new rows, we only have one child // iterator to pull new rows, we only have one child
std::unique_ptr<ChildIterator> child_iterator_; std::unique_ptr<ChildIterator> child_iterator_;
// condition name, to support multiple barriers // condition name, to support multiple barriers
......
...@@ -74,7 +74,7 @@ Status BatchOp::operator()() { ...@@ -74,7 +74,7 @@ Status BatchOp::operator()() {
std::unique_ptr<TensorQTable> table = std::make_unique<TensorQTable>(); std::unique_ptr<TensorQTable> table = std::make_unique<TensorQTable>();
child_iterator_ = std::make_unique<ChildIterator>(this, 0, 0); child_iterator_ = std::make_unique<ChildIterator>(this, 0, 0);
RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row)); RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row));
column_name_map_ = child_iterator_->col_name_id_map(); RETURN_IF_NOT_OK(DatasetOp::AssignColMapFromChild()); // must come after the first fetch above
int32_t cur_batch_size = 0; int32_t cur_batch_size = 0;
RETURN_IF_NOT_OK(GetBatchSize(&cur_batch_size, CBatchInfo(0, 0, 0))); RETURN_IF_NOT_OK(GetBatchSize(&cur_batch_size, CBatchInfo(0, 0, 0)));
while (child_iterator_->eof_handled() == false) { while (child_iterator_->eof_handled() == false) {
...@@ -196,7 +196,6 @@ Status BatchOp::MakeBatchedBuffer(std::pair<std::unique_ptr<TensorQTable>, CBatc ...@@ -196,7 +196,6 @@ Status BatchOp::MakeBatchedBuffer(std::pair<std::unique_ptr<TensorQTable>, CBatc
std::unique_ptr<TensorQTable> dest_table = std::make_unique<TensorQTable>(); std::unique_ptr<TensorQTable> dest_table = std::make_unique<TensorQTable>();
RETURN_IF_NOT_OK(BatchRows(&table_pair.first, &dest_table, table_pair.first->size())); RETURN_IF_NOT_OK(BatchRows(&table_pair.first, &dest_table, table_pair.first->size()));
(*db)->set_tensor_table(std::move(dest_table)); (*db)->set_tensor_table(std::move(dest_table));
(*db)->set_column_name_map(column_name_map_);
return Status::OK(); return Status::OK();
} }
...@@ -218,12 +217,12 @@ Status BatchOp::MapColumns(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> ...@@ -218,12 +217,12 @@ Status BatchOp::MapColumns(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo>
TensorBatchTable input_table; TensorBatchTable input_table;
input_table.reserve(pyfunc_column_names_.size()); input_table.reserve(pyfunc_column_names_.size());
for (std::string col_name : pyfunc_column_names_) { for (std::string col_name : pyfunc_column_names_) {
if (column_name_map_.find(col_name) == column_name_map_.end()) { if (column_name_id_map_.find(col_name) == column_name_id_map_.end()) {
RETURN_STATUS_UNEXPECTED("column : '" + col_name + "' does not exist\n"); RETURN_STATUS_UNEXPECTED("column : '" + col_name + "' does not exist\n");
} }
TensorBatch tensor_batch; TensorBatch tensor_batch;
tensor_batch.reserve(table_pair->first->size()); tensor_batch.reserve(table_pair->first->size());
size_t col_idx = static_cast<size_t>(column_name_map_[col_name]); size_t col_idx = static_cast<size_t>(column_name_id_map_[col_name]);
for (size_t row_idx = 0; row_idx < table_pair->first->size(); row_idx++) { for (size_t row_idx = 0; row_idx < table_pair->first->size(); row_idx++) {
tensor_batch.push_back(std::move(table_pair->first->at(row_idx)[col_idx])); tensor_batch.push_back(std::move(table_pair->first->at(row_idx)[col_idx]));
} }
...@@ -236,7 +235,7 @@ Status BatchOp::MapColumns(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> ...@@ -236,7 +235,7 @@ Status BatchOp::MapColumns(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo>
// Write back to TensorQTable // Write back to TensorQTable
for (size_t input_idx = 0; input_idx < pyfunc_column_names_.size(); input_idx++) { for (size_t input_idx = 0; input_idx < pyfunc_column_names_.size(); input_idx++) {
size_t col_idx = static_cast<size_t>(column_name_map_[pyfunc_column_names_[input_idx]]); size_t col_idx = static_cast<size_t>(column_name_id_map_[pyfunc_column_names_[input_idx]]);
size_t row_id = 0; size_t row_id = 0;
for (TensorRow &row : *(table_pair->first)) { for (TensorRow &row : *(table_pair->first)) {
row[col_idx] = std::move(output_table[input_idx][row_id++]); row[col_idx] = std::move(output_table[input_idx][row_id++]);
...@@ -372,11 +371,12 @@ Status BatchOp::PadTensor(std::shared_ptr<Tensor> src, std::shared_ptr<Tensor> * ...@@ -372,11 +371,12 @@ Status BatchOp::PadTensor(std::shared_ptr<Tensor> src, std::shared_ptr<Tensor> *
Status BatchOp::PadColumns(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> *table_pair) { Status BatchOp::PadColumns(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> *table_pair) {
RETURN_UNEXPECTED_IF_NULL(table_pair); // placeholder for now, might need this in the future RETURN_UNEXPECTED_IF_NULL(table_pair); // placeholder for now, might need this in the future
CHECK_FAIL_RETURN_UNEXPECTED(table_pair->first->front().size() == column_name_map_.size(), "col_name_map mismatch"); CHECK_FAIL_RETURN_UNEXPECTED(table_pair->first->front().size() == column_name_id_map_.size(),
std::vector<float> pad_vals(column_name_map_.size(), 0); // value to pad each column's tensor with, default 0 "col_name_map mismatch");
std::vector<float> pad_vals(column_name_id_map_.size(), 0); // value to pad each column's tensor with, default 0
std::set<int32_t> pad_cols; std::set<int32_t> pad_cols;
// padded_shape provided by user, maximum shapes of current batch of tensors // padded_shape provided by user, maximum shapes of current batch of tensors
std::vector<std::vector<dsize_t>> pad_shapes(column_name_map_.size()), max_shapes(column_name_map_.size()); std::vector<std::vector<dsize_t>> pad_shapes(column_name_id_map_.size()), max_shapes(column_name_id_map_.size());
RETURN_IF_NOT_OK(UnpackPadInfo(&pad_cols, &pad_vals, &pad_shapes)); RETURN_IF_NOT_OK(UnpackPadInfo(&pad_cols, &pad_vals, &pad_shapes));
// init each shape in max_shape to {-1,-1...} init each unspecified shape in pad_shape to -1 as well // init each shape in max_shape to {-1,-1...} init each unspecified shape in pad_shape to -1 as well
...@@ -418,14 +418,14 @@ Status BatchOp::PadColumns(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> ...@@ -418,14 +418,14 @@ Status BatchOp::PadColumns(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo>
Status BatchOp::UnpackPadInfo(std::set<int32_t> *pad_cols, std::vector<float> *pad_vals, Status BatchOp::UnpackPadInfo(std::set<int32_t> *pad_cols, std::vector<float> *pad_vals,
std::vector<std::vector<dsize_t>> *pad_shapes) { std::vector<std::vector<dsize_t>> *pad_shapes) {
if (pad_info_.empty()) { // if pad_info empty, pad every columns automatically if (pad_info_.empty()) { // if pad_info empty, pad every columns automatically
for (dsize_t col_id = 0; col_id < column_name_map_.size(); col_id++) { for (dsize_t col_id = 0; col_id < column_name_id_map_.size(); col_id++) {
pad_cols->insert(col_id); pad_cols->insert(col_id);
} }
} else { } else {
for (auto p : pad_info_) { for (auto p : pad_info_) {
CHECK_FAIL_RETURN_UNEXPECTED(column_name_map_.find(p.first) != column_name_map_.end(), CHECK_FAIL_RETURN_UNEXPECTED(column_name_id_map_.find(p.first) != column_name_id_map_.end(),
"no column exists with name:" + p.first); "no column exists with name:" + p.first);
dsize_t col_id = static_cast<dsize_t>(column_name_map_[p.first]); dsize_t col_id = static_cast<dsize_t>(column_name_id_map_[p.first]);
CHECK_FAIL_RETURN_UNEXPECTED(col_id < pad_vals->size() && col_id < pad_shapes->size(), "col_id out of bound"); CHECK_FAIL_RETURN_UNEXPECTED(col_id < pad_vals->size() && col_id < pad_shapes->size(), "col_id out of bound");
pad_cols->insert(col_id); pad_cols->insert(col_id);
(*pad_vals)[col_id] = p.second.second; // set pad values (*pad_vals)[col_id] = p.second.second; // set pad values
......
...@@ -263,7 +263,6 @@ class BatchOp : public ParallelOp { ...@@ -263,7 +263,6 @@ class BatchOp : public ParallelOp {
std::vector<std::string> pyfunc_column_names_; // Name of the columns to perform map op on std::vector<std::string> pyfunc_column_names_; // Name of the columns to perform map op on
std::map<std::string, std::pair<TensorShape, float>> pad_info_; // column names to perform padding on std::map<std::string, std::pair<TensorShape, float>> pad_info_; // column names to perform padding on
std::unique_ptr<ChildIterator> child_iterator_; // child iterator for fetching TensorRows 1 by 1 std::unique_ptr<ChildIterator> child_iterator_; // child iterator for fetching TensorRows 1 by 1
std::unordered_map<std::string, int32_t> column_name_map_; // Map of column_name: column_index
QueueList<std::pair<std::unique_ptr<TensorQTable>, CBatchInfo>> worker_queues_; // internal queue for syncing worker QueueList<std::pair<std::unique_ptr<TensorQTable>, CBatchInfo>> worker_queues_; // internal queue for syncing worker
py::function batch_size_func_; // Function pointer of batch size function py::function batch_size_func_; // Function pointer of batch size function
py::function batch_map_func_; // Function pointer of per batch map function py::function batch_map_func_; // Function pointer of per batch map function
......
...@@ -36,7 +36,8 @@ DatasetOp::DatasetOp(int32_t op_connector_size) ...@@ -36,7 +36,8 @@ DatasetOp::DatasetOp(int32_t op_connector_size)
operator_id_(kInvalidOperatorId), operator_id_(kInvalidOperatorId),
tree_(nullptr), tree_(nullptr),
state_(OpState::kDeOpIdle), state_(OpState::kDeOpIdle),
op_ctrl_flags_(kDeOpNone) { op_ctrl_flags_(kDeOpNone),
first_fetch_(true) {
// The operator starts out with an invalid operator id. The only way to // The operator starts out with an invalid operator id. The only way to
// get it out of invalid state is to assign the operator to an execution tree. // get it out of invalid state is to assign the operator to an execution tree.
} }
...@@ -211,5 +212,42 @@ Status DatasetOp::Reset() { ...@@ -211,5 +212,42 @@ Status DatasetOp::Reset() {
state_ = OpState::kDeOpRunning; state_ = OpState::kDeOpRunning;
return Status::OK(); return Status::OK();
} }
// Gives a string output for the column map for handy debug printing.
// @return - the column name map rendered as "Column name id map:  name:id  name:id ..."
std::string DatasetOp::ColumnNameMapAsString() const {
  std::string out = "Column name id map: ";
  for (const auto &entry : column_name_id_map_) {
    out += " " + entry.first + ":" + std::to_string(entry.second);
  }
  return out;
}
// A helper function for providing assignment of the column name map.
// This grabs the map from child 0 and assigns it into this op.
// Can only be used if the number of children is exactly 1.
// @return Status - The error code return
Status DatasetOp::AssignColMapFromChild() {
  // Reject both the multi-child and the childless case: dereferencing child_[0]
  // below would be undefined behaviour if there is no child at all.
  if (child_.size() != 1) {
    RETURN_STATUS_UNEXPECTED("Assigning column name map from child only works for single-child operators.");
  }
  // Assign the correct column name map to this op by taking it from the input child.
  // This must be done AFTER the first fetch, but only needs to be done once by the first worker to
  // do the first fetch.
  if (first_fetch_) {
    // If there was a single worker, or this is being called from a master thread in a parallel op,
    // then the mutex is not really needed here, although it's harmless.
    std::unique_lock<std::mutex> lock(column_name_map_mutex_);
    // If the map has not been set up yet, then we are the first one in to set it up. The first_fetch_ (dirty read)
    // bool allows us to avoid acquiring the lock if the map has already been set.
    if (column_name_id_map_.empty()) {
      column_name_id_map_ = child_[0]->column_name_id_map();
      if (column_name_id_map_.empty()) {
        // Validate before clearing first_fetch_ so that a failure here does not
        // permanently disable a later (possibly successful) assignment attempt.
        RETURN_STATUS_UNEXPECTED("Child column name map cannot be empty!");
      }
      first_fetch_ = false;
    }
    MS_LOG(INFO) << "Setting column map after first fetch:\n" << DatasetOp::ColumnNameMapAsString();
  }
  return Status::OK();
}
} // namespace dataset } // namespace dataset
} // namespace mindspore } // namespace mindspore
...@@ -17,6 +17,9 @@ ...@@ -17,6 +17,9 @@
#define DATASET_ENGINE_DATASETOPS_DATASET_OP_H_ #define DATASET_ENGINE_DATASETOPS_DATASET_OP_H_
#include <memory> #include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector> #include <vector>
#include "dataset/core/constants.h" #include "dataset/core/constants.h"
#include "dataset/engine/db_connector.h" #include "dataset/engine/db_connector.h"
...@@ -59,7 +62,7 @@ class DatasetOp : public std::enable_shared_from_this<DatasetOp> { ...@@ -59,7 +62,7 @@ class DatasetOp : public std::enable_shared_from_this<DatasetOp> {
// @param child - shared pointer to the child to add. // @param child - shared pointer to the child to add.
Status AddChild(std::shared_ptr<DatasetOp> child); Status AddChild(std::shared_ptr<DatasetOp> child);
// Getter function to get a shared pointer to our childAdds a operator to become our child. // Getter function to get a shared pointer to our child
// @param child_index - An operator can have n children. Indicates choose which child to return. // @param child_index - An operator can have n children. Indicates choose which child to return.
std::shared_ptr<DatasetOp> child(int32_t child_index) const; std::shared_ptr<DatasetOp> child(int32_t child_index) const;
...@@ -194,20 +197,41 @@ class DatasetOp : public std::enable_shared_from_this<DatasetOp> { ...@@ -194,20 +197,41 @@ class DatasetOp : public std::enable_shared_from_this<DatasetOp> {
// @return Status // @return Status
virtual Status RegisterWorkerConnectors() { return Status::OK(); } virtual Status RegisterWorkerConnectors() { return Status::OK(); }
// Getter for the column name mapping
// @return The returned map
std::unordered_map<std::string, int32_t> column_name_id_map() const { return column_name_id_map_; }
// Checks if the column name map has been set up yet for this op
// @return - T/F if the operator has the map set up
// Note: previously returned empty(), which is the inverse of what the name and
// the documented contract ("has the map set up") promise.
bool HasColumnNameMap() const { return (!column_name_id_map_.empty()); }
// gives a string output for the column map for handy debug printing
// @return - the column name map as a string
std::string ColumnNameMapAsString() const;
protected: protected:
// Adds a parent operator to this operator // Adds a parent operator to this operator
// @notes External callers do not have access to this function. // @notes External callers do not have access to this function.
// @param parent - The parent node to add // @param parent - The parent node to add
void AddParent(const DatasetOp *parent); void AddParent(const DatasetOp *parent);
std::vector<std::shared_ptr<DatasetOp>> child_; // Child nodes // A helper function for providing an assignment of the column name map.
std::vector<const DatasetOp *> parent_; // Parent nodes. No ownership and read-only // This grabs the map from child 0 and assigns it into this op.
int32_t oc_queue_size_; // Capacity for each out_connector_ // Can only be used if number of children is 1.
int32_t operator_id_; // Generated id for the node // @return - Status
ExecutionTree *tree_; // Back pointer to our tree. Status AssignColMapFromChild();
OpState state_; // The state of the operator, Running, Idle, Terminated
uint32_t op_ctrl_flags_; // Flags for the operator std::vector<std::shared_ptr<DatasetOp>> child_; // Child nodes
std::unique_ptr<DbConnector> out_connector_; // Output Connector std::vector<const DatasetOp *> parent_; // Parent nodes. No ownership and read-only
int32_t oc_queue_size_; // Capacity for each out_connector_
int32_t operator_id_; // Generated id for the node
ExecutionTree *tree_; // Back pointer to our tree.
OpState state_; // The state of the operator, Running, Idle, Terminated
uint32_t op_ctrl_flags_; // Flags for the operator
std::unique_ptr<DbConnector> out_connector_; // Output Connector
std::unordered_map<std::string, int32_t> column_name_id_map_; // Mapping between col index and col name
bool first_fetch_; // For use when setting column map
std::mutex column_name_map_mutex_; // For protecting shared access to the column map
private: private:
// Sets the operator id. // Sets the operator id.
......
...@@ -75,10 +75,9 @@ Status FilterOp::EofReceived(int32_t) { return Status::OK(); } ...@@ -75,10 +75,9 @@ Status FilterOp::EofReceived(int32_t) { return Status::OK(); }
Status FilterOp::EoeReceived(int32_t) { return Status::OK(); } Status FilterOp::EoeReceived(int32_t) { return Status::OK(); }
// Validating if each of the input_columns exists in the DataBuffer. // Validating if each of the input_columns exists in the DataBuffer.
Status FilterOp::ValidateInColumns(const std::unordered_map<std::string, int32_t> &col_name_id_map, Status FilterOp::ValidateInColumns(const std::vector<std::string> *input_columns) {
const std::vector<std::string> *input_columns) {
for (const auto &inCol : *input_columns) { for (const auto &inCol : *input_columns) {
bool found = col_name_id_map.find(inCol) != col_name_id_map.end() ? true : false; bool found = column_name_id_map_.find(inCol) != column_name_id_map_.end() ? true : false;
if (!found) { if (!found) {
std::string err_msg = "input column name: " + inCol + " doesn't exist in the dataset columns."; std::string err_msg = "input column name: " + inCol + " doesn't exist in the dataset columns.";
RETURN_STATUS_UNEXPECTED(err_msg); RETURN_STATUS_UNEXPECTED(err_msg);
...@@ -125,6 +124,9 @@ Status FilterOp::WorkerEntry(int32_t worker_id) { ...@@ -125,6 +124,9 @@ Status FilterOp::WorkerEntry(int32_t worker_id) {
continue; continue;
} }
// Now that the first fetch is in, use the helper function to assign the column name map to this op.
RETURN_IF_NOT_OK(DatasetOp::AssignColMapFromChild());
RETURN_IF_NOT_OK(CheckColumns(in_buffer.get(), &in_columns_)); RETURN_IF_NOT_OK(CheckColumns(in_buffer.get(), &in_columns_));
// if the databuffer was all filtered, it is marked as kFilterEmpty. // if the databuffer was all filtered, it is marked as kFilterEmpty.
...@@ -161,10 +163,9 @@ Status FilterOp::WorkerCompute(DataBuffer *in_buffer, std::unique_ptr<TensorQTab ...@@ -161,10 +163,9 @@ Status FilterOp::WorkerCompute(DataBuffer *in_buffer, std::unique_ptr<TensorQTab
MS_LOG(INFO) << "Input columns in filter operator is empty, will apply to the all column in the current table."; MS_LOG(INFO) << "Input columns in filter operator is empty, will apply to the all column in the current table.";
to_process = cur_row; to_process = cur_row;
} else { } else {
std::unordered_map<std::string, int32_t> col_map = in_buffer->column_name_map();
(void)std::transform( (void)std::transform(
in_columns_.begin(), in_columns_.end(), std::back_inserter(to_process), in_columns_.begin(), in_columns_.end(), std::back_inserter(to_process),
[&cur_row, &col_map](const auto &it) -> std::shared_ptr<Tensor> { return cur_row[col_map[it]]; }); [&cur_row, this](const auto &it) -> std::shared_ptr<Tensor> { return cur_row[column_name_id_map_[it]]; });
} }
bool predicate = true; bool predicate = true;
RETURN_IF_NOT_OK(InvokePredicateFunc(to_process, &predicate)); RETURN_IF_NOT_OK(InvokePredicateFunc(to_process, &predicate));
...@@ -217,9 +218,8 @@ Status FilterOp::CheckColumns(const DataBuffer *in_buf, const std::vector<std::s ...@@ -217,9 +218,8 @@ Status FilterOp::CheckColumns(const DataBuffer *in_buf, const std::vector<std::s
if (num_rows == 0 || num_cols == 0) { if (num_rows == 0 || num_cols == 0) {
RETURN_STATUS_UNEXPECTED("FilterOp is getting an empty DataBuffer."); RETURN_STATUS_UNEXPECTED("FilterOp is getting an empty DataBuffer.");
} }
std::unordered_map<std::string, int32_t> col_name_id_map = in_buf->column_name_map();
// Check if there is invalid column name in the inColumns. // Check if there is invalid column name in the inColumns.
RETURN_IF_NOT_OK(ValidateInColumns(col_name_id_map, input_columns)); RETURN_IF_NOT_OK(ValidateInColumns(input_columns));
return Status::OK(); return Status::OK();
} }
......
...@@ -19,7 +19,6 @@ ...@@ -19,7 +19,6 @@
#include <memory> #include <memory>
#include <queue> #include <queue>
#include <string> #include <string>
#include <unordered_map>
#include <utility> #include <utility>
#include <vector> #include <vector>
#include "dataset/engine/datasetops/parallel_op.h" #include "dataset/engine/datasetops/parallel_op.h"
...@@ -162,11 +161,9 @@ class FilterOp : public ParallelOp { ...@@ -162,11 +161,9 @@ class FilterOp : public ParallelOp {
// Private function for validating if each of the user specified input column names // Private function for validating if each of the user specified input column names
// exist in the DataBuffer. // exist in the DataBuffer.
// @param col_name_id_map The column name to index mapping obtained from DataBuffer.
// @param input_columns The vector of input column names used in the current thread. // @param input_columns The vector of input column names used in the current thread.
// @return Status The error code return. // @return Status The error code return.
Status ValidateInColumns(const std::unordered_map<std::string, int32_t> &col_name_id_map, Status ValidateInColumns(const std::vector<std::string> *input_columns);
const std::vector<std::string> *input_columns);
// Private function for checking the column legality // Private function for checking the column legality
// @param in_buf A raw pointer to the DataBuffer. A raw pointer is fine because this function does not manage memory // @param in_buf A raw pointer to the DataBuffer. A raw pointer is fine because this function does not manage memory
......
...@@ -147,28 +147,33 @@ Status MapOp::operator()() { ...@@ -147,28 +147,33 @@ Status MapOp::operator()() {
Status MapOp::WorkerEntry(int32_t worker_id) { Status MapOp::WorkerEntry(int32_t worker_id) {
// Handshake with TaskManager that thread creation is successful. // Handshake with TaskManager that thread creation is successful.
TaskManager::FindMe()->Post(); TaskManager::FindMe()->Post();
// MapOp is not using the ChildIterator class to fetch rows because it needs to track
// rows at a per-buffer level. ChildIterator abstracts the concept of buffers making it
// less convenient to use that interface for fetching.
std::unique_ptr<DataBuffer> in_buffer; std::unique_ptr<DataBuffer> in_buffer;
// Loop until eof buffer is encountered // Getting a databuffer to work on.
while (true) { // Perform the first fetch here outside of the loop. This allows us to execute one-time only
// Getting a databuffer to work on. // initializations that happen after the first fetch.
// When PerformanceMode is enabled, workers pop from the local queue. RETURN_IF_NOT_OK(FetchNextBuffer(&in_buffer, worker_id));
// Otherwise, workers pop from the first child output Connector.
if (perf_mode_) { // Initialize details related to column selections and column map by calling WorkerEntryInit.
RETURN_IF_NOT_OK(local_queues_[worker_id]->PopFront(&in_buffer)); // WorkerEntryInit contains thread-safe lock to ensure that this init work is only performed once
} else { // by the first worker to enter the codepath. All other threads will share the const info that
RETURN_IF_NOT_OK(child_[0]->GetNextBuffer(&in_buffer, worker_id)); // gets set up here going forward.
} // Special case: if there's more threads than buffers, some threads simply get the final control
// messages (eoe/eof), and so they will not perform the init work.
if (!in_buffer->eoe() && !in_buffer->eof()) {
RETURN_IF_NOT_OK(WorkerEntryInit(in_buffer.get()));
}
// Now that init work is done, drop into the main fetching loop.
// Map op does not use child iterator, and it needs to manually handle eoe and eof's itself
// rather than use the base-class defaults.
while (true) {
// Handle EOE and EOF ourselves. Implicit eoe/eof handling in GetNextInput does not work // Handle EOE and EOF ourselves. Implicit eoe/eof handling in GetNextInput does not work
// with Performance Mode design. // with Performance Mode design.
if (in_buffer->eoe()) { if (in_buffer->eoe()) {
// Calling base class EoeReceived to forward eoe buffer. // Calling base class EoeReceived to forward eoe buffer.
RETURN_IF_NOT_OK(EoeReceived(worker_id)); RETURN_IF_NOT_OK(EoeReceived(worker_id));
RETURN_IF_NOT_OK(FetchNextBuffer(&in_buffer, worker_id));
continue; continue;
} else if (in_buffer->eof()) { } else if (in_buffer->eof()) {
// Calling base class EofReceived to forward eof buffer. // Calling base class EofReceived to forward eof buffer.
...@@ -176,42 +181,24 @@ Status MapOp::WorkerEntry(int32_t worker_id) { ...@@ -176,42 +181,24 @@ Status MapOp::WorkerEntry(int32_t worker_id) {
break; break;
} }
// Boolean mapping, true means to keep the column.
std::vector<bool> keep_input_columns;
// Indices of the columns to process.
std::vector<size_t> to_process_indices;
// The final column mapping after performing this map
std::unordered_map<std::string, int32_t> final_col_name_id_map;
// Thread local variables to avoid lock. When in_columns_ is empty and workers will write
// the name of the first column into input_columns (thread local) instead of in_columns_ (thread global).
std::vector<std::string> input_columns = in_columns_;
std::vector<std::string> output_columns = out_columns_;
// Initialize the above data structures
RETURN_IF_NOT_OK(WorkerEntryInit(in_buffer.get(), &keep_input_columns, &to_process_indices, &final_col_name_id_map,
&input_columns, &output_columns));
std::unique_ptr<TensorQTable> new_tensor_table(std::make_unique<TensorQTable>()); std::unique_ptr<TensorQTable> new_tensor_table(std::make_unique<TensorQTable>());
// Perform the compute function of TensorOp(s) and store the result in new_tensor_table. // Perform the compute function of TensorOp(s) and store the result in new_tensor_table.
RETURN_IF_NOT_OK(WorkerCompute(in_buffer.get(), to_process_indices, new_tensor_table.get(), keep_input_columns, RETURN_IF_NOT_OK(WorkerCompute(in_buffer.get(), new_tensor_table.get()));
&input_columns, &output_columns));
// Update column name to index mapping because tensorOp might add/remove column.
in_buffer->set_column_name_map(final_col_name_id_map);
// Replace the TensorTable in DataBuffer with the new one. // Replace the TensorTable in DataBuffer with the new one.
in_buffer->set_tensor_table(std::move(new_tensor_table)); in_buffer->set_tensor_table(std::move(new_tensor_table));
// Push the buffer onto the connector for next operator to consume. // Push the buffer onto the connector for next operator to consume.
RETURN_IF_NOT_OK(out_connector_->Add(static_cast<int>(worker_id), std::move(in_buffer))); RETURN_IF_NOT_OK(out_connector_->Add(static_cast<int>(worker_id), std::move(in_buffer)));
// Fetch the next buffer and loop back to the top.
RETURN_IF_NOT_OK(FetchNextBuffer(&in_buffer, worker_id));
} }
return Status::OK(); return Status::OK();
} }
Status MapOp::WorkerCompute(DataBuffer *in_buffer, const std::vector<size_t> &to_process_indices, Status MapOp::WorkerCompute(DataBuffer *in_buffer, TensorQTable *new_tensor_table) {
TensorQTable *new_tensor_table, const std::vector<bool> &keep_input_columns,
std::vector<std::string> *input_columns, std::vector<std::string> *output_columns) {
// Getting number of rows and cols in this buffer. // Getting number of rows and cols in this buffer.
int32_t num_rows = in_buffer->NumRows(); int32_t num_rows = in_buffer->NumRows();
int32_t num_cols = in_buffer->NumCols(); int32_t num_cols = in_buffer->NumCols();
...@@ -224,7 +211,7 @@ Status MapOp::WorkerCompute(DataBuffer *in_buffer, const std::vector<size_t> &to ...@@ -224,7 +211,7 @@ Status MapOp::WorkerCompute(DataBuffer *in_buffer, const std::vector<size_t> &to
RETURN_IF_NOT_OK(in_buffer->PopRow(&cur_row)); RETURN_IF_NOT_OK(in_buffer->PopRow(&cur_row));
// Populate the Tensor from the current row to be processed by TensorOp // Populate the Tensor from the current row to be processed by TensorOp
for (const auto &idx : to_process_indices) { for (const auto &idx : to_process_indices_) {
to_process.push_back(std::move(cur_row[idx])); to_process.push_back(std::move(cur_row[idx]));
} }
...@@ -243,20 +230,20 @@ Status MapOp::WorkerCompute(DataBuffer *in_buffer, const std::vector<size_t> &to ...@@ -243,20 +230,20 @@ Status MapOp::WorkerCompute(DataBuffer *in_buffer, const std::vector<size_t> &to
} }
} }
if (output_columns->size() != result_row.size()) { if (out_columns_.size() != result_row.size()) {
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__,
"Result of a tensorOp doesn't match output column names"); "Result of a tensorOp doesn't match output column names");
} }
if (input_columns->size() == output_columns->size()) { if (in_columns_.size() == out_columns_.size()) {
for (size_t i = 0; i < result_row.size(); i++) { for (size_t i = 0; i < result_row.size(); i++) {
cur_row[to_process_indices[i]] = std::move(result_row[i]); cur_row[to_process_indices_[i]] = std::move(result_row[i]);
} }
new_tensor_table->push_back(std::move(cur_row)); new_tensor_table->push_back(std::move(cur_row));
} else { } else {
// Add the columns we did not touch to the result_row. // Add the columns we did not touch to the result_row.
for (int32_t i = 0; i < num_cols; i++) { for (int32_t i = 0; i < num_cols; i++) {
if (keep_input_columns[i]) { if (keep_input_columns_[i]) {
result_row.push_back(std::move(cur_row[i])); result_row.push_back(std::move(cur_row[i]));
} }
} }
...@@ -269,92 +256,102 @@ Status MapOp::WorkerCompute(DataBuffer *in_buffer, const std::vector<size_t> &to ...@@ -269,92 +256,102 @@ Status MapOp::WorkerCompute(DataBuffer *in_buffer, const std::vector<size_t> &to
return Status::OK(); return Status::OK();
} }
// Validating if each of the input_columns exists in the DataBuffer.
Status MapOp::ValidateInColumns(const std::unordered_map<std::string, int32_t> &col_name_id_map,
std::vector<std::string> *input_columns) {
for (const auto &inCol : *input_columns) {
bool found = col_name_id_map.find(inCol) != col_name_id_map.end() ? true : false;
if (!found) {
std::string err_msg = "input column name: " + inCol + " doesn't exist in the dataset columns.";
RETURN_STATUS_UNEXPECTED(err_msg);
}
}
return Status::OK();
}
// initialize some internal data structure used by WorkerEntry() // initialize some internal data structure used by WorkerEntry()
Status MapOp::WorkerEntryInit(const DataBuffer *in_buf, std::vector<bool> *keep_input_columns, Status MapOp::WorkerEntryInit(const DataBuffer *in_buf) {
std::vector<size_t> *to_process_indices,
std::unordered_map<std::string, int32_t> *final_col_name_id_map,
std::vector<std::string> *input_columns, std::vector<std::string> *output_columns) {
int32_t num_rows = in_buf->NumRows(); int32_t num_rows = in_buf->NumRows();
int32_t num_cols = in_buf->NumCols(); int32_t num_cols = in_buf->NumCols();
if (num_rows == 0 || num_cols == 0) { if (num_rows == 0 || num_cols == 0) {
RETURN_STATUS_UNEXPECTED("MapOp is getting an empty DataBuffer."); RETURN_STATUS_UNEXPECTED("MapOp is getting an empty DataBuffer.");
} }
std::unordered_map<std::string, int32_t> col_name_id_map = in_buf->column_name_map();
// Check if there is invalid column name in the inColumns. // We can't use AssignColMapFromChild() here since we need to modify the column map. We need to be threadsafe
RETURN_IF_NOT_OK(ValidateInColumns(col_name_id_map, input_columns)); // though for saving the final map in the op, so use the lock here.
if (first_fetch_) {
// If input_columns is empty(), The col at index-0 will be picked. std::unique_lock<std::mutex> lock(column_name_map_mutex_);
if (input_columns->empty()) { // If the map has not been set up yet in the base class, then we are the first one in to set it up
for (const auto &pair : col_name_id_map) { // (and we are under protection of the mutex lock)
if (pair.second == 0) { if (column_name_id_map_.empty()) {
MS_LOG(INFO) << "Input columns in map operator is empty, will apply to the first column in the current table."; std::unordered_map<std::string, int32_t> current_name_id_map = child_[0]->column_name_id_map();
input_columns->push_back(pair.first);
break; // If input_columns is empty(), The col at index-0 will be picked.
if (in_columns_.empty()) {
for (const auto &pair : current_name_id_map) {
if (pair.second == 0) {
MS_LOG(INFO) << "Input columns empty for map op, will apply to the first column in the current table.";
in_columns_.push_back(pair.first);
break;
}
}
// If caller didn't specify the out_col_names, assume they are same as the input_columns.
// This was done in the constructor, but if input columns was empty to start we have to redo it here.
if (out_columns_.empty() || out_columns_[0].empty()) {
out_columns_ = in_columns_;
}
} }
}
// If caller didn't specify the out_col_names, assume they are same as the input_columns. // Before we continue, issue a sanity check to make sure the input columns from user and the incoming
if (output_columns->empty() || (*output_columns)[0].empty()) { // columns from child are correct
*output_columns = *input_columns; this->ValidateInColumns(current_name_id_map);
}
}
// initialize keep_input_columns, true means to keep the column. // initialize keep_input_columns, true means to keep the column.
keep_input_columns->resize(num_cols, true); keep_input_columns_.resize(num_cols, true);
for (const auto &col_name : *input_columns) { for (const auto &col_name : in_columns_) {
int32_t missed = col_name_id_map[col_name]; int32_t missed = current_name_id_map[col_name];
(*keep_input_columns)[missed] = false; keep_input_columns_[missed] = false;
} }
// initialize to_process_indices. // initialize to_process_indices.
for (const auto &col_name : *input_columns) { for (const auto &col_name : in_columns_) {
to_process_indices->push_back(col_name_id_map[col_name]); to_process_indices_.push_back(current_name_id_map[col_name]);
} }
// Create the final column name to index mapping. // Create the final column name to index mapping in the base class field
*final_col_name_id_map = CreateFinalColMap(&col_name_id_map, *keep_input_columns, input_columns, output_columns); CreateFinalColMap(&current_name_id_map);
first_fetch_ = false;
}
} // mutex lock will release here
MS_LOG(INFO) << "Column name map for map op set: " << this->ColumnNameMapAsString();
return Status::OK();
}
// Validating if each of the input_columns exists in the DataBuffer.
Status MapOp::ValidateInColumns(const std::unordered_map<std::string, int32_t> &col_name_id_map) {
for (const auto &inCol : in_columns_) {
bool found = col_name_id_map.find(inCol) != col_name_id_map.end() ? true : false;
if (!found) {
std::string err_msg = "input column name: " + inCol + " doesn't exist in the dataset columns.";
RETURN_STATUS_UNEXPECTED(err_msg);
}
}
return Status::OK(); return Status::OK();
} }
// Create the final column name to index mapping and get indices of the columns this mapop does not use. // Create the final column name to index mapping and get indices of the columns this mapop does not use.
std::unordered_map<std::string, int32_t> MapOp::CreateFinalColMap( void MapOp::CreateFinalColMap(std::unordered_map<std::string, int32_t> *col_name_id_map) {
std::unordered_map<std::string, int32_t> *col_name_id_map, const std::vector<bool> &keep_input_columns,
std::vector<std::string> *input_columns, std::vector<std::string> *output_columns) {
std::unordered_map<std::string, int32_t> final_col_name_id_map; std::unordered_map<std::string, int32_t> final_col_name_id_map;
size_t num_cols = col_name_id_map->size(); size_t num_cols = col_name_id_map->size();
std::vector<int32_t> new_ids(num_cols); std::vector<int32_t> new_ids(num_cols);
if (input_columns->size() == output_columns->size()) { if (in_columns_.size() == out_columns_.size()) {
for (size_t i = 0; i < input_columns->size(); i++) { for (size_t i = 0; i < in_columns_.size(); i++) {
int32_t loc = (*col_name_id_map)[(*input_columns)[i]]; int32_t loc = (*col_name_id_map)[in_columns_[i]];
(void)col_name_id_map->erase((*input_columns)[i]); (void)col_name_id_map->erase(in_columns_[i]);
(*col_name_id_map)[(*output_columns)[i]] = loc; (*col_name_id_map)[out_columns_[i]] = loc;
} }
return *col_name_id_map; // Set the base class final column id map result
column_name_id_map_ = *col_name_id_map;
} else { } else {
int32_t fill_idx = 0; int32_t fill_idx = 0;
// First columns of the tables are occupied by the output columns from tensorOp. // First columns of the tables are occupied by the output columns from tensorOp.
for (const auto &col_name : *output_columns) { for (const auto &col_name : out_columns_) {
final_col_name_id_map[col_name] = fill_idx++; final_col_name_id_map[col_name] = fill_idx++;
} }
// Creating new_ids mapping for the columns we keep. // Creating new_ids mapping for the columns we keep.
for (size_t i = 0; i < num_cols; i++) { for (size_t i = 0; i < num_cols; i++) {
if (keep_input_columns[i]) { if (keep_input_columns_[i]) {
new_ids[i] = fill_idx++; new_ids[i] = fill_idx++;
} }
} }
...@@ -364,12 +361,13 @@ std::unordered_map<std::string, int32_t> MapOp::CreateFinalColMap( ...@@ -364,12 +361,13 @@ std::unordered_map<std::string, int32_t> MapOp::CreateFinalColMap(
for (const auto &pair : *col_name_id_map) { for (const auto &pair : *col_name_id_map) {
name = pair.first; name = pair.first;
int32_t old_id = pair.second; int32_t old_id = pair.second;
if (keep_input_columns[old_id]) { if (keep_input_columns_[old_id]) {
final_col_name_id_map[name] = new_ids[old_id]; final_col_name_id_map[name] = new_ids[old_id];
} }
} }
return final_col_name_id_map; // Set the base class final column id map result
column_name_id_map_ = final_col_name_id_map;
} }
} }
} // namespace dataset } // namespace dataset
......
...@@ -186,6 +186,12 @@ class MapOp : public ParallelOp { ...@@ -186,6 +186,12 @@ class MapOp : public ParallelOp {
// Variable to store the column name that the tensorOps are producing // Variable to store the column name that the tensorOps are producing
std::vector<std::string> out_columns_; std::vector<std::string> out_columns_;
// Boolean mapping, true means to keep the column.
std::vector<bool> keep_input_columns_;
// Indices of the columns to process.
std::vector<size_t> to_process_indices_;
// Performance mode is when the main thread creates local queues, pulls databuffers from the previous // Performance mode is when the main thread creates local queues, pulls databuffers from the previous
// op's Connector and distributes them to the local queues. Workers pull from the local queues. // op's Connector and distributes them to the local queues. Workers pull from the local queues.
// If this flag is false, each worker pulls directly from the Connector. This use less resources // If this flag is false, each worker pulls directly from the Connector. This use less resources
...@@ -201,51 +207,40 @@ class MapOp : public ParallelOp { ...@@ -201,51 +207,40 @@ class MapOp : public ParallelOp {
// @return Status The error code return // @return Status The error code return
Status WorkerEntry(int32_t worker_id) override; // In: workerId assigned by tree_ Status WorkerEntry(int32_t worker_id) override; // In: workerId assigned by tree_
// Private helper function for getting the next buffer
// When PerformanceMode is enabled, workers pop from the local queue.
// Otherwise, workers pop from the first child output Connector.
// @param p_buffer - the buffer to return
// @return Status return code
Status FetchNextBuffer(std::unique_ptr<DataBuffer> *p_buffer, int32_t worker_id) {
if (perf_mode_) {
RETURN_IF_NOT_OK(local_queues_[worker_id]->PopFront(p_buffer));
} else {
RETURN_IF_NOT_OK(child_[0]->GetNextBuffer(p_buffer, worker_id));
}
return Status::OK();
}
// Private function for worker thread to perform TensorOp's compute function and get the result. // Private function for worker thread to perform TensorOp's compute function and get the result.
// @param in_buffer A raw pointer to the DataBuffer. A raw pointer is fine because this function doesn't manage memory // @param in_buffer A raw pointer to the DataBuffer. A raw pointer is fine because this function doesn't manage memory
// and is not shared with other threads. // and is not shared with other threads.
// @param to_process_indices Indices of columns to be processed by the TensorOp.
// @param[out] new_tensor_table A new Tensor Table to be populated in this function. // @param[out] new_tensor_table A new Tensor Table to be populated in this function.
// @param keep_input_columns Keeping track of which columns to keep (not used by TensorOp). Status WorkerCompute(DataBuffer *in_buffer, TensorQTable *new_tensor_table);
// @param input_columns The vector of input column names used in the current thread.
// @param output_columns The vector of output column names used in the current thread.
Status WorkerCompute(DataBuffer *in_buffer, const std::vector<size_t> &to_process_indices,
TensorQTable *new_tensor_table, const std::vector<bool> &keep_input_columns,
std::vector<std::string> *input_columns, std::vector<std::string> *output_columns);
// Private function for validating if each of the user specified input column names
// exist in the DataBuffer.
// @param col_name_id_map The column name to index mapping obtained from DataBuffer.
// @param input_columns The vector of input column names used in the current thread.
// @return Status The error code return
Status ValidateInColumns(const std::unordered_map<std::string, int32_t> &col_name_id_map,
std::vector<std::string> *input_columns);
// Private function that create the final column name to index mapping and // Private function that create the final column name to index mapping and
// get indices of the columns this mapop does not use. // get indices of the columns this mapop does not use.
// @param col_name_id_map The column name to index mapping obtained from DataBuffer. // @param col_name_id_map The column name to index mapping obtained from child operator
// @param keep_input_columns To mark which columns are to be kept (not used in mapOp). void CreateFinalColMap(std::unordered_map<std::string, int32_t> *col_name_id_map);
// @param input_columns The vector of input column names used in the current thread.
// @param output_columns The vector of output column names used in the current thread.
// @return finalColNameIdMap The final column name to index mapping.
std::unordered_map<std::string, int32_t> CreateFinalColMap(std::unordered_map<std::string, int32_t> *col_name_id_map,
const std::vector<bool> &keep_input_columns,
std::vector<std::string> *input_columns,
std::vector<std::string> *output_columns);
// Private function that initialize some internal data structure used by WorkerEntry() // Private function that initialize some internal data structure used by WorkerEntry()
// @param in_buf A raw pointer to the DataBuffer. A raw pointer is fine because this function does not manage memory // @param in_buf A raw pointer to the DataBuffer. A raw pointer is fine because this function does not manage memory
// and is not shared with other threads. // and is not shared with other threads.
// @param[out] keep_input_columns Keeping track of which columns to keep (not used by TensorOp) Status WorkerEntryInit(const DataBuffer *in_buf);
// @param[out] to_process_indices Indices of columns to be processed by the TensorOp
// @param[out] final_col_name_id_map Create the final column name id map. This final mapping will replace the old one // Validating if each of the input_columns exists in the DataBuffer.
// if the TensorOp Compute() is successful. // @param - the column map to check
// @param input_columns The vector of input column names used in the current thread. // @return - status return code
// @param output_columns The vector of output column names used in the current thread. Status ValidateInColumns(const std::unordered_map<std::string, int32_t> &col_name_id_map);
Status WorkerEntryInit(const DataBuffer *in_buf, std::vector<bool> *keep_input_columns,
std::vector<size_t> *to_process_indices,
std::unordered_map<std::string, int32_t> *final_col_name_id_map,
std::vector<std::string> *input_columns, std::vector<std::string> *output_columns);
}; };
} // namespace dataset } // namespace dataset
} // namespace mindspore } // namespace mindspore
......
...@@ -15,17 +15,12 @@ ...@@ -15,17 +15,12 @@
*/ */
#include "dataset/engine/datasetops/parallel_op.h" #include "dataset/engine/datasetops/parallel_op.h"
#include <cstdint>
#include <iostream> #include <iostream>
#include <map>
#include <utility> #include <utility>
#include "dataset/engine/data_schema.h"
#include "dataset/engine/datasetops/dataset_op.h" #include "dataset/engine/datasetops/dataset_op.h"
#include "dataset/engine/execution_tree.h" #include "dataset/engine/execution_tree.h"
#include "dataset/core/config_manager.h" #include "dataset/core/config_manager.h"
#include "dataset/engine/db_connector.h" #include "dataset/engine/db_connector.h"
#include "dataset/engine/datasetops/source/storage_client.h"
#include "dataset/util/task_manager.h" #include "dataset/util/task_manager.h"
namespace mindspore { namespace mindspore {
......
...@@ -13,9 +13,9 @@ ...@@ -13,9 +13,9 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
#include "dataset/engine/datasetops/pipeline_op.h"
#include <iomanip> #include <iomanip>
#include <iostream> #include <iostream>
#include "dataset/engine/datasetops/pipeline_op.h"
namespace mindspore { namespace mindspore {
namespace dataset { namespace dataset {
......
...@@ -73,35 +73,40 @@ void ProjectOp::Print(std::ostream &out, bool show_all) const { ...@@ -73,35 +73,40 @@ void ProjectOp::Print(std::ostream &out, bool show_all) const {
Status ProjectOp::GetNextBuffer(std::unique_ptr<DataBuffer> *p_buffer, int32_t worker_id, bool retry_if_eoe) { Status ProjectOp::GetNextBuffer(std::unique_ptr<DataBuffer> *p_buffer, int32_t worker_id, bool retry_if_eoe) {
RETURN_IF_NOT_OK(child_[0]->GetNextBuffer(p_buffer, worker_id, retry_if_eoe)); RETURN_IF_NOT_OK(child_[0]->GetNextBuffer(p_buffer, worker_id, retry_if_eoe));
if (!((*p_buffer)->eoe()) && !((*p_buffer)->eof())) { if (!((*p_buffer)->eoe()) && !((*p_buffer)->eof())) {
// Only for the first buffer fetched, get the column map of the incoming data and save it
// into our own column name map after making the appropriate mods
// We cannot use the super class AssignColMapFromChild here because we're making a modification of the
// map from the child map.
if (first_fetch_) {
std::unordered_map<std::string, int32_t> child_column_name_mapping = child_[0]->column_name_id_map();
for (size_t i = 0; i < columns_to_project_.size(); i++) {
std::string &current_column = columns_to_project_[i];
if (child_column_name_mapping.find(current_column) == child_column_name_mapping.end()) {
std::string err_msg = "ProjectOp: column " + current_column + " does not exist in child operator.";
RETURN_STATUS_UNEXPECTED(err_msg);
}
// Setup the new column name mapping for ourself (base class field)
column_name_id_map_[current_column] = i;
projected_column_indices_.push_back(child_column_name_mapping[current_column]);
}
first_fetch_ = false; // we only need to do this path once
}
RETURN_IF_NOT_OK(Project(p_buffer)); RETURN_IF_NOT_OK(Project(p_buffer));
} }
return Status::OK(); return Status::OK();
} }
Status ProjectOp::Project(std::unique_ptr<DataBuffer> *data_buffer) { Status ProjectOp::Project(std::unique_ptr<DataBuffer> *data_buffer) {
std::unordered_map<std::string, int32_t> column_name_mapping = (*data_buffer)->column_name_map();
std::unordered_map<std::string, int32_t> new_column_name_mapping;
std::vector<int32_t> projected_column_indices;
for (size_t i = 0; i < columns_to_project_.size(); i++) {
std::string &current_column = columns_to_project_[i];
if (column_name_mapping.find(current_column) == column_name_mapping.end()) {
std::string err_msg = "ProjectOp: column " + current_column + " does not exist in this buffer.";
RETURN_STATUS_UNEXPECTED(err_msg);
}
new_column_name_mapping[current_column] = i;
projected_column_indices.push_back(column_name_mapping[current_column]);
}
std::unique_ptr<TensorQTable> new_tensor_table = std::make_unique<TensorQTable>(); std::unique_ptr<TensorQTable> new_tensor_table = std::make_unique<TensorQTable>();
while ((*data_buffer)->NumRows() > 0) { while ((*data_buffer)->NumRows() > 0) {
TensorRow current_row; TensorRow current_row;
RETURN_IF_NOT_OK((*data_buffer)->PopRow(&current_row)); RETURN_IF_NOT_OK((*data_buffer)->PopRow(&current_row));
TensorRow new_row; TensorRow new_row;
(void)std::transform(projected_column_indices.begin(), projected_column_indices.end(), std::back_inserter(new_row), (void)std::transform(projected_column_indices_.begin(), projected_column_indices_.end(),
[&current_row](uint32_t x) { return current_row[x]; }); std::back_inserter(new_row), [&current_row](uint32_t x) { return current_row[x]; });
new_tensor_table->push_back(new_row); new_tensor_table->push_back(new_row);
} }
(*data_buffer)->set_tensor_table(std::move(new_tensor_table)); (*data_buffer)->set_tensor_table(std::move(new_tensor_table));
(*data_buffer)->set_column_name_map(new_column_name_mapping);
return Status::OK(); return Status::OK();
} }
......
...@@ -103,6 +103,7 @@ class ProjectOp : public PipelineOp { ...@@ -103,6 +103,7 @@ class ProjectOp : public PipelineOp {
private: private:
std::vector<std::string> columns_to_project_; std::vector<std::string> columns_to_project_;
std::vector<int32_t> projected_column_indices_;
Status Project(std::unique_ptr<DataBuffer> *data_buffer); Status Project(std::unique_ptr<DataBuffer> *data_buffer);
}; };
......
...@@ -67,10 +67,15 @@ Status RenameOp::operator()() { ...@@ -67,10 +67,15 @@ Status RenameOp::operator()() {
// if 1st eoe or eof, pass it on then return // if 1st eoe or eof, pass it on then return
RETURN_STATUS_UNEXPECTED(err_msg); RETURN_STATUS_UNEXPECTED(err_msg);
} }
// First, populate the column map from the input child.
// This will not be the final map for output from this op.
RETURN_IF_NOT_OK(DatasetOp::AssignColMapFromChild());
// core rename functionality only needs to happen once, to identify the new column names/indexes
RETURN_IF_NOT_OK(RenameColumns());
while (curr_buffer->eof() == false) { while (curr_buffer->eof() == false) {
while (curr_buffer->eoe() == false) { while (curr_buffer->eoe() == false) {
// core rename functionality
RETURN_IF_NOT_OK(RenameBuffer(&curr_buffer));
// push the renamed input buffer // push the renamed input buffer
MS_LOG(DEBUG) << "Rename operator pushing next buffer."; MS_LOG(DEBUG) << "Rename operator pushing next buffer.";
RETURN_IF_NOT_OK(out_connector_->Add(0, std::move(curr_buffer))); RETURN_IF_NOT_OK(out_connector_->Add(0, std::move(curr_buffer)));
...@@ -89,17 +94,16 @@ Status RenameOp::operator()() { ...@@ -89,17 +94,16 @@ Status RenameOp::operator()() {
return Status::OK(); return Status::OK();
} }
// renames buffer // renames the columns
Status RenameOp::RenameBuffer(std::unique_ptr<DataBuffer> *input_buffer) { Status RenameOp::RenameColumns() {
// iterate over my index in input vector, find the corresponding position // iterate over my index in input vector, find the corresponding position
const std::unordered_map<std::string, int32_t> col_name_id_map = (*input_buffer)->column_name_map();
std::unordered_map<std::string, int32_t> new_col_name_id_map = {}; std::unordered_map<std::string, int32_t> new_col_name_id_map = {};
// parameter for input check // parameter for input check
size_t found = 0; size_t found = 0;
// iterate over all the pairs and if there is a name match with rename, rename the column and add it to new map // iterate over all the pairs and if there is a name match with rename, rename the column and add it to new map
// by doing it this way we recreate a new ColNameIdMap and allow for switching // by doing it this way we recreate a new ColNameIdMap and allow for switching
for (const auto &pair : col_name_id_map) { for (const auto &pair : column_name_id_map_) {
std::string name = pair.first; std::string name = pair.first;
int32_t id = pair.second; int32_t id = pair.second;
// find name // find name
...@@ -126,7 +130,9 @@ Status RenameOp::RenameBuffer(std::unique_ptr<DataBuffer> *input_buffer) { ...@@ -126,7 +130,9 @@ Status RenameOp::RenameBuffer(std::unique_ptr<DataBuffer> *input_buffer) {
std::string err_msg = "Renamed column doesn't exist in dataset"; std::string err_msg = "Renamed column doesn't exist in dataset";
RETURN_STATUS_UNEXPECTED(err_msg); RETURN_STATUS_UNEXPECTED(err_msg);
} }
(*input_buffer)->set_column_name_map(new_col_name_id_map);
// Now, overwrite our column map with the new renamed columns/id's
column_name_id_map_ = new_col_name_id_map;
return Status::OK(); return Status::OK();
} }
......
...@@ -112,8 +112,7 @@ class RenameOp : public PipelineOp { ...@@ -112,8 +112,7 @@ class RenameOp : public PipelineOp {
protected: protected:
// Rename core functionality // Rename core functionality
// @param input_buffer buffer to run rename on Status RenameColumns();
Status RenameBuffer(std::unique_ptr<DataBuffer> *input_buffer);
// Variable to store the input column names // Variable to store the input column names
std::vector<std::string> in_columns_; std::vector<std::string> in_columns_;
......
...@@ -122,6 +122,8 @@ Status RepeatOp::GetNextBuffer(std::unique_ptr<DataBuffer> *p_buffer, int32_t wo ...@@ -122,6 +122,8 @@ Status RepeatOp::GetNextBuffer(std::unique_ptr<DataBuffer> *p_buffer, int32_t wo
if (buf->eof()) { if (buf->eof()) {
RETURN_IF_NOT_OK(EofReceived(worker_id)); RETURN_IF_NOT_OK(EofReceived(worker_id));
} }
// Update the column name map if needed
RETURN_IF_NOT_OK(DatasetOp::AssignColMapFromChild());
*p_buffer = std::move(buf); *p_buffer = std::move(buf);
return Status::OK(); return Status::OK();
} }
......
...@@ -122,6 +122,7 @@ class RepeatOp : public PipelineOp { ...@@ -122,6 +122,7 @@ class RepeatOp : public PipelineOp {
int32_t max_repeats_; // The number of repeats that the user requested int32_t max_repeats_; // The number of repeats that the user requested
int32_t repeat_count_; // A counter for the current number of executed repeats int32_t repeat_count_; // A counter for the current number of executed repeats
std::vector<std::shared_ptr<DatasetOp>> eoe_ops_; // List of operators that can generate EOE underneath this repeat. std::vector<std::shared_ptr<DatasetOp>> eoe_ops_; // List of operators that can generate EOE underneath this repeat.
bool first_fetch_; // Track the first fetch from this op
}; };
} // namespace dataset } // namespace dataset
} // namespace mindspore } // namespace mindspore
......
...@@ -185,7 +185,6 @@ Status ShuffleOp::operator()() { ...@@ -185,7 +185,6 @@ Status ShuffleOp::operator()() {
if (new_buffer_table->size() == rows_per_buffer_ || shuffle_last_row_idx_ == 0) { if (new_buffer_table->size() == rows_per_buffer_ || shuffle_last_row_idx_ == 0) {
auto new_buffer = std::make_unique<DataBuffer>(buffer_counter_, DataBuffer::kDeBFlagNone); auto new_buffer = std::make_unique<DataBuffer>(buffer_counter_, DataBuffer::kDeBFlagNone);
new_buffer->set_tensor_table(std::move(new_buffer_table)); new_buffer->set_tensor_table(std::move(new_buffer_table));
new_buffer->set_column_name_map(column_name_map_);
buffer_counter_++; buffer_counter_++;
MS_LOG(DEBUG) << "Shuffle operator sending a buffer to output."; MS_LOG(DEBUG) << "Shuffle operator sending a buffer to output.";
RETURN_IF_NOT_OK(out_connector_->Add(0, std::move(new_buffer))); RETURN_IF_NOT_OK(out_connector_->Add(0, std::move(new_buffer)));
...@@ -266,8 +265,8 @@ Status ShuffleOp::InitShuffleBuffer() { ...@@ -266,8 +265,8 @@ Status ShuffleOp::InitShuffleBuffer() {
RETURN_STATUS_UNEXPECTED("Unable to fetch a single row for shuffle buffer."); RETURN_STATUS_UNEXPECTED("Unable to fetch a single row for shuffle buffer.");
} }
// Take a copy of the column name mapping. We'll use this when constructing output buffers later. // Now that a first fetch is done, assign the column map for this operator
column_name_map_ = child_iterator_->col_name_id_map(); RETURN_IF_NOT_OK(DatasetOp::AssignColMapFromChild());
// Now fill the rest of the shuffle buffer until we are unable to get the next row or we reached // Now fill the rest of the shuffle buffer until we are unable to get the next row or we reached
// the desired shuffle buffer size. // the desired shuffle buffer size.
......
...@@ -185,7 +185,6 @@ class ShuffleOp : public PipelineOp { ...@@ -185,7 +185,6 @@ class ShuffleOp : public PipelineOp {
std::unique_ptr<TensorTable> shuffle_buffer_; std::unique_ptr<TensorTable> shuffle_buffer_;
int32_t shuffle_last_row_idx_; // Internal tracking of the last slot of our shuffle buffer int32_t shuffle_last_row_idx_; // Internal tracking of the last slot of our shuffle buffer
int32_t shuffle_buffer_state_; // State tracking for the shuffle buffer phases of work int32_t shuffle_buffer_state_; // State tracking for the shuffle buffer phases of work
std::unordered_map<std::string, int32_t> column_name_map_; // A mapping between column index to column name.
std::unique_ptr<ChildIterator> child_iterator_; // An iterator for fetching. std::unique_ptr<ChildIterator> child_iterator_; // An iterator for fetching.
}; };
......
...@@ -84,6 +84,10 @@ Status SkipOp::operator()() { ...@@ -84,6 +84,10 @@ Status SkipOp::operator()() {
TaskManager::FindMe()->Post(); TaskManager::FindMe()->Post();
std::unique_ptr<DataBuffer> curr_buffer; std::unique_ptr<DataBuffer> curr_buffer;
RETURN_IF_NOT_OK(GetNextInput(&curr_buffer)); RETURN_IF_NOT_OK(GetNextInput(&curr_buffer));
// After the first buffer fetch above we can do the one-time assign of the column name map
RETURN_IF_NOT_OK(DatasetOp::AssignColMapFromChild());
while (curr_buffer->eof() == false) { while (curr_buffer->eof() == false) {
// Reset count // Reset count
skip_count_ = 0; skip_count_ = 0;
......
...@@ -80,8 +80,9 @@ CelebAOp::CelebAOp(int32_t num_workers, int32_t rows_per_buffer, const std::stri ...@@ -80,8 +80,9 @@ CelebAOp::CelebAOp(int32_t num_workers, int32_t rows_per_buffer, const std::stri
num_rows_exact_(0), num_rows_exact_(0),
num_samples_(num_samples), num_samples_(num_samples),
dataset_type_(dataset_type) { dataset_type_(dataset_type) {
// Set the column name map (base class field)
for (int32_t index = 0; index < data_schema_->NumColumns(); index++) { for (int32_t index = 0; index < data_schema_->NumColumns(); index++) {
col_name_map_[data_schema_->column(index).name()] = index; column_name_id_map_[data_schema_->column(index).name()] = index;
} }
attr_info_queue_ = std::make_unique<Queue<std::vector<std::string>>>(queue_size); attr_info_queue_ = std::make_unique<Queue<std::vector<std::string>>>(queue_size);
...@@ -385,7 +386,6 @@ Status CelebAOp::LoadBuffer(const std::vector<int64_t> &keys, std::unique_ptr<Da ...@@ -385,7 +386,6 @@ Status CelebAOp::LoadBuffer(const std::vector<int64_t> &keys, std::unique_ptr<Da
} }
(*db)->set_tensor_table(std::move(deq)); (*db)->set_tensor_table(std::move(deq));
(*db)->set_column_name_map(col_name_map_);
return Status::OK(); return Status::OK();
} }
......
...@@ -21,7 +21,6 @@ ...@@ -21,7 +21,6 @@
#include <set> #include <set>
#include <memory> #include <memory>
#include <vector> #include <vector>
#include <unordered_map>
#include <utility> #include <utility>
#include <fstream> #include <fstream>
...@@ -232,7 +231,6 @@ class CelebAOp : public ParallelOp, RandomAccessOp { ...@@ -232,7 +231,6 @@ class CelebAOp : public ParallelOp, RandomAccessOp {
std::set<std::string> extensions_; // extensions allowed std::set<std::string> extensions_; // extensions allowed
std::unique_ptr<DataSchema> data_schema_; std::unique_ptr<DataSchema> data_schema_;
std::shared_ptr<Sampler> sampler_; std::shared_ptr<Sampler> sampler_;
std::unordered_map<std::string, int32_t> col_name_map_;
std::unique_ptr<Queue<std::vector<std::string>>> attr_info_queue_; std::unique_ptr<Queue<std::vector<std::string>>> attr_info_queue_;
int64_t num_rows_in_attr_file_; // rows number specified in attr file int64_t num_rows_in_attr_file_; // rows number specified in attr file
int64_t num_rows_exact_; // exact rows number,maybe is less than rows_num_in_attr_file_ int64_t num_rows_exact_; // exact rows number,maybe is less than rows_num_in_attr_file_
......
...@@ -88,8 +88,9 @@ CifarOp::CifarOp(CifarType type, int32_t num_works, int32_t rows_per_buf, const ...@@ -88,8 +88,9 @@ CifarOp::CifarOp(CifarType type, int32_t num_works, int32_t rows_per_buf, const
num_rows_(0), num_rows_(0),
row_cnt_(0), row_cnt_(0),
buf_cnt_(0) { buf_cnt_(0) {
// set the column name map (base class field)
for (uint32_t i = 0; i < data_schema_->NumColumns(); ++i) { for (uint32_t i = 0; i < data_schema_->NumColumns(); ++i) {
col_name_map_[data_schema_->column(i).name()] = i; column_name_id_map_[data_schema_->column(i).name()] = i;
} }
constexpr uint64_t kUtilQueueSize = 512; constexpr uint64_t kUtilQueueSize = 512;
cifar_raw_data_block_ = std::make_unique<Queue<std::vector<unsigned char>>>(kUtilQueueSize); cifar_raw_data_block_ = std::make_unique<Queue<std::vector<unsigned char>>>(kUtilQueueSize);
...@@ -221,7 +222,6 @@ Status CifarOp::LoadBuffer(const std::vector<int64_t> &keys, std::unique_ptr<Dat ...@@ -221,7 +222,6 @@ Status CifarOp::LoadBuffer(const std::vector<int64_t> &keys, std::unique_ptr<Dat
deq->push_back(std::move(trow)); deq->push_back(std::move(trow));
} }
(*db)->set_tensor_table(std::move(deq)); (*db)->set_tensor_table(std::move(deq));
(*db)->set_column_name_map(col_name_map_);
return Status::OK(); return Status::OK();
} }
......
...@@ -19,7 +19,6 @@ ...@@ -19,7 +19,6 @@
#include <map> #include <map>
#include <memory> #include <memory>
#include <string> #include <string>
#include <unordered_map>
#include <utility> #include <utility>
#include <vector> #include <vector>
...@@ -221,7 +220,7 @@ class CifarOp : public ParallelOp, public RandomAccessOp { ...@@ -221,7 +220,7 @@ class CifarOp : public ParallelOp, public RandomAccessOp {
Status ParseCifarData(); Status ParseCifarData();
// Method derived from RandomAccess Op, enable Sampler to get all ids for each calss // Method derived from RandomAccess Op, enable Sampler to get all ids for each calss
// @param (std::unordered_map<uint64_t, std::vector<uint64_t >> * map - key label, val all ids for this class // @param (std::map<uint64_t, std::vector<uint64_t >> * map - key label, val all ids for this class
// @return Status - The error code return // @return Status - The error code return
Status GetClassIds(std::map<int32_t, std::vector<int64_t>> *cls_ids) const override; Status GetClassIds(std::map<int32_t, std::vector<int64_t>> *cls_ids) const override;
...@@ -236,7 +235,6 @@ class CifarOp : public ParallelOp, public RandomAccessOp { ...@@ -236,7 +235,6 @@ class CifarOp : public ParallelOp, public RandomAccessOp {
int64_t buf_cnt_; int64_t buf_cnt_;
WaitPost wp_; WaitPost wp_;
std::unordered_map<std::string, int32_t> col_name_map_;
QueueList<std::unique_ptr<IOBlock>> io_block_queues_; QueueList<std::unique_ptr<IOBlock>> io_block_queues_;
std::unique_ptr<Queue<std::vector<unsigned char>>> cifar_raw_data_block_; std::unique_ptr<Queue<std::vector<unsigned char>>> cifar_raw_data_block_;
std::vector<std::string> cifar_files_; std::vector<std::string> cifar_files_;
......
...@@ -93,10 +93,10 @@ void GeneratorOp::Dealloc() noexcept { ...@@ -93,10 +93,10 @@ void GeneratorOp::Dealloc() noexcept {
Status GeneratorOp::Init() { Status GeneratorOp::Init() {
// Reset BufferID // Reset BufferID
buffer_id_ = 0; buffer_id_ = 0;
// Setup column names map. // Setup column names map (base class field)
if (column_names_map_.empty()) { if (column_name_id_map_.empty()) {
for (int i = 0; i < column_names_.size(); ++i) { for (int i = 0; i < column_names_.size(); ++i) {
(void)column_names_map_.insert(std::make_pair(column_names_[i], i)); column_name_id_map_[column_names_[i]] = i;
} }
} }
Status ret; Status ret;
...@@ -195,7 +195,6 @@ Status GeneratorOp::operator()() { ...@@ -195,7 +195,6 @@ Status GeneratorOp::operator()() {
while (!eof) { while (!eof) {
// Create new buffer each iteration // Create new buffer each iteration
fetched_buffer = std::make_unique<DataBuffer>(buffer_id_++, DataBuffer::kDeBFlagNone); fetched_buffer = std::make_unique<DataBuffer>(buffer_id_++, DataBuffer::kDeBFlagNone);
fetched_buffer->set_column_name_map(column_names_map_);
std::unique_ptr<TensorQTable> fetched_table = std::make_unique<TensorQTable>(); std::unique_ptr<TensorQTable> fetched_table = std::make_unique<TensorQTable>();
bool eoe = false; bool eoe = false;
{ {
......
...@@ -20,7 +20,6 @@ ...@@ -20,7 +20,6 @@
#include <memory> #include <memory>
#include <mutex> #include <mutex>
#include <string> #include <string>
#include <unordered_map>
#include <utility> #include <utility>
#include <vector> #include <vector>
#include "dataset/core/data_type.h" #include "dataset/core/data_type.h"
...@@ -130,7 +129,6 @@ class GeneratorOp : public PipelineOp { ...@@ -130,7 +129,6 @@ class GeneratorOp : public PipelineOp {
int32_t buffer_size_; int32_t buffer_size_;
py::object generator_; py::object generator_;
std::unordered_map<std::string, int32_t> column_names_map_;
int32_t buffer_id_; int32_t buffer_id_;
WaitPost wp_; WaitPost wp_;
......
...@@ -78,8 +78,9 @@ ImageFolderOp::ImageFolderOp(int32_t num_wkrs, int32_t rows_per_buffer, std::str ...@@ -78,8 +78,9 @@ ImageFolderOp::ImageFolderOp(int32_t num_wkrs, int32_t rows_per_buffer, std::str
buf_cnt_(0), buf_cnt_(0),
sampler_ind_(0), sampler_ind_(0),
dirname_offset_(0) { dirname_offset_(0) {
// Set the column name map (base class field)
for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) { for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) {
col_name_map_[data_schema_->column(i).name()] = i; column_name_id_map_[data_schema_->column(i).name()] = i;
} }
folder_name_queue_ = std::make_unique<Queue<std::string>>(num_wkrs * queue_size); folder_name_queue_ = std::make_unique<Queue<std::string>>(num_wkrs * queue_size);
image_name_queue_ = std::make_unique<Queue<FolderImagesPair>>(num_wkrs * queue_size); image_name_queue_ = std::make_unique<Queue<FolderImagesPair>>(num_wkrs * queue_size);
...@@ -237,7 +238,6 @@ Status ImageFolderOp::LoadBuffer(const std::vector<int64_t> &keys, std::unique_p ...@@ -237,7 +238,6 @@ Status ImageFolderOp::LoadBuffer(const std::vector<int64_t> &keys, std::unique_p
deq->push_back(std::move(trow)); deq->push_back(std::move(trow));
} }
(*db)->set_tensor_table(std::move(deq)); (*db)->set_tensor_table(std::move(deq));
(*db)->set_column_name_map(col_name_map_);
return Status::OK(); return Status::OK();
} }
......
...@@ -23,7 +23,6 @@ ...@@ -23,7 +23,6 @@
#include <algorithm> #include <algorithm>
#include <map> #include <map>
#include <set> #include <set>
#include <unordered_map>
#include <utility> #include <utility>
#include <vector> #include <vector>
#include "dataset/core/tensor.h" #include "dataset/core/tensor.h"
...@@ -210,7 +209,7 @@ class ImageFolderOp : public ParallelOp, public RandomAccessOp { ...@@ -210,7 +209,7 @@ class ImageFolderOp : public ParallelOp, public RandomAccessOp {
Status GetNumRowsInDataset(int64_t *num) const override; Status GetNumRowsInDataset(int64_t *num) const override;
// Method derived from RandomAccess Op, enable Sampler to get all ids for each class // Method derived from RandomAccess Op, enable Sampler to get all ids for each class
// @param (std::unordered_map<int64_t, std::vector<int64_t >> * map - key label, val all ids for this class // @param (std::map<int64_t, std::vector<int64_t >> * map - key label, val all ids for this class
// @return Status - The error code return // @return Status - The error code return
Status GetClassIds(std::map<int32_t, std::vector<int64_t>> *cls_ids) const override; Status GetClassIds(std::map<int32_t, std::vector<int64_t>> *cls_ids) const override;
...@@ -275,7 +274,6 @@ class ImageFolderOp : public ParallelOp, public RandomAccessOp { ...@@ -275,7 +274,6 @@ class ImageFolderOp : public ParallelOp, public RandomAccessOp {
int64_t dirname_offset_; int64_t dirname_offset_;
WaitPost wp_; WaitPost wp_;
std::vector<ImageLabelPair> image_label_pairs_; std::vector<ImageLabelPair> image_label_pairs_;
std::unordered_map<std::string, int32_t> col_name_map_;
QueueList<std::unique_ptr<IOBlock>> io_block_queues_; // queues of IOBlocks QueueList<std::unique_ptr<IOBlock>> io_block_queues_; // queues of IOBlocks
std::unique_ptr<Queue<std::string>> folder_name_queue_; std::unique_ptr<Queue<std::string>> folder_name_queue_;
std::unique_ptr<Queue<FolderImagesPair>> image_name_queue_; std::unique_ptr<Queue<FolderImagesPair>> image_name_queue_;
......
...@@ -76,8 +76,9 @@ ManifestOp::ManifestOp(int32_t num_works, int32_t rows_per_buffer, std::string f ...@@ -76,8 +76,9 @@ ManifestOp::ManifestOp(int32_t num_works, int32_t rows_per_buffer, std::string f
decode_(decode), decode_(decode),
usage_(usage), usage_(usage),
buf_cnt_(0) { buf_cnt_(0) {
// Set the column name map (base class field)
for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) { for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) {
col_name_map_[data_schema_->column(i).name()] = i; column_name_id_map_[data_schema_->column(i).name()] = i;
} }
io_block_queues_.Init(num_workers_, queue_size); io_block_queues_.Init(num_workers_, queue_size);
(void)std::transform(usage_.begin(), usage_.end(), usage_.begin(), ::tolower); (void)std::transform(usage_.begin(), usage_.end(), usage_.begin(), ::tolower);
...@@ -235,7 +236,6 @@ Status ManifestOp::LoadBuffer(const std::vector<int64_t> &keys, std::unique_ptr< ...@@ -235,7 +236,6 @@ Status ManifestOp::LoadBuffer(const std::vector<int64_t> &keys, std::unique_ptr<
deq->push_back(std::move(trow)); deq->push_back(std::move(trow));
} }
(*db)->set_tensor_table(std::move(deq)); (*db)->set_tensor_table(std::move(deq));
(*db)->set_column_name_map(col_name_map_);
return Status::OK(); return Status::OK();
} }
......
...@@ -19,7 +19,6 @@ ...@@ -19,7 +19,6 @@
#include <map> #include <map>
#include <memory> #include <memory>
#include <string> #include <string>
#include <unordered_map>
#include <utility> #include <utility>
#include <vector> #include <vector>
...@@ -176,7 +175,7 @@ class ManifestOp : public ParallelOp, public RandomAccessOp { ...@@ -176,7 +175,7 @@ class ManifestOp : public ParallelOp, public RandomAccessOp {
Status GetNumRowsInDataset(int64_t *num) const override; Status GetNumRowsInDataset(int64_t *num) const override;
// Method derived from RandomAccess Op, enable Sampler to get all ids for each class // Method derived from RandomAccess Op, enable Sampler to get all ids for each class
// @param (std::unordered_map<int64_t, std::vector<int64_t >> * map - key label, val all ids for this class // @param (std::map<int64_t, std::vector<int64_t >> * map - key label, val all ids for this class
// @return Status - The error code return // @return Status - The error code return
Status GetClassIds(std::map<int32_t, std::vector<int64_t>> *cls_ids) const override; Status GetClassIds(std::map<int32_t, std::vector<int64_t>> *cls_ids) const override;
...@@ -248,7 +247,6 @@ class ManifestOp : public ParallelOp, public RandomAccessOp { ...@@ -248,7 +247,6 @@ class ManifestOp : public ParallelOp, public RandomAccessOp {
int64_t buf_cnt_; int64_t buf_cnt_;
WaitPost wp_; WaitPost wp_;
std::unordered_map<std::string, int32_t> col_name_map_;
QueueList<std::unique_ptr<IOBlock>> io_block_queues_; QueueList<std::unique_ptr<IOBlock>> io_block_queues_;
std::map<std::string, int32_t> label_index_; std::map<std::string, int32_t> label_index_;
std::vector<std::pair<std::string, std::vector<std::string>>> image_labelname_; std::vector<std::pair<std::string, std::vector<std::string>>> image_labelname_;
......
...@@ -152,7 +152,7 @@ Status MindRecordOp::Init() { ...@@ -152,7 +152,7 @@ Status MindRecordOp::Init() {
} }
for (int i = 0; i < static_cast<int>(columns_to_load_.size()); i++) { for (int i = 0; i < static_cast<int>(columns_to_load_.size()); i++) {
column_name_mapping_[columns_to_load_[i]] = i; column_name_id_map_[columns_to_load_[i]] = i;
} }
num_rows_ = shard_reader_->GetNumRows(); num_rows_ = shard_reader_->GetNumRows();
...@@ -180,7 +180,7 @@ Status MindRecordOp::SetColumnsBlob() { ...@@ -180,7 +180,7 @@ Status MindRecordOp::SetColumnsBlob() {
columns_blob_index_ = std::vector<int32_t>(columns_to_load_.size(), -1); columns_blob_index_ = std::vector<int32_t>(columns_to_load_.size(), -1);
int32_t iBlob = 0; int32_t iBlob = 0;
for (auto &blob_exact : columns_blob_exact) { for (auto &blob_exact : columns_blob_exact) {
columns_blob_index_[column_name_mapping_[blob_exact]] = iBlob++; columns_blob_index_[column_name_id_map_[blob_exact]] = iBlob++;
} }
return Status::OK(); return Status::OK();
} }
...@@ -501,7 +501,6 @@ Status MindRecordOp::WorkerEntry(int32_t worker_id) { ...@@ -501,7 +501,6 @@ Status MindRecordOp::WorkerEntry(int32_t worker_id) {
Status MindRecordOp::GetBufferFromReader(std::unique_ptr<DataBuffer> *fetched_buffer, int64_t buffer_id, Status MindRecordOp::GetBufferFromReader(std::unique_ptr<DataBuffer> *fetched_buffer, int64_t buffer_id,
int32_t worker_id) { int32_t worker_id) {
*fetched_buffer = std::make_unique<DataBuffer>(buffer_id, DataBuffer::kDeBFlagNone); *fetched_buffer = std::make_unique<DataBuffer>(buffer_id, DataBuffer::kDeBFlagNone);
(*fetched_buffer)->set_column_name_map(column_name_mapping_);
std::unique_ptr<TensorQTable> tensor_table = std::make_unique<TensorQTable>(); std::unique_ptr<TensorQTable> tensor_table = std::make_unique<TensorQTable>();
for (int32_t i = 0; i < rows_per_buffer_; ++i) { for (int32_t i = 0; i < rows_per_buffer_; ++i) {
ShardTuple tupled_buffer; ShardTuple tupled_buffer;
......
...@@ -23,7 +23,6 @@ ...@@ -23,7 +23,6 @@
#include <queue> #include <queue>
#include <string> #include <string>
#include <tuple> #include <tuple>
#include <unordered_map>
#include <unordered_set> #include <unordered_set>
#include <vector> #include <vector>
...@@ -262,7 +261,6 @@ class MindRecordOp : public ParallelOp { ...@@ -262,7 +261,6 @@ class MindRecordOp : public ParallelOp {
std::vector<std::string> columns_blob_; // Blob Columns to load from dataset std::vector<std::string> columns_blob_; // Blob Columns to load from dataset
std::vector<int32_t> columns_blob_index_; // Blob Columns to load from dataset std::vector<int32_t> columns_blob_index_; // Blob Columns to load from dataset
std::unordered_map<std::string, int32_t> column_name_mapping_;
std::unique_ptr<ShardReader> shard_reader_; std::unique_ptr<ShardReader> shard_reader_;
WaitPost shard_reader_wait_post_; WaitPost shard_reader_wait_post_;
QueueList<std::unique_ptr<IOBlock>> io_blk_queues_; QueueList<std::unique_ptr<IOBlock>> io_blk_queues_;
......
...@@ -75,8 +75,9 @@ MnistOp::MnistOp(int32_t num_workers, int32_t rows_per_buffer, std::string folde ...@@ -75,8 +75,9 @@ MnistOp::MnistOp(int32_t num_workers, int32_t rows_per_buffer, std::string folde
rows_per_buffer_(rows_per_buffer), rows_per_buffer_(rows_per_buffer),
sampler_(std::move(sampler)), sampler_(std::move(sampler)),
data_schema_(std::move(data_schema)) { data_schema_(std::move(data_schema)) {
// set the column name map (base class field)
for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) { for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) {
col_name_map_[data_schema_->column(i).name()] = i; column_name_id_map_[data_schema_->column(i).name()] = i;
} }
io_block_queues_.Init(num_workers, queue_size); io_block_queues_.Init(num_workers, queue_size);
} }
...@@ -185,7 +186,6 @@ Status MnistOp::LoadBuffer(const std::vector<int64_t> &keys, std::unique_ptr<Dat ...@@ -185,7 +186,6 @@ Status MnistOp::LoadBuffer(const std::vector<int64_t> &keys, std::unique_ptr<Dat
deq->push_back(std::move(trow)); deq->push_back(std::move(trow));
} }
(*db)->set_tensor_table(std::move(deq)); (*db)->set_tensor_table(std::move(deq));
(*db)->set_column_name_map(col_name_map_);
return Status::OK(); return Status::OK();
} }
......
...@@ -20,7 +20,6 @@ ...@@ -20,7 +20,6 @@
#include <string> #include <string>
#include <algorithm> #include <algorithm>
#include <map> #include <map>
#include <unordered_map>
#include <vector> #include <vector>
#include <utility> #include <utility>
...@@ -158,7 +157,7 @@ class MnistOp : public ParallelOp, public RandomAccessOp { ...@@ -158,7 +157,7 @@ class MnistOp : public ParallelOp, public RandomAccessOp {
Status GetNumRowsInDataset(int64_t *num) const override; Status GetNumRowsInDataset(int64_t *num) const override;
// Method derived from RandomAccess Op, enable Sampler to get all ids for each class // Method derived from RandomAccess Op, enable Sampler to get all ids for each class
// @param (std::unordered_map<uint64_t, std::vector<uint64_t >> * map - key label, val all ids for this class // @param (std::map<uint64_t, std::vector<uint64_t >> * map - key label, val all ids for this class
// @return Status - The error code return // @return Status - The error code return
Status GetClassIds(std::map<int32_t, std::vector<int64_t>> *cls_ids) const override; Status GetClassIds(std::map<int32_t, std::vector<int64_t>> *cls_ids) const override;
...@@ -253,7 +252,6 @@ class MnistOp : public ParallelOp, public RandomAccessOp { ...@@ -253,7 +252,6 @@ class MnistOp : public ParallelOp, public RandomAccessOp {
std::shared_ptr<Sampler> sampler_; std::shared_ptr<Sampler> sampler_;
std::unique_ptr<DataSchema> data_schema_; std::unique_ptr<DataSchema> data_schema_;
std::vector<MnistLabelPair> image_label_pairs_; std::vector<MnistLabelPair> image_label_pairs_;
std::unordered_map<std::string, int32_t> col_name_map_;
std::vector<std::string> image_names_; std::vector<std::string> image_names_;
std::vector<std::string> label_names_; std::vector<std::string> label_names_;
QueueList<std::unique_ptr<IOBlock>> io_block_queues_; QueueList<std::unique_ptr<IOBlock>> io_block_queues_;
......
...@@ -54,8 +54,7 @@ Status RandomDataOp::Builder::Build(std::shared_ptr<RandomDataOp> *out_op) { ...@@ -54,8 +54,7 @@ Status RandomDataOp::Builder::Build(std::shared_ptr<RandomDataOp> *out_op) {
} }
// Extract the column name mapping from the schema and save it in the class. // Extract the column name mapping from the schema and save it in the class.
// This will be needed when constructing buffers. RETURN_IF_NOT_OK((*out_op)->data_schema_->GetColumnNameMap(&((*out_op)->column_name_id_map_)));
RETURN_IF_NOT_OK((*out_op)->data_schema_->GetColumnNameMap(&((*out_op)->column_name_map_)));
return Status::OK(); return Status::OK();
} }
...@@ -320,7 +319,6 @@ Status RandomDataOp::WorkerEntry(int32_t worker_id) { ...@@ -320,7 +319,6 @@ Status RandomDataOp::WorkerEntry(int32_t worker_id) {
Status RandomDataOp::PackAndSend(int32_t worker_id, std::unique_ptr<TensorQTable> in_table) { Status RandomDataOp::PackAndSend(int32_t worker_id, std::unique_ptr<TensorQTable> in_table) {
auto new_buffer = std::make_unique<DataBuffer>(GetNextBufferId(), DataBuffer::kDeBFlagNone); auto new_buffer = std::make_unique<DataBuffer>(GetNextBufferId(), DataBuffer::kDeBFlagNone);
new_buffer->set_tensor_table(std::move(in_table)); new_buffer->set_tensor_table(std::move(in_table));
new_buffer->set_column_name_map(column_name_map_);
RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::move(new_buffer))); RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::move(new_buffer)));
return Status::OK(); return Status::OK();
} }
......
...@@ -17,13 +17,11 @@ ...@@ -17,13 +17,11 @@
#define DATASET_ENGINE_DATASETOPS_SOURCE_RANDOM_DATA_OP_ #define DATASET_ENGINE_DATASETOPS_SOURCE_RANDOM_DATA_OP_
#include <atomic> #include <atomic>
#include <map>
#include <memory> #include <memory>
#include <mutex> #include <mutex>
#include <random> #include <random>
#include <string> #include <string>
#include <vector> #include <vector>
#include <unordered_map>
#include <utility> #include <utility>
#include "dataset/util/status.h" #include "dataset/util/status.h"
#include "dataset/core/tensor.h" #include "dataset/core/tensor.h"
...@@ -259,7 +257,6 @@ class RandomDataOp : public ParallelOp { ...@@ -259,7 +257,6 @@ class RandomDataOp : public ParallelOp {
std::unique_ptr<DataSchema> data_schema_; std::unique_ptr<DataSchema> data_schema_;
std::vector<int64_t> worker_max_rows_; std::vector<int64_t> worker_max_rows_;
std::vector<int64_t> worker_rows_packed_; std::vector<int64_t> worker_rows_packed_;
std::unordered_map<std::string, int32_t> column_name_map_;
std::mt19937 rand_gen_; std::mt19937 rand_gen_;
WaitPost epoch_sync_wait_post_; WaitPost epoch_sync_wait_post_;
WaitPost all_out_; WaitPost all_out_;
......
...@@ -21,7 +21,6 @@ ...@@ -21,7 +21,6 @@
#include <memory> #include <memory>
#include <random> #include <random>
#include <vector> #include <vector>
#include <unordered_map>
#include "dataset/core/tensor.h" #include "dataset/core/tensor.h"
#include "dataset/engine/data_buffer.h" #include "dataset/engine/data_buffer.h"
...@@ -53,7 +52,7 @@ class RandomAccessOp { ...@@ -53,7 +52,7 @@ class RandomAccessOp {
} }
// sampler gets label , imageIds from storageOp, this function is unique to PK // sampler gets label , imageIds from storageOp, this function is unique to PK
// @param std::unordered_map<int64_t, std::vector<int64_t>> * map // @param std::map<int64_t, std::vector<int64_t>> * map
// @return - The error code return // @return - The error code return
virtual Status GetClassIds(std::map<int32_t, std::vector<int64_t>> *map) const { virtual Status GetClassIds(std::map<int32_t, std::vector<int64_t>> *map) const {
RETURN_STATUS_UNEXPECTED("GetClassIds needs to be override to support PK"); RETURN_STATUS_UNEXPECTED("GetClassIds needs to be override to support PK");
......
...@@ -247,11 +247,17 @@ Status StorageOp::InitOp(const std::vector<std::string> &files_list, const std:: ...@@ -247,11 +247,17 @@ Status StorageOp::InitOp(const std::vector<std::string> &files_list, const std::
Status StorageOp::init() { Status StorageOp::init() {
// First a sanity check to make sure the StorageClient initialization has done the proper // First a sanity check to make sure the StorageClient initialization has done the proper
// handshake and initialized both the schema and the number of rows for the dataset. // handshake and initialized both the schema and the number of rows for the dataset.
if (store_client_->schema()->NumColumns() == 0 || num_rows_ == 0) { const DataSchema *the_schema = store_client_->schema();
if (the_schema->NumColumns() == 0 || num_rows_ == 0) {
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__,
"Storage client did not run handshake to init schema and number of rows."); "Storage client did not run handshake to init schema and number of rows.");
} }
// Now that we have schema, generate the column name map (base class field)
for (int32_t i = 0; i < the_schema->NumColumns(); ++i) {
column_name_id_map_[the_schema->column(i).name()] = i;
}
// If the data buffer vector is not empty, then we may be redoing a scan again after a repeat. // If the data buffer vector is not empty, then we may be redoing a scan again after a repeat.
// In such a case, we have vector of nullptrs that used to hold the buffers. get rid of this // In such a case, we have vector of nullptrs that used to hold the buffers. get rid of this
// so we can reuse the vector. // so we can reuse the vector.
......
...@@ -121,8 +121,9 @@ Status TextFileOp::Init() { ...@@ -121,8 +121,9 @@ Status TextFileOp::Init() {
int32_t safe_queue_size = static_cast<int32_t>(std::ceil(text_files_list_.size() / num_workers_) + 1); int32_t safe_queue_size = static_cast<int32_t>(std::ceil(text_files_list_.size() / num_workers_) + 1);
io_block_queues_.Init(num_workers_, safe_queue_size); io_block_queues_.Init(num_workers_, safe_queue_size);
// Set the column name mapping (base class field)
for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) { for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) {
col_name_map_[data_schema_->column(i).name()] = i; column_name_id_map_[data_schema_->column(i).name()] = i;
} }
RETURN_IF_NOT_OK(ParallelOp::CreateWorkerConnector(worker_connector_size_)); RETURN_IF_NOT_OK(ParallelOp::CreateWorkerConnector(worker_connector_size_));
...@@ -164,7 +165,6 @@ Status TextFileOp::LoadFile(const std::string &file, const int64_t start_offset, ...@@ -164,7 +165,6 @@ Status TextFileOp::LoadFile(const std::string &file, const int64_t start_offset,
int64_t rows_total = 0; int64_t rows_total = 0;
std::string line; std::string line;
std::unique_ptr<DataBuffer> cur_buffer = std::make_unique<DataBuffer>(0, DataBuffer::BufferFlags::kDeBFlagNone); std::unique_ptr<DataBuffer> cur_buffer = std::make_unique<DataBuffer>(0, DataBuffer::BufferFlags::kDeBFlagNone);
cur_buffer->set_column_name_map(col_name_map_);
std::unique_ptr<TensorQTable> tensor_table = std::make_unique<TensorQTable>(); std::unique_ptr<TensorQTable> tensor_table = std::make_unique<TensorQTable>();
while (getline(handle, line)) { while (getline(handle, line)) {
...@@ -189,7 +189,6 @@ Status TextFileOp::LoadFile(const std::string &file, const int64_t start_offset, ...@@ -189,7 +189,6 @@ Status TextFileOp::LoadFile(const std::string &file, const int64_t start_offset,
RETURN_IF_NOT_OK(jagged_buffer_connector_->Add(worker_id, std::move(cur_buffer))); RETURN_IF_NOT_OK(jagged_buffer_connector_->Add(worker_id, std::move(cur_buffer)));
cur_buffer = std::make_unique<DataBuffer>(0, DataBuffer::BufferFlags::kDeBFlagNone); cur_buffer = std::make_unique<DataBuffer>(0, DataBuffer::BufferFlags::kDeBFlagNone);
cur_buffer->set_column_name_map(col_name_map_);
tensor_table = std::make_unique<TensorQTable>(); tensor_table = std::make_unique<TensorQTable>();
rows_each_buffer = 0; rows_each_buffer = 0;
} }
......
...@@ -20,7 +20,6 @@ ...@@ -20,7 +20,6 @@
#include <map> #include <map>
#include <mutex> #include <mutex>
#include <string> #include <string>
#include <unordered_map>
#include <vector> #include <vector>
#include "dataset/util/status.h" #include "dataset/util/status.h"
...@@ -260,7 +259,6 @@ class TextFileOp : public ParallelOp { ...@@ -260,7 +259,6 @@ class TextFileOp : public ParallelOp {
bool finished_reading_dataset_; bool finished_reading_dataset_;
bool load_io_block_queue_; bool load_io_block_queue_;
bool load_jagged_connector_; bool load_jagged_connector_;
std::unordered_map<std::string, int32_t> col_name_map_;
std::unique_ptr<JaggedConnector> jagged_buffer_connector_; std::unique_ptr<JaggedConnector> jagged_buffer_connector_;
}; };
} // namespace dataset } // namespace dataset
......
...@@ -35,13 +35,7 @@ TFBuffer::TFBuffer( ...@@ -35,13 +35,7 @@ TFBuffer::TFBuffer(
uint32_t id, // In: The id for this buffer uint32_t id, // In: The id for this buffer
BufferFlags flags, // In: The flags for this buffer BufferFlags flags, // In: The flags for this buffer
const std::shared_ptr<StorageClient> &storage_client) // In: Storage client that is related to this buffer type const std::shared_ptr<StorageClient> &storage_client) // In: Storage client that is related to this buffer type
: DataBuffer(id, flags), storage_client_(storage_client) { : DataBuffer(id, flags), storage_client_(storage_client) {}
// Initializing mColumnNameMap from the schema file
const DataSchema *the_schema = storage_client_->schema();
for (int32_t i = 0; i < the_schema->NumColumns(); ++i) {
column_name_map_[the_schema->column(i).name()] = i;
}
}
// destructor // destructor
TFBuffer::~TFBuffer() {} TFBuffer::~TFBuffer() {}
......
...@@ -191,6 +191,11 @@ Status TFReaderOp::Init() { ...@@ -191,6 +191,11 @@ Status TFReaderOp::Init() {
RETURN_IF_NOT_OK(CreateSchema(dataset_files_list_[0], columns_to_load_)); RETURN_IF_NOT_OK(CreateSchema(dataset_files_list_[0], columns_to_load_));
} }
// Construct the column name map for this operator (base class field)
for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) {
column_name_id_map_[data_schema_->column(i).name()] = i;
}
if (total_rows_ == 0) { if (total_rows_ == 0) {
total_rows_ = data_schema_->num_rows(); total_rows_ = data_schema_->num_rows();
} }
...@@ -571,11 +576,6 @@ Status TFReaderOp::LoadFile(const std::string &filename, const int64_t start_off ...@@ -571,11 +576,6 @@ Status TFReaderOp::LoadFile(const std::string &filename, const int64_t start_off
int64_t rows_read = 0; int64_t rows_read = 0;
int64_t rows_total = 0; int64_t rows_total = 0;
std::unique_ptr<DataBuffer> current_buffer = std::make_unique<DataBuffer>(0, DataBuffer::BufferFlags::kDeBFlagNone); std::unique_ptr<DataBuffer> current_buffer = std::make_unique<DataBuffer>(0, DataBuffer::BufferFlags::kDeBFlagNone);
std::unordered_map<std::string, int32_t> column_name_map;
for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) {
column_name_map[data_schema_->column(i).name()] = i;
}
current_buffer->set_column_name_map(column_name_map);
std::unique_ptr<TensorQTable> new_tensor_table = std::make_unique<TensorQTable>(); std::unique_ptr<TensorQTable> new_tensor_table = std::make_unique<TensorQTable>();
while (reader.peek() != EOF) { while (reader.peek() != EOF) {
...@@ -613,7 +613,6 @@ Status TFReaderOp::LoadFile(const std::string &filename, const int64_t start_off ...@@ -613,7 +613,6 @@ Status TFReaderOp::LoadFile(const std::string &filename, const int64_t start_off
RETURN_IF_NOT_OK(jagged_buffer_connector_->Add(worker_id, std::move(current_buffer))); RETURN_IF_NOT_OK(jagged_buffer_connector_->Add(worker_id, std::move(current_buffer)));
current_buffer = std::make_unique<DataBuffer>(0, DataBuffer::BufferFlags::kDeBFlagNone); current_buffer = std::make_unique<DataBuffer>(0, DataBuffer::BufferFlags::kDeBFlagNone);
current_buffer->set_column_name_map(column_name_map);
new_tensor_table = std::make_unique<TensorQTable>(); new_tensor_table = std::make_unique<TensorQTable>();
rows_read = 0; rows_read = 0;
} }
......
...@@ -71,8 +71,9 @@ VOCOp::VOCOp(int32_t num_workers, int32_t rows_per_buffer, const std::string &fo ...@@ -71,8 +71,9 @@ VOCOp::VOCOp(int32_t num_workers, int32_t rows_per_buffer, const std::string &fo
rows_per_buffer_(rows_per_buffer), rows_per_buffer_(rows_per_buffer),
sampler_(std::move(sampler)), sampler_(std::move(sampler)),
data_schema_(std::move(data_schema)) { data_schema_(std::move(data_schema)) {
// Set the column name map (base class field)
for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) { for (int32_t i = 0; i < data_schema_->NumColumns(); ++i) {
col_name_map_[data_schema_->column(i).name()] = i; column_name_id_map_[data_schema_->column(i).name()] = i;
} }
io_block_queues_.Init(num_workers_, queue_size); io_block_queues_.Init(num_workers_, queue_size);
} }
...@@ -183,7 +184,6 @@ Status VOCOp::LoadBuffer(const std::vector<int64_t> &keys, std::unique_ptr<DataB ...@@ -183,7 +184,6 @@ Status VOCOp::LoadBuffer(const std::vector<int64_t> &keys, std::unique_ptr<DataB
deq->push_back(std::move(trow)); deq->push_back(std::move(trow));
} }
(*db)->set_tensor_table(std::move(deq)); (*db)->set_tensor_table(std::move(deq));
(*db)->set_column_name_map(col_name_map_);
return Status::OK(); return Status::OK();
} }
......
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
#include <memory> #include <memory>
#include <string> #include <string>
#include <unordered_map>
#include <utility> #include <utility>
#include <vector> #include <vector>
...@@ -212,7 +211,6 @@ class VOCOp : public ParallelOp, public RandomAccessOp { ...@@ -212,7 +211,6 @@ class VOCOp : public ParallelOp, public RandomAccessOp {
WaitPost wp_; WaitPost wp_;
std::vector<std::string> image_ids_; std::vector<std::string> image_ids_;
std::unordered_map<std::string, int32_t> col_name_map_;
QueueList<std::unique_ptr<IOBlock>> io_block_queues_; QueueList<std::unique_ptr<IOBlock>> io_block_queues_;
}; };
} // namespace dataset } // namespace dataset
......
...@@ -72,6 +72,7 @@ Status TakeOp::operator()() { ...@@ -72,6 +72,7 @@ Status TakeOp::operator()() {
TaskManager::FindMe()->Post(); TaskManager::FindMe()->Post();
std::unique_ptr<DataBuffer> buf; std::unique_ptr<DataBuffer> buf;
RETURN_IF_NOT_OK(child_[0]->GetNextBuffer(&buf)); RETURN_IF_NOT_OK(child_[0]->GetNextBuffer(&buf));
RETURN_IF_NOT_OK(DatasetOp::AssignColMapFromChild());
while (buf->eof() == false) { while (buf->eof() == false) {
if (take_count_ == max_takes_) { if (take_count_ == max_takes_) {
......
...@@ -92,9 +92,8 @@ Status ZipOp::operator()() { ...@@ -92,9 +92,8 @@ Status ZipOp::operator()() {
if (!curr_table->empty()) { if (!curr_table->empty()) {
std::unique_ptr<DataBuffer> curr_buffer = std::make_unique<DataBuffer>(buffer_id_, DataBuffer::kDeBFlagNone); std::unique_ptr<DataBuffer> curr_buffer = std::make_unique<DataBuffer>(buffer_id_, DataBuffer::kDeBFlagNone);
curr_buffer->set_tensor_table(std::move(curr_table)); curr_buffer->set_tensor_table(std::move(curr_table));
curr_buffer->set_column_name_map(col_name_id_map_);
MS_LOG(DEBUG) << "Zip operator finished one buffer, pushing, rows " << curr_buffer->NumRows() << ", cols " MS_LOG(DEBUG) << "Zip operator finished one buffer, pushing, rows " << curr_buffer->NumRows() << ", cols "
<< curr_buffer->NumCols() << ", map " << col_name_id_map_.size() << "."; << curr_buffer->NumCols() << ", map " << column_name_id_map_.size() << ".";
RETURN_IF_NOT_OK(out_connector_->Add(0, std::move(curr_buffer))); RETURN_IF_NOT_OK(out_connector_->Add(0, std::move(curr_buffer)));
buffer_id_++; buffer_id_++;
} }
...@@ -141,20 +140,20 @@ Status ZipOp::prepare(TensorQTable *const table) { ...@@ -141,20 +140,20 @@ Status ZipOp::prepare(TensorQTable *const table) {
// At this point we have at least 1 row produced, so all child iterators have their column names such that we // At this point we have at least 1 row produced, so all child iterators have their column names such that we
// can produce our column name map now. // can produce our column name map now.
col_name_id_map_ = {}; column_name_id_map_ = {};
for (int32_t i = 0; i < children_num_; ++i) { for (int32_t i = 0; i < children_num_; ++i) {
// Initializing col_name_id_map_ from the first data buffer. // Initializing col_name_id_map_ from the first data buffer.
const std::unordered_map<std::string, int32_t> col_name_id_map = child_iterators_[i]->col_name_id_map(); const std::unordered_map<std::string, int32_t> col_name_id_map = child_iterators_[i]->GetColumnNameMap();
int32_t colsCurrent = col_name_id_map_.size(); int32_t colsCurrent = column_name_id_map_.size();
// the update code below shouldn't do anything bad if the column name already exists. // the update code below shouldn't do anything bad if the column name already exists.
for (const auto &pair : col_name_id_map) { for (const auto &pair : col_name_id_map) {
std::string name = pair.first; std::string name = pair.first;
int32_t old_id = pair.second; int32_t old_id = pair.second;
// check if name already exists in column name descriptor // check if name already exists in column name descriptor
if (col_name_id_map_.count(name) == 1) { if (column_name_id_map_.count(name) == 1) {
RETURN_STATUS_UNEXPECTED("key already exists when zipping datasets"); RETURN_STATUS_UNEXPECTED("key already exists when zipping datasets");
} }
col_name_id_map_[name] = old_id + colsCurrent; column_name_id_map_[name] = old_id + colsCurrent;
} }
} }
return Status::OK(); return Status::OK();
......
...@@ -136,7 +136,6 @@ class ZipOp : public PipelineOp { ...@@ -136,7 +136,6 @@ class ZipOp : public PipelineOp {
int32_t buffer_id_; int32_t buffer_id_;
bool draining_; bool draining_;
bool eof_; bool eof_;
std::unordered_map<std::string, int32_t> col_name_id_map_;
std::vector<std::unique_ptr<ChildIterator>> child_iterators_; std::vector<std::unique_ptr<ChildIterator>> child_iterators_;
}; };
} // namespace dataset } // namespace dataset
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册