From fe3c61a7bf937222c11b9a31e0768c6c845c78a6 Mon Sep 17 00:00:00 2001 From: Matt Pharr Date: Wed, 27 Oct 2021 11:41:39 -0700 Subject: [PATCH] Rewrite asynchronous job/futures code to not be based on std::future. The Future class is now gone, and its functionality ("is the result ready?", "give me the result value") is now folded into AsyncJob. This allows retrieving the result from multiple threads, with synchronization handled in AsyncJob. (This is more flexible than std::future allows, since we don't need either move semantics or the ability to pass along exceptions when the future is harvested.) This fixes a number of race conditions that helgrind was flagging during startup with complex scenes. (Our prior use of std::future wasn't correct in that we had unprotected access attempts from multiple threads.) --- src/pbrt/gpu/aggregate.cpp | 10 ++-- src/pbrt/parser.cpp | 9 +-- src/pbrt/scene.cpp | 57 +++++++++---------- src/pbrt/scene.h | 32 +++++------ src/pbrt/util/parallel.cpp | 10 +--- src/pbrt/util/parallel.h | 97 ++++++++++++++++----------------- src/pbrt/util/parallel_test.cpp | 6 +- 7 files changed, 109 insertions(+), 112 deletions(-) diff --git a/src/pbrt/gpu/aggregate.cpp b/src/pbrt/gpu/aggregate.cpp index da4a62c..45c5bd0 100644 --- a/src/pbrt/gpu/aggregate.cpp +++ b/src/pbrt/gpu/aggregate.cpp @@ -1337,7 +1337,7 @@ OptiXAggregate::OptiXAggregate( BVH bvh; int sbtOffset; }; - Future triFuture = RunAsync([&]() { + AsyncJob *triJob = RunAsync([&]() { BVH triangleBVH = buildBVHForTriangles( scene.shapes, plyMeshes, optixContext, hitPGTriangle, anyhitPGShadowTriangle, hitPGRandomHitTriangle, textures.floatTextures, namedMaterials, materials, media, @@ -1346,7 +1346,7 @@ OptiXAggregate::OptiXAggregate( return GAS{triangleBVH, sbtOffset}; }); - Future blpFuture = RunAsync([&]() { + AsyncJob *blpJob = RunAsync([&]() { BVH blpBVH = buildBVHForBLPs( scene.shapes, optixContext, hitPGBilinearPatch, anyhitPGShadowBilinearPatch, hitPGRandomHitBilinearPatch, textures.floatTextures, namedMaterials, materials, @@ -1355,7 +1355,7 @@ OptiXAggregate::OptiXAggregate( return GAS{blpBVH, bilinearSBTOffset}; }); - Future quadricFuture = RunAsync([&]() { + AsyncJob *quadricJob = RunAsync([&]() { BVH quadricBVH = buildBVHForQuadrics( scene.shapes, optixContext, hitPGQuadric, anyhitPGShadowQuadric, hitPGRandomHitQuadric, textures.floatTextures, namedMaterials, materials, media, @@ -1480,8 +1480,8 @@ OptiXAggregate::OptiXAggregate( gasInstance.flags = OPTIX_INSTANCE_FLAG_NONE; // TODO: OPTIX_INSTANCE_FLAG_DISABLE_ANYHIT LOG_VERBOSE("Starting to consume top-level GAS futures"); - for (Future *fut : {&triFuture, &blpFuture, &quadricFuture}) { - GAS gas = fut->Get(); + for (AsyncJob *job : {triJob, blpJob, quadricJob}) { + GAS gas = job->Get(); if (gas.bvh.traversableHandle) { gasInstance.traversableHandle = gas.bvh.traversableHandle; gasInstance.sbtOffset = gas.sbtOffset; diff --git a/src/pbrt/parser.cpp b/src/pbrt/parser.cpp index 70a834b..d29389a 100644 --- a/src/pbrt/parser.cpp +++ b/src/pbrt/parser.cpp @@ -603,7 +603,7 @@ void parse(ParserTarget *target, std::unique_ptr t) { static std::atomic warnedTransformBeginEndDeprecated{false}; - std::vector, BasicSceneBuilder *>> imports; + std::vector *, BasicSceneBuilder *>> imports; LOG_VERBOSE("Started parsing %s", std::string(t->loc.filename.begin(), t->loc.filename.end())); @@ -796,9 +796,10 @@ void parse(ParserTarget *target, std::unique_ptr t) { parse(importBuilder, std::move(timport)); LOG_VERBOSE("Elapsed time to parse \"%s\": %.2fs", filename, timer.ElapsedSeconds()); + return 0; }; - Future jobFinished = RunAsync(job, filename); - imports.push_back(std::make_pair(std::move(jobFinished), importBuilder)); + AsyncJob *jobFinished = RunAsync(job, filename); + imports.push_back(std::make_pair(jobFinished, importBuilder)); } } } else if (tok->token == "Identity") @@ -980,7 +981,7 @@ void parse(ParserTarget *target, std::unique_ptr t) { } for (auto &import : imports) { - import.first.Wait(); + import.first->Wait(); BasicSceneBuilder *builder = dynamic_cast(target); CHECK(builder); diff --git a/src/pbrt/scene.cpp b/src/pbrt/scene.cpp index 4e0abfe..ef26bcb 100644 --- a/src/pbrt/scene.cpp +++ b/src/pbrt/scene.cpp @@ -6,6 +6,7 @@ #include #include +#include #ifdef PBRT_BUILD_GPU_RENDERER #include #endif // PBRT_BUILD_GPU_RENDERER @@ -735,7 +736,7 @@ void BasicScene::SetOptions(SceneEntity filter, SceneEntity film, LOG_VERBOSE("Finished creating filter and film"); // Enqueue asynchronous job to create sampler - samplerFuture = RunAsync([sampler, this]() { + samplerJob = RunAsync([sampler, this]() { LOG_VERBOSE("Starting to create sampler"); Allocator alloc = threadAllocators.Get(); Point2i res = this->film.FullResolution(); @@ -744,7 +745,7 @@ void BasicScene::SetOptions(SceneEntity filter, SceneEntity film, }); // Enqueue asynchronous job to create camera - cameraFuture = RunAsync([camera, this]() { + cameraJob = RunAsync([camera, this]() { LOG_VERBOSE("Starting to create camera"); Allocator alloc = threadAllocators.Get(); Medium cameraMedium = GetMedium(camera.medium, &camera.loc); @@ -773,7 +774,7 @@ void BasicScene::AddMedium(MediumSceneEntity medium) { }; std::lock_guard lock(mediaMutex); - mediumFutures[medium.name] = RunAsync(create); + mediumJobs[medium.name] = RunAsync(create); } Medium BasicScene::GetMedium(const std::string &name, const FileLoc *loc) { @@ -787,14 +788,14 @@ Medium BasicScene::GetMedium(const std::string &name, const FileLoc *loc) { mediaMutex.unlock(); return m; } else { - auto fiter = mediumFutures.find(name); - if (fiter == mediumFutures.end()) + auto fiter = mediumJobs.find(name); + if (fiter == mediumJobs.end()) ErrorExit(loc, "%s: medium is not defined.", name); - pstd::optional m = fiter->second.TryGet(&mediaMutex); + pstd::optional m = fiter->second->TryGet(&mediaMutex); if (m) { mediaMap[name] = *m; - mediumFutures.erase(fiter); + mediumJobs.erase(fiter); mediaMutex.unlock(); return *m; } @@ -804,18 +805,18 @@ Medium BasicScene::GetMedium(const std::string &name, const FileLoc *loc) { std::map BasicScene::CreateMedia() { mediaMutex.lock(); - if (!mediumFutures.empty()) { + if (!mediumJobs.empty()) { // Consume futures for asynchronously-created _Medium_ objects LOG_VERBOSE("Consume media futures start"); - for (auto &m : mediumFutures) { + for (auto &m : mediumJobs) { while (mediaMap.find(m.first) == mediaMap.end()) { - pstd::optional med = m.second.TryGet(&mediaMutex); + pstd::optional med = m.second->TryGet(&mediaMutex); if (med) mediaMap[m.first] = *med; } } LOG_VERBOSE("Consume media futures finished"); - mediumFutures.clear(); + mediumJobs.clear(); } mediaMutex.unlock(); return mediaMap; @@ -859,7 +860,7 @@ void BasicScene::startLoadingNormalMaps(const ParameterDictionary ¶meters) { return; // Overload materialMutex, which we already hold, for the futures... - if (normalMapFutures.find(filename) != normalMapFutures.end()) + if (normalMapJobs.find(filename) != normalMapJobs.end()) // It's already in flight. return; @@ -876,7 +877,7 @@ void BasicScene::startLoadingNormalMaps(const ParameterDictionary ¶meters) { return normalMap; }; - normalMapFutures[filename] = RunAsync(create, filename); + normalMapJobs[filename] = RunAsync(create, filename); } void BasicScene::AddFloatTexture(std::string name, TextureSceneEntity texture) { @@ -921,7 +922,7 @@ void BasicScene::AddFloatTexture(std::string name, TextureSceneEntity texture) { return FloatTexture::Create(texture.name, renderFromTexture, texDict, &texture.loc, alloc, Options->useGPU); }; - floatTextureFutures[name] = RunAsync(create, texture); + floatTextureJobs[name] = RunAsync(create, texture); } void BasicScene::AddSpectrumTexture(std::string name, TextureSceneEntity texture) { @@ -967,7 +968,7 @@ void BasicScene::AddSpectrumTexture(std::string name, TextureSceneEntity texture SpectrumType::Albedo, &texture.loc, alloc, Options->useGPU); }; - spectrumTextureFutures[name] = RunAsync(create, texture); + spectrumTextureJobs[name] = RunAsync(create, texture); } void BasicScene::AddLight(LightSceneEntity light) { @@ -984,7 +985,7 @@ void BasicScene::AddLight(LightSceneEntity light) { GetCamera().GetCameraTransform(), lightMedium, &light.loc, threadAllocators.Get()); }; - lightFutures.push_back(RunAsync(create)); + lightJobs.push_back(RunAsync(create)); } int BasicScene::AddAreaLight(SceneEntity light) { @@ -1082,11 +1083,11 @@ void BasicScene::CreateMaterials(const NamedTextures &textures, std::vector *materialsOut) { LOG_VERBOSE("Starting to consume normal map futures"); std::lock_guard lock(materialMutex); - for (auto &fut : normalMapFutures) { - CHECK(normalMaps.find(fut.first) == normalMaps.end()); - normalMaps[fut.first] = fut.second.Get(); + for (auto &job : normalMapJobs) { + CHECK(normalMaps.find(job.first) == normalMaps.end()); + normalMaps[job.first] = job.second->Get(); } - normalMapFutures.clear(); + normalMapJobs.clear(); LOG_VERBOSE("Finished consuming normal map futures"); // Named materials @@ -1143,12 +1144,12 @@ NamedTextures BasicScene::CreateTextures() { // active when CreateTextures() is called, but valgrind doesn't know // that... textureMutex.lock(); - for (auto &tex : floatTextureFutures) - textures.floatTextures[tex.first] = tex.second.Get(); - floatTextureFutures.clear(); - for (auto &tex : spectrumTextureFutures) - textures.albedoSpectrumTextures[tex.first] = tex.second.Get(); - spectrumTextureFutures.clear(); + for (auto &tex : floatTextureJobs) + textures.floatTextures[tex.first] = tex.second->Get(); + floatTextureJobs.clear(); + for (auto &tex : spectrumTextureJobs) + textures.albedoSpectrumTextures[tex.first] = tex.second->Get(); + spectrumTextureJobs.clear(); textureMutex.unlock(); LOG_VERBOSE("Finished consuming texture futures"); @@ -1303,8 +1304,8 @@ std::vector BasicScene::CreateLights( LOG_VERBOSE("Starting to consume non-area light futures"); std::lock_guard lock(lightMutex); - for (auto &fut : lightFutures) - lights.push_back(fut.Get()); + for (auto &job : lightJobs) + lights.push_back(job->Get()); LOG_VERBOSE("Finished consuming non-area light futures"); return lights; diff --git a/src/pbrt/scene.h b/src/pbrt/scene.h index a988734..dd121c1 100644 --- a/src/pbrt/scene.h +++ b/src/pbrt/scene.h @@ -8,7 +8,6 @@ #include #include -#include #include #include #include @@ -30,6 +29,7 @@ namespace pbrt { class Integrator; +class Primitive; // SceneEntity Definition struct SceneEntity { @@ -278,24 +278,24 @@ class BasicScene { void Done(); Camera GetCamera() { - cameraFutureMutex.lock(); + cameraJobMutex.lock(); while (!camera) { - pstd::optional c = cameraFuture.TryGet(&cameraFutureMutex); + pstd::optional c = cameraJob->TryGet(&cameraJobMutex); if (c) camera = *c; } - cameraFutureMutex.unlock(); + cameraJobMutex.unlock(); return camera; } Sampler GetSampler() { - samplerFutureMutex.lock(); + samplerJobMutex.lock(); while (!sampler) { - pstd::optional s = samplerFuture.TryGet(&samplerFutureMutex); + pstd::optional s = samplerJob->TryGet(&samplerJobMutex); if (s) sampler = *s; } - samplerFutureMutex.unlock(); + samplerJobMutex.unlock(); return sampler; } @@ -337,26 +337,26 @@ class BasicScene { void startLoadingNormalMaps(const ParameterDictionary ¶meters); // BasicScene Private Members - Future samplerFuture; + AsyncJob *samplerJob = nullptr; mutable ThreadLocal threadAllocators; Camera camera; Film film; - std::mutex cameraFutureMutex; - Future cameraFuture; - std::mutex samplerFutureMutex; + std::mutex cameraJobMutex; + AsyncJob *cameraJob = nullptr; + std::mutex samplerJobMutex; Sampler sampler; std::mutex mediaMutex; - std::map> mediumFutures; + std::map *> mediumJobs; std::map mediaMap; std::mutex materialMutex; - std::map> normalMapFutures; + std::map *> normalMapJobs; std::map normalMaps; std::vector> namedMaterials; std::vector materials; std::mutex lightMutex; - std::vector> lightFutures; + std::vector *> lightJobs; std::mutex areaLightMutex; std::vector areaLights; @@ -366,8 +366,8 @@ class BasicScene { std::vector> serialSpectrumTextures; std::vector> asyncSpectrumTextures; std::set loadingTextureFilenames; - std::map> floatTextureFutures; - std::map> spectrumTextureFutures; + std::map *> floatTextureJobs; + std::map *> spectrumTextureJobs; int nMissingTextures = 0; std::mutex shapeMutex, animatedShapeMutex; diff --git a/src/pbrt/util/parallel.cpp b/src/pbrt/util/parallel.cpp index 335c6dd..aeb5b75 100644 --- a/src/pbrt/util/parallel.cpp +++ b/src/pbrt/util/parallel.cpp @@ -94,10 +94,8 @@ void ThreadPool::WorkOrWait(std::unique_lock *lock, bool isEnqueuing DCHECK(!lock->owns_lock()); lock->lock(); job->activeWorkers--; - if (job->Finished()) { + if (job->Finished()) jobListCondition.notify_all(); - job->Cleanup(); - } } else // Wait for new work to arrive or the job to finish @@ -134,10 +132,8 @@ bool ThreadPool::WorkOrReturn() { DCHECK(!lock.owns_lock()); lock.lock(); job->activeWorkers--; - if (job->Finished()) { + if (job->Finished()) jobListCondition.notify_all(); - job->Cleanup(); - } return true; } @@ -195,7 +191,7 @@ std::string ThreadPool::ToString() const { } bool DoParallelWork() { - CHECK(ParallelJob::threadPool && ParallelJob::threadPool->size()); + CHECK(ParallelJob::threadPool); // lock should be held when this is called... return ParallelJob::threadPool->WorkOrReturn(); } diff --git a/src/pbrt/util/parallel.h b/src/pbrt/util/parallel.h index 9f8ed28..16e45c8 100644 --- a/src/pbrt/util/parallel.h +++ b/src/pbrt/util/parallel.h @@ -293,8 +293,6 @@ class ParallelJob { virtual std::string ToString() const = 0; - virtual void Cleanup() {} - // ParallelJob Public Members static ThreadPool *threadPool; @@ -349,47 +347,6 @@ class ThreadPool { bool DoParallelWork(); -// Future Definition -template -class Future { - public: - // Future Public Methods - Future() = default; - Future(std::future &&f) : fut(std::move(f)) {} - Future &operator=(std::future &&f) { - fut = std::move(f); - return *this; - } - - T Get() { - Wait(); - return fut.get(); - } - - pstd::optional TryGet(std::mutex *mutex) { - if (IsReady()) - return Get(); - - mutex->unlock(); - DoParallelWork(); - mutex->lock(); - return {}; - } - - void Wait() { - while (!IsReady() && DoParallelWork()) - ; - fut.wait(); - } - - bool IsReady() const { - return fut.wait_for(std::chrono::seconds(0)) == std::future_status::ready; - } - - private: - std::future fut; -}; - // AsyncJob Definition template class AsyncJob : public ParallelJob { @@ -403,22 +360,64 @@ class AsyncJob : public ParallelJob { threadPool->RemoveFromJobList(this); started = true; lock->unlock(); - work(); + + T r = work(); + std::unique_lock l(mutex); + result = r; + cv.notify_all(); + } + + bool IsReady() const { + std::lock_guard lock(mutex); + return result.has_value(); } - void Cleanup() { delete this; } + void Wait() { + while (!IsReady() && DoParallelWork()) + ; - Future GetFuture() { return work.get_future(); } + std::unique_lock lock(mutex); + if (!result.has_value()) + cv.wait(lock, [this]() { return result.has_value(); }); + } + + T Get() { + Wait(); + std::lock_guard lock(mutex); + return *result; + } + + pstd::optional TryGet(std::mutex *extMutex) { + { + std::lock_guard lock(mutex); + if (result) + return result; + } + + extMutex->unlock(); + DoParallelWork(); + extMutex->lock(); + return {}; + } std::string ToString() const { return StringPrintf("[ AsyncJob started: %s ]", started); } - void DoWork() { work(); } + void DoWork() { + T r = work(); + std::unique_lock l(mutex); + CHECK(!result.has_value()); + result = r; + cv.notify_all(); + } private: bool started = false; - std::packaged_task work; + std::function work; + pstd::optional result; + mutable std::mutex mutex; + std::condition_variable cv; }; void ForEachThread(std::function func); @@ -441,7 +440,7 @@ inline auto RunAsync(F func, Args &&...args) { else lock = ParallelJob::threadPool->AddToJobList(job); - return job->GetFuture(); + return job; } } // namespace pbrt diff --git a/src/pbrt/util/parallel_test.cpp b/src/pbrt/util/parallel_test.cpp index 38dfce1..27f3e88 100644 --- a/src/pbrt/util/parallel_test.cpp +++ b/src/pbrt/util/parallel_test.cpp @@ -64,16 +64,16 @@ TEST(ThreadLocal, Consistency) { busywork(index); }); - std::vector> futures; + std::vector *> jobs; for (int i = 0; i < 100; ++i) { - futures.push_back(RunAsync([&]() { + jobs.push_back(RunAsync([&]() { EXPECT_EQ(std::this_thread::get_id(), tids.Get()); busywork(i); return i; })); } for (int i = 0; i < 100; ++i) - (void)futures[i].Get(); + (void)jobs[i]->Get(); ParallelFor(0, 1000, [&](int64_t index) { EXPECT_EQ(std::this_thread::get_id(), tids.Get()); -- GitLab