diff --git a/.travis.yml b/.travis.yml index f9b4a7e08315a42a61a58d6c61c45771df962c4d..87cef10b2b1aeecb4f4d490e3e034ccddedbaed7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,3 +1,4 @@ +group: deprecated-2017Q2 language: cpp cache: directories: diff --git a/paddle/gserver/gradientmachines/NeuralNetwork.cpp b/paddle/gserver/gradientmachines/NeuralNetwork.cpp index 4512aacc81f86bf87fc9ea30adcf081327663f16..514c0759e174f354c1eb2f761f9bfd00e18ee8de 100644 --- a/paddle/gserver/gradientmachines/NeuralNetwork.cpp +++ b/paddle/gserver/gradientmachines/NeuralNetwork.cpp @@ -241,11 +241,14 @@ void NeuralNetwork::forward(const std::vector& inArgs, dataLayers_[i]->setData(inArgs[i]); } + gLayerStackTrace.set_stage(true); + { for (auto& layer : layers_) { REGISTER_TIMER_INFO("ForwardTimer", layer->getName().c_str()); gLayerStackTrace.push(layer->getName()); layer->forward(passType); + gLayerStackTrace.pop(layer->getName()); } } @@ -254,9 +257,6 @@ void NeuralNetwork::forward(const std::vector& inArgs, for (auto& layer : outputLayers_) { outArgs->push_back(layer->getOutput()); } - if (passType == PASS_TEST) { - gLayerStackTrace.clear(); - } } void NeuralNetwork::resetState() { @@ -283,9 +283,10 @@ void NeuralNetwork::getState(MachineState& machineState) { } void NeuralNetwork::backward(const UpdateCallback& callback) { - gLayerStackTrace.pop(""); // tell layer trace is during backward. + gLayerStackTrace.set_stage(false); FOR_EACH_R(layer, layers_) { REGISTER_TIMER_INFO("BackwardTimer", (*layer)->getName().c_str()); + gLayerStackTrace.push((*layer)->getName()); if ((*layer)->needGradient()) { (*layer)->backward(callback); } @@ -320,7 +321,7 @@ public: } } - virtual void eval(const NeuralNetwork& nn) { + virtual void eval(const NeuralNetwork& nn) override { for (auto& evaluator : evaluators_) { evaluator->eval(nn); } @@ -395,6 +396,30 @@ private: } }; +class SubnetEvaluator : public CombinedEvaluator { +public: + SubnetEvaluator(const std::string& layerName, + std::unique_ptr&& evaluator) + : layerName_(layerName) { + addEvaluator(std::move(evaluator)); + } + virtual void eval(const NeuralNetwork& nn) override { + const LayerPtr& layer = nn.getLayer(layerName_); + CHECK(layer) << "Nonexisted layer: " << layerName_ << " in submodel " + << nn.getName(); + bool accessed = false; + layer->accessSubNetwork([this, &accessed](NeuralNetwork& subnet) { + subnet.eval(evaluators_[0].get()); + accessed = true; + }); + CHECK(accessed) << "There is no subnetwork for layer " << layerName_ + << " in submodel " << nn.getName(); + } + +protected: + std::string layerName_; +}; + Evaluator* NeuralNetwork::makeEvaluator() const { CombinedEvaluator* combinedEvaluator = new CombinedEvaluator(); auto subModelConfig = std::find_if(config_.sub_models().begin(), @@ -421,6 +446,15 @@ Evaluator* NeuralNetwork::makeEvaluator() const { combinedEvaluator->addEvaluator(std::move(evaluator)); } } + for (auto& layer : layers_) { + layer->accessSubNetwork( + [layer, combinedEvaluator](NeuralNetwork& subnet) { + std::unique_ptr subEvaluator(new SubnetEvaluator( + layer->getName(), + std::unique_ptr(subnet.makeEvaluator()))); + combinedEvaluator->addEvaluator(std::move(subEvaluator)); + }); + } } else { for (const EvaluatorConfig& evalConfig : config_.evaluators()) { std::unique_ptr evaluator(Evaluator::create(evalConfig)); diff --git a/paddle/gserver/gradientmachines/NeuralNetwork.h b/paddle/gserver/gradientmachines/NeuralNetwork.h index e7b6c438407e7eab6eab1f6ed496f35caa9f2177..12810f642519b7965fc1b7d751290445e3350dd5 100644 --- a/paddle/gserver/gradientmachines/NeuralNetwork.h +++ b/paddle/gserver/gradientmachines/NeuralNetwork.h @@ -129,6 +129,8 @@ public: static NeuralNetwork* newNeuralNetwork(const std::string& name = "", NeuralNetwork* rootNetwork = nullptr); + const std::string& getName() const { return subModelName_; } + protected: /** * The constructor of NeuralNetwork. diff --git a/paddle/gserver/gradientmachines/RecurrentGradientMachine.cpp b/paddle/gserver/gradientmachines/RecurrentGradientMachine.cpp index 3e930380226bce58cc90704b4c4cfa36e9f70968..9a972466d66ba1417b2c31e66dc375b3da229aa8 100644 --- a/paddle/gserver/gradientmachines/RecurrentGradientMachine.cpp +++ b/paddle/gserver/gradientmachines/RecurrentGradientMachine.cpp @@ -208,6 +208,7 @@ void RecurrentGradientMachine::init( }); CHECK(subModelConfig != config.sub_models().end()); reversed_ = subModelConfig->reversed(); + generating_ = subModelConfig->has_generator(); inFrameLines_.resize(subModelConfig->in_links_size()); for (size_t i = 0; i < inFrameLines_.size(); ++i) { @@ -287,10 +288,6 @@ void RecurrentGradientMachine::init( parameterIds_.push_back(para->getID()); } } - - if (subModelConfig->evaluator_names_size() > 0) { - evaluator_.reset(frames_[0]->makeEvaluator()); - } } void RecurrentGradientMachine::resizeOrCreateFrames(int numFrames) { @@ -538,7 +535,7 @@ void RecurrentGradientMachine::forward(const std::vector& inArgs, The outputs are outFramesLines_[i].agentLayer */ - if (inFrameLines_.empty() && passType == PASS_TEST) { + if (generating_) { generateSequence(); return; } // else forward.. @@ -561,14 +558,14 @@ void RecurrentGradientMachine::forward(const std::vector& inArgs, std::vector outArgs; frames_[i]->forward(inArgs, &outArgs, passType); } - if (evaluator_ && passType == PASS_TEST) { - this->eval(evaluator_.get()); - } reorganizeOutput(passType); } void RecurrentGradientMachine::backward(const UpdateCallback& callback) { + if (generating_) { + return; + } REGISTER_TIMER_INFO("RecurrentBwTime", "RecurrentBwTime"); AsyncGpuBlock asyncGpuBlock; for (int i = maxSequenceLength_ - 1; i >= 0; --i) { @@ -577,11 +574,6 @@ void RecurrentGradientMachine::backward(const UpdateCallback& callback) { for (auto& memoryFrameLine : memoryFrameLines_) { memoryFrameLine.bootLayer->backward(nullptr); } - - // call printers here so the gradient can be printed - if (evaluator_) { - this->eval(evaluator_.get()); - } } void RecurrentGradientMachine::forwardBackward( @@ -595,9 +587,9 @@ void RecurrentGradientMachine::forwardBackward( void RecurrentGradientMachine::eval(Evaluator* evaluator) const { // call printers frame by frame for (int i = 0; i < maxSequenceLength_; ++i) { - LOG(INFO) << "Recurrent Layer Group eval frame " << i << " begin"; + VLOG(2) << "Recurrent Layer Group eval frame " << i << " begin"; evaluator->eval(*(frames_[i].get())); - LOG(INFO) << "Recurrent Layer Group eval frame " << i << " end"; + VLOG(2) << "Recurrent Layer Group eval frame " << i << " end"; } } @@ -1093,10 +1085,6 @@ void RecurrentGradientMachine::oneWaySearch(size_t batchSize) { copyDataOutlinkFrame(machineCur); - // call value printer - if (evaluator_) { - evaluator_->eval(*(frames_[machineCur].get())); - } // check eos const IVectorPtr& eosVec = eosFrameLine_->layers[machineCur]->getOutput().ids; @@ -1321,11 +1309,10 @@ void RecurrentGradientMachine::fillGenOutputs() { batchMachineIdVec_.clear(); generator_.ids.clear(); + int* starts = generator_.outArg.sequenceStartPositions->getMutableData(false); + starts[0] = 0; if (numResults > 1) { real* probs = generator_.outArg.in->getData(); - int* starts = - generator_.outArg.sequenceStartPositions->getMutableData(false); - starts[0] = 0; for (size_t i = 0; i < finalPaths_.size(); ++i) { for (size_t j = 0; j < finalPaths_[i].size(); ++j) { Path& path = finalPaths_[i][j]; @@ -1348,7 +1335,10 @@ void RecurrentGradientMachine::fillGenOutputs() { } else { for (size_t i = 0; i < finalPaths_.size(); ++i) { CHECK(!finalPaths_[i].empty()); - generator_.ids = finalPaths_[i][0].ids; + generator_.ids.insert(generator_.ids.begin(), + finalPaths_[i][0].ids.begin(), + finalPaths_[i][0].ids.end()); + starts[i + 1] = starts[i] + finalPaths_[i][0].ids.size(); } } } diff --git a/paddle/gserver/gradientmachines/RecurrentGradientMachine.h b/paddle/gserver/gradientmachines/RecurrentGradientMachine.h index 8d94d7e2df216c4657d759c16dd6b1f2848996e0..f245620cf668bb341df99cf498105cbd996a6b24 100644 --- a/paddle/gserver/gradientmachines/RecurrentGradientMachine.h +++ b/paddle/gserver/gradientmachines/RecurrentGradientMachine.h @@ -414,6 +414,7 @@ protected: std::vector ids; // store generated sequences Argument outArg; // final output argument }; + bool generating_; Generator generator_; std::vector> frames_; @@ -428,8 +429,6 @@ protected: std::vector parameterIds_; // parameters actually used by this Layer Group - std::unique_ptr evaluator_; // frame printers in this layer group - // store final argument of outFrameLines_ std::vector dataArgs_; // store each frame's output argument of outFrameLines_ diff --git a/paddle/gserver/layers/AgentLayer.cpp b/paddle/gserver/layers/AgentLayer.cpp index 31463823b3fc04cc24068d95887a9d3ed25a6168..15e7411b5fde0fa3a532394cf7d0e8477ef052d0 100644 --- a/paddle/gserver/layers/AgentLayer.cpp +++ b/paddle/gserver/layers/AgentLayer.cpp @@ -109,6 +109,40 @@ void GatherAgentLayer::forwardValue(PassType passType) { } } +namespace { + +// dest[index[i]] <- src[i] for each i +void copyElements(const IVector& srcVec, + const IVector& indexVec, + IVector& destVec) { + const int* src = srcVec.getData(); + const int* index = indexVec.getData(); + int* dest = destVec.getData(); + int len = indexVec.getSize(); + CHECK_EQ(srcVec.getSize(), indexVec.getSize()); + for (int i = 0; i < len; ++i) { + dest[index[i]] = src[i]; + } +} +} + +void GatherAgentLayer::forwardIds(PassType passType) { + IVectorPtr realId = realLayers_[0]->getOutputLabel(); + if (!realId) return; + + IVector::resizeOrCreate(output_.ids, allIds_->getSize(), useGpu_); + IVectorPtr outId = output_.ids; + idsVec_.resize(idIndex_.size()); + + for (size_t i = 0; i < realLayers_.size(); ++i) { + const IVectorPtr& realId = realLayers_[i]->getOutputLabel(); + idsVec_[i] = IVector::create(allIds_->getData() + idIndex_[i], + /* size */ realId->getSize(), + useGpu_); + execViaCpu(©Elements, *realId, *idsVec_[i], *outId); + } +} + void GatherAgentLayer::backward(const UpdateCallback& callback) { (void)callback; const MatrixPtr& outputGrad = getOutputGrad(); @@ -136,23 +170,22 @@ void ScatterAgentLayer::forward(PassType passType) { CHECK_EQ(realLayer_->getDeviceId(), this->getDeviceId()); int width = this->getSize(); - if (realOutArg_.hasSeq()) { - forwardSequence(passType); - } else if (realOutArg_.value || realOutArg_.ids) { - output_.subArgFrom( - realOutArg_, /* offset */ idIndex_, idSize_, width, useGpu_); - } else { // used in generation - if (realLayer_->getOutput().ids) { - IVector::resizeOrCreate(output_.ids, ids_->getSize(), useGpu_); - output_.ids->selectFrom(*realLayer_->getOutput().ids, *ids_); - } - if (realLayer_->getOutput().value) { - int height = ids_->getSize(); - resetOutput(height, width); - - const MatrixPtr& outV = getOutputValue(); - const MatrixPtr& realV = realLayer_->getOutputValue(); - outV->selectRows(*realV, *ids_); + if (selectionMode_) { + forwardWithSelection(passType); + } else { + if (realOutArg_.hasSeq()) { + output_.subArgFrom(realOutArg_, + /* offset */ idIndex_, + idSize_, + width, + useGpu_, + /* trans */ false, + /* seqFlag */ true, + /* seqStart */ seqStartPosIndex_, + /* seqSize */ numSequences_); + } else { + output_.subArgFrom( + realOutArg_, /* offset */ idIndex_, idSize_, width, useGpu_); } } } @@ -160,6 +193,8 @@ void ScatterAgentLayer::forward(PassType passType) { void ScatterAgentLayer::backward(const UpdateCallback& callback) { (void)callback; + CHECK(!selectionMode_); + const MatrixPtr& outputGrad = realOutArg_.grad; const MatrixPtr& realGrad = realLayer_->getOutputGrad(); if (realGrad) { @@ -174,42 +209,7 @@ void ScatterAgentLayer::backward(const UpdateCallback& callback) { REGISTER_LAYER(gather_agent, GatherAgentLayer); REGISTER_LAYER(scatter_agent, ScatterAgentLayer); -void GatherAgentLayer::forwardIds(PassType passType) { - int height = 0; - IVectorPtr idReal = realLayers_[0]->getOutputLabel(); - - if (!idReal) return; - - if (output_.subSequenceStartPositions) { - int* starts = output_.subSequenceStartPositions->getMutableData(false); - // Gather generator.idsVec - // if is beam search generation result. Get first result. - if (idReal->getData()[idReal->getSize() - 1] == -1) { - for (size_t i = 0; i < realLayers_.size(); ++i) { - // The first element stores first result size - idReal = realLayers_[i]->getOutputLabel(); - idReal->subVecFrom(*idReal, 1, idReal->getData()[0]); - } - } - for (size_t i = 0; i < realLayers_.size(); ++i) { - CHECK(realLayers_[i]->getOutputLabel()); - starts[i] = height; - height += realLayers_[i]->getOutputLabel()->getSize(); - } - starts[realLayers_.size()] = height; - output_.sequenceStartPositions->getMutableData(false)[1] = height; - - IVector::resizeOrCreate(output_.ids, height, false); - for (size_t i = 0; i < realLayers_.size(); ++i) { - output_.ids->subVec(starts[i], starts[i + 1] - starts[i]) - ->copyFrom(*realLayers_[i]->getOutputLabel()); - } - } else { - LOG(FATAL) << "Not implemented"; - } -} - -void ScatterAgentLayer::forwardSequence(PassType passType) { +void ScatterAgentLayer::forwardWithSelection(PassType passType) { Layer::forward(passType); CHECK_EQ(realLayer_->getDeviceId(), this->getDeviceId()); @@ -220,17 +220,19 @@ void ScatterAgentLayer::forwardSequence(PassType passType) { AsyncGpuBlock asyncGpuBlock; REGISTER_TIMER_INFO("SequenceAgentLayerForward", getName().c_str()); - if (realOutArg_.value || realOutArg_.ids) { - CHECK(realOutArg_.sequenceStartPositions); - output_.subArgFrom(realOutArg_, - /* offset */ idIndex_, - idSize_, - width, - useGpu_, - /* trans */ false, - /* seqFlag */ true, - /* seqStart */ seqStartPosIndex_, - /* seqSize */ numSequences_); + if (!input.hasSeq()) { + if (realLayer_->getOutput().ids) { + IVector::resizeOrCreate(output_.ids, ids_->getSize(), useGpu_); + output_.ids->selectFrom(*realLayer_->getOutput().ids, *ids_); + } + if (realLayer_->getOutput().value) { + int height = ids_->getSize(); + resetOutput(height, width); + + const MatrixPtr& outV = getOutputValue(); + const MatrixPtr& realV = realLayer_->getOutputValue(); + outV->selectRows(*realV, *ids_); + } } else { // Putting the generation logic here is really an ugly hack! // used in generation diff --git a/paddle/gserver/layers/AgentLayer.h b/paddle/gserver/layers/AgentLayer.h index 461b84b17e556b53e0734bff8e37a0d529a3290e..29681b29c6a9a10715548839f2d365eb4a0c7381 100644 --- a/paddle/gserver/layers/AgentLayer.h +++ b/paddle/gserver/layers/AgentLayer.h @@ -110,6 +110,9 @@ protected: // of real layer. ICpuGpuVectorPtr inputStartPos_; + // true for setRealLayer, false for setRealLayerAndOutput + bool selectionMode_; + public: explicit ScatterAgentLayer(const LayerConfig& config) : Layer(config) {} @@ -137,6 +140,7 @@ public: } else { cpuIds_ = ids_; } + selectionMode_ = true; } // set real layer and output, [idIndex, idIndex + idSize) of *ids* @@ -153,6 +157,7 @@ public: idIndex_ = idIndex; idSize_ = idSize; handleBackward_ = handleBackward; + selectionMode_ = false; } void setSequenceStartPositions(const ICpuGpuVectorPtr& sequenceStartPositions, @@ -166,7 +171,7 @@ public: void forward(PassType passType) override; void backward(const UpdateCallback& callback) override; - void forwardSequence(PassType passType); + void forwardWithSelection(PassType passType); }; } // namespace paddle diff --git a/paddle/trainer/tests/sample_trainer_nest_rnn_gen.conf b/paddle/trainer/tests/sample_trainer_nest_rnn_gen.conf index d669fbc40cbc19df309d8bf20c942a9d8fc8f47d..741a0aa71df7866c180ab2513f28638117d0f1ca 100644 --- a/paddle/trainer/tests/sample_trainer_nest_rnn_gen.conf +++ b/paddle/trainer/tests/sample_trainer_nest_rnn_gen.conf @@ -35,7 +35,7 @@ def outer_step(dummy_data): embedding_size=num_words)] def inner_step(dummy_memory, predict_word): - + # simplified RNN for testing with mixed_layer(size=num_words) as layer: layer += full_matrix_projection(input=predict_word, @@ -46,15 +46,15 @@ def outer_step(dummy_data): param_attr=ParamAttr(name="wordvec")) return out - + beam_gen = beam_search(name="rnn_gen", step=inner_step, input=gen_inputs, bos_id=0, eos_id=num_words-1, beam_size=2 if beam_flag else 1, - num_results_per_sample=2 if beam_flag else 1, - max_length=10) + num_results_per_sample=1, + max_length=10) return beam_gen beam_gen_concat = recurrent_group(name="rnn_gen_concat", diff --git a/paddle/trainer/tests/sample_trainer_rnn_gen.conf b/paddle/trainer/tests/sample_trainer_rnn_gen.conf index 2b337282f6285afb527e9bbf138d2e8184700d8d..58d27f15ae1c0a38885ee105a7963b6e7bd55906 100644 --- a/paddle/trainer/tests/sample_trainer_rnn_gen.conf +++ b/paddle/trainer/tests/sample_trainer_rnn_gen.conf @@ -33,7 +33,7 @@ gen_inputs = [StaticInput(input=dummy_data, size=2), embedding_size=num_words)] def step(dummy_memory, predict_word): - + # simplified RNN for testing with mixed_layer(size=num_words) as layer: layer += full_matrix_projection(input=predict_word, @@ -44,7 +44,7 @@ def step(dummy_memory, predict_word): param_attr=ParamAttr(name="wordvec")) return out - + beam_gen = beam_search(name="rnn_gen", step=step, input=gen_inputs, @@ -52,7 +52,7 @@ beam_gen = beam_search(name="rnn_gen", eos_id=num_words-1, beam_size=2 if beam_flag else 1, num_results_per_sample=2 if beam_flag else 1, - max_length=10) + max_length=10) seqtext_printer_evaluator(input=beam_gen, id_input=sent_id, diff --git a/paddle/utils/CustomStackTrace.h b/paddle/utils/CustomStackTrace.h index 6992e856223494d6575ef3261d82cbdf4e375885..52a6df94979fd3d8d7d540ed0e3898bb3375d975 100644 --- a/paddle/utils/CustomStackTrace.h +++ b/paddle/utils/CustomStackTrace.h @@ -55,13 +55,17 @@ public: * Else, just set status to popping. */ void pop(const T& item) { - pushing() = false; auto& s = this->stack(); if (item == s.top()) { s.pop(); } } + /** + * @brief Indicate whether we are at forward or backward stage of computation + */ + void set_stage(bool isForward) { pushing() = isForward; } + /** * @brief clear current thread stack. */ diff --git a/paddle/utils/tests/test_CustomStackTrace.cpp b/paddle/utils/tests/test_CustomStackTrace.cpp index b5d9f93f1376048eabd726331006b0bb848bce11..c320074fbadab3e211ed72ce715d595c90673d6d 100644 --- a/paddle/utils/tests/test_CustomStackTrace.cpp +++ b/paddle/utils/tests/test_CustomStackTrace.cpp @@ -72,7 +72,6 @@ TEST(CustomStackTrace, normalTrain) { for (size_t i = 0; i < layerSize; ++i) { tracer.push("layer_" + paddle::str::to_string(i)); } - tracer.pop(""); for (size_t i = 0; i < layerSize; ++i) { tracer.pop("layer_" + paddle::str::to_string(layerSize - 1 - i)); } diff --git a/python/paddle/v2/dataset/common.py b/python/paddle/v2/dataset/common.py index e09ac1a7a0fe70dbf58a04f51cdf6916485e9be1..72894c24b16162f41db729d8bcabf09cd5510922 100644 --- a/python/paddle/v2/dataset/common.py +++ b/python/paddle/v2/dataset/common.py @@ -27,13 +27,17 @@ __all__ = ['DATA_HOME', 'download', 'md5file', 'split', 'cluster_files_reader'] DATA_HOME = os.path.expanduser('~/.cache/paddle/dataset') -if not os.path.exists(DATA_HOME): - try: - os.makedirs(DATA_HOME) - except OSError as exc: - if exc.errno != errno.EEXIST: - raise - pass +# When running unit tests, there could be multiple processes that +# trying to create DATA_HOME directory simultaneously, so we cannot +# use a if condition to check for the existence of the directory; +# instead, we use the filesystem as the synchronization mechanism by +# catching returned errors. +try: + os.makedirs(DATA_HOME) +except OSError as exc: + if exc.errno != errno.EEXIST: + raise + pass def md5file(fname): diff --git a/python/paddle/v2/layer.py b/python/paddle/v2/layer.py index bbb9c3ea8c1b389f0ec9fd5ec7be52bd0449f52d..4ade1c6f329ae39769279963af6809f938807bdd 100644 --- a/python/paddle/v2/layer.py +++ b/python/paddle/v2/layer.py @@ -45,12 +45,12 @@ __all__ = ['data', 'parse_network'] def __need_to_keep__(name): return name in [ 'StaticInput', 'SubsequenceInput', 'GeneratedInput', 'LayerType', - 'layer_support' + 'layer_support', 'BaseGeneratedInput' ] def __need_to_wrap__(name): - return name not in ['AggregateLevel', 'ExpandLevel'] + return name not in ['AggregateLevel', 'ExpandLevel', 'BaseGeneratedInput'] def __convert_name__(inname): @@ -199,6 +199,15 @@ def __get_used_submodels__(layer_names): return submodel_names +def __get_submodel_data_out_links__(): + data_links = set() + for submodel in cp.g_config.model_config.sub_models: + for link in submodel.out_links: + if cp.g_layer_map[link.link_name].type == 'data': + data_links.add(link.link_name) + return data_links + + def __get_used_evaluators__(layer_names): evaluator_names = set() for e in cp.g_config.model_config.evaluators: @@ -264,6 +273,7 @@ def parse_network(output_layers, extra_layers=None): submodel_names = __get_used_submodels__(layer_names) submodel_names.add('root') evaluator_names = __get_used_evaluators__(layer_names) + data_out_links = __get_submodel_data_out_links__() input_layer_names = set() output_layer_names = set() @@ -279,7 +289,7 @@ def parse_network(output_layers, extra_layers=None): continue model_config.layers.extend([l]) if l.type == 'data': - if l.name in model_config.output_layer_names: + if l.name in data_out_links: """ In text generation, the outlink to save the generated word indices is a data_layer defined in recurrent_group. This diff --git a/python/paddle/v2/reader/decorator.py b/python/paddle/v2/reader/decorator.py index c76faa596c9fb9079cab3456b721c18ef9768e95..e432003129d2b8dea60138d08f13ec5e9d29a7ad 100644 --- a/python/paddle/v2/reader/decorator.py +++ b/python/paddle/v2/reader/decorator.py @@ -230,7 +230,7 @@ class XmapEndSignal(): pass -def xmap_readers(mapper, reader, process_num, buffer_size): +def xmap_readers(mapper, reader, process_num, buffer_size, order=False): """ Use multiprocess to map samples from reader by a mapper defined by user. And this function contains a buffered decorator. @@ -242,12 +242,15 @@ def xmap_readers(mapper, reader, process_num, buffer_size): :type process_num: int :param buffer_size: max buffer size :type buffer_size: int + :param order: keep the order of reader + :type order: bool :return: the decarated reader :rtype: callable """ end = XmapEndSignal() in_queue = Queue(buffer_size) out_queue = Queue(buffer_size) + out_order = [0] # define a worker to read samples from reader to in_queue def read_worker(reader, in_queue): @@ -255,8 +258,17 @@ def xmap_readers(mapper, reader, process_num, buffer_size): in_queue.put(i) in_queue.put(end) + # define a worker to read samples from reader to in_queue with order flag + def order_read_worker(reader, in_queue): + in_order = 0 + for i in reader(): + in_queue.put((in_order, i)) + in_order += 1 + in_queue.put(end) + # start a read worker in a thread - t = Thread(target=read_worker, args=(reader, in_queue)) + target = order_read_worker if order else read_worker + t = Thread(target=target, args=(reader, in_queue)) t.daemon = True t.start() @@ -271,11 +283,28 @@ def xmap_readers(mapper, reader, process_num, buffer_size): in_queue.put(end) out_queue.put(end) + # define a worker to handle samples from in_queue by mapper + # and put mapped samples into out_queue by order + def order_handle_worker(in_queue, out_queue, mapper, out_order): + ins = in_queue.get() + while not isinstance(ins, XmapEndSignal): + order, sample = ins + r = mapper(sample) + while order != out_order[0]: + pass + out_queue.put(r) + out_order[0] += 1 + ins = in_queue.get() + in_queue.put(end) + out_queue.put(end) + # start several handle_workers + target = order_handle_worker if order else handle_worker + args = (in_queue, out_queue, mapper, out_order) if order else ( + in_queue, out_queue, mapper) workers = [] for i in xrange(process_num): - worker = Thread( - target=handle_worker, args=(in_queue, out_queue, mapper)) + worker = Thread(target=target, args=args) worker.daemon = True workers.append(worker) for w in workers: diff --git a/python/paddle/v2/reader/tests/decorator_test.py b/python/paddle/v2/reader/tests/decorator_test.py index 734154b9790a4dc118d11992343648364c907305..bb3c5d220b9ce1552d2fc429abb1863930cd4d17 100644 --- a/python/paddle/v2/reader/tests/decorator_test.py +++ b/python/paddle/v2/reader/tests/decorator_test.py @@ -121,5 +121,27 @@ class TestShuffle(unittest.TestCase): self.assertEqual(total, 10) +class TestXmap(unittest.TestCase): + def test_xmap(self): + def mapper(x): + return (x + 1) + + orders = (True, False) + thread_nums = (1, 2, 4, 8, 16) + buffered_size = (1, 2, 4, 8, 16) + for order in orders: + for tNum in thread_nums: + for size in buffered_size: + result = [] + for i in paddle.v2.reader.xmap_readers(mapper, + reader_creator_10(0), + tNum, size, order)(): + result.append(i) + if not order: + result.sort() + for idx, e in enumerate(result): + self.assertEqual(e, mapper(idx)) + + if __name__ == '__main__': unittest.main()