提交 fe3c61a7 · 作者: Matt Pharr

Rewrite asynchronous job/futures code to not be based on std::future.

The Future class is now gone, and its functionality ("is the result
ready?", "give me the result value") is now folded into AsyncJob.  This
allows retrieving the result from multiple threads, with synchronization
handled in AsyncJob.  (This is more flexible than what std::future allows;
we can do without std::future because we need neither move semantics nor
the ability to pass along exceptions when the result is harvested.)

This fixes a number of race conditions that helgrind was flagging during
startup with complex scenes.  (Our prior use of std::future was incorrect:
futures were accessed from multiple threads without any synchronization.)
上级 1ab70a56
......@@ -1337,7 +1337,7 @@ OptiXAggregate::OptiXAggregate(
BVH bvh;
int sbtOffset;
};
Future<GAS> triFuture = RunAsync([&]() {
AsyncJob<GAS> *triJob = RunAsync([&]() {
BVH triangleBVH = buildBVHForTriangles(
scene.shapes, plyMeshes, optixContext, hitPGTriangle, anyhitPGShadowTriangle,
hitPGRandomHitTriangle, textures.floatTextures, namedMaterials, materials, media,
......@@ -1346,7 +1346,7 @@ OptiXAggregate::OptiXAggregate(
return GAS{triangleBVH, sbtOffset};
});
Future<GAS> blpFuture = RunAsync([&]() {
AsyncJob<GAS> *blpJob = RunAsync([&]() {
BVH blpBVH = buildBVHForBLPs(
scene.shapes, optixContext, hitPGBilinearPatch, anyhitPGShadowBilinearPatch,
hitPGRandomHitBilinearPatch, textures.floatTextures, namedMaterials, materials,
......@@ -1355,7 +1355,7 @@ OptiXAggregate::OptiXAggregate(
return GAS{blpBVH, bilinearSBTOffset};
});
Future<GAS> quadricFuture = RunAsync([&]() {
AsyncJob<GAS> *quadricJob = RunAsync([&]() {
BVH quadricBVH = buildBVHForQuadrics(
scene.shapes, optixContext, hitPGQuadric, anyhitPGShadowQuadric,
hitPGRandomHitQuadric, textures.floatTextures, namedMaterials, materials, media,
......@@ -1480,8 +1480,8 @@ OptiXAggregate::OptiXAggregate(
gasInstance.flags =
OPTIX_INSTANCE_FLAG_NONE; // TODO: OPTIX_INSTANCE_FLAG_DISABLE_ANYHIT
LOG_VERBOSE("Starting to consume top-level GAS futures");
for (Future<GAS> *fut : {&triFuture, &blpFuture, &quadricFuture}) {
GAS gas = fut->Get();
for (AsyncJob<GAS> *job : {triJob, blpJob, quadricJob}) {
GAS gas = job->Get();
if (gas.bvh.traversableHandle) {
gasInstance.traversableHandle = gas.bvh.traversableHandle;
gasInstance.sbtOffset = gas.sbtOffset;
......
......@@ -603,7 +603,7 @@ void parse(ParserTarget *target, std::unique_ptr<Tokenizer> t) {
static std::atomic<bool> warnedTransformBeginEndDeprecated{false};
std::vector<std::pair<Future<void>, BasicSceneBuilder *>> imports;
std::vector<std::pair<AsyncJob<int> *, BasicSceneBuilder *>> imports;
LOG_VERBOSE("Started parsing %s",
std::string(t->loc.filename.begin(), t->loc.filename.end()));
......@@ -796,9 +796,10 @@ void parse(ParserTarget *target, std::unique_ptr<Tokenizer> t) {
parse(importBuilder, std::move(timport));
LOG_VERBOSE("Elapsed time to parse \"%s\": %.2fs", filename,
timer.ElapsedSeconds());
return 0;
};
Future<void> jobFinished = RunAsync(job, filename);
imports.push_back(std::make_pair(std::move(jobFinished), importBuilder));
AsyncJob<int> *jobFinished = RunAsync(job, filename);
imports.push_back(std::make_pair(jobFinished, importBuilder));
}
}
} else if (tok->token == "Identity")
......@@ -980,7 +981,7 @@ void parse(ParserTarget *target, std::unique_ptr<Tokenizer> t) {
}
for (auto &import : imports) {
import.first.Wait();
import.first->Wait();
BasicSceneBuilder *builder = dynamic_cast<BasicSceneBuilder *>(target);
CHECK(builder);
......
......@@ -6,6 +6,7 @@
#include <pbrt/cpu/aggregates.h>
#include <pbrt/cpu/integrators.h>
#include <pbrt/cpu/primitive.h>
#ifdef PBRT_BUILD_GPU_RENDERER
#include <pbrt/gpu/memory.h>
#endif // PBRT_BUILD_GPU_RENDERER
......@@ -735,7 +736,7 @@ void BasicScene::SetOptions(SceneEntity filter, SceneEntity film,
LOG_VERBOSE("Finished creating filter and film");
// Enqueue asynchronous job to create sampler
samplerFuture = RunAsync([sampler, this]() {
samplerJob = RunAsync([sampler, this]() {
LOG_VERBOSE("Starting to create sampler");
Allocator alloc = threadAllocators.Get();
Point2i res = this->film.FullResolution();
......@@ -744,7 +745,7 @@ void BasicScene::SetOptions(SceneEntity filter, SceneEntity film,
});
// Enqueue asynchronous job to create camera
cameraFuture = RunAsync([camera, this]() {
cameraJob = RunAsync([camera, this]() {
LOG_VERBOSE("Starting to create camera");
Allocator alloc = threadAllocators.Get();
Medium cameraMedium = GetMedium(camera.medium, &camera.loc);
......@@ -773,7 +774,7 @@ void BasicScene::AddMedium(MediumSceneEntity medium) {
};
std::lock_guard<std::mutex> lock(mediaMutex);
mediumFutures[medium.name] = RunAsync(create);
mediumJobs[medium.name] = RunAsync(create);
}
Medium BasicScene::GetMedium(const std::string &name, const FileLoc *loc) {
......@@ -787,14 +788,14 @@ Medium BasicScene::GetMedium(const std::string &name, const FileLoc *loc) {
mediaMutex.unlock();
return m;
} else {
auto fiter = mediumFutures.find(name);
if (fiter == mediumFutures.end())
auto fiter = mediumJobs.find(name);
if (fiter == mediumJobs.end())
ErrorExit(loc, "%s: medium is not defined.", name);
pstd::optional<Medium> m = fiter->second.TryGet(&mediaMutex);
pstd::optional<Medium> m = fiter->second->TryGet(&mediaMutex);
if (m) {
mediaMap[name] = *m;
mediumFutures.erase(fiter);
mediumJobs.erase(fiter);
mediaMutex.unlock();
return *m;
}
......@@ -804,18 +805,18 @@ Medium BasicScene::GetMedium(const std::string &name, const FileLoc *loc) {
std::map<std::string, Medium> BasicScene::CreateMedia() {
mediaMutex.lock();
if (!mediumFutures.empty()) {
if (!mediumJobs.empty()) {
// Consume futures for asynchronously-created _Medium_ objects
LOG_VERBOSE("Consume media futures start");
for (auto &m : mediumFutures) {
for (auto &m : mediumJobs) {
while (mediaMap.find(m.first) == mediaMap.end()) {
pstd::optional<Medium> med = m.second.TryGet(&mediaMutex);
pstd::optional<Medium> med = m.second->TryGet(&mediaMutex);
if (med)
mediaMap[m.first] = *med;
}
}
LOG_VERBOSE("Consume media futures finished");
mediumFutures.clear();
mediumJobs.clear();
}
mediaMutex.unlock();
return mediaMap;
......@@ -859,7 +860,7 @@ void BasicScene::startLoadingNormalMaps(const ParameterDictionary &parameters) {
return;
// Overload materialMutex, which we already hold, for the futures...
if (normalMapFutures.find(filename) != normalMapFutures.end())
if (normalMapJobs.find(filename) != normalMapJobs.end())
// It's already in flight.
return;
......@@ -876,7 +877,7 @@ void BasicScene::startLoadingNormalMaps(const ParameterDictionary &parameters) {
return normalMap;
};
normalMapFutures[filename] = RunAsync(create, filename);
normalMapJobs[filename] = RunAsync(create, filename);
}
void BasicScene::AddFloatTexture(std::string name, TextureSceneEntity texture) {
......@@ -921,7 +922,7 @@ void BasicScene::AddFloatTexture(std::string name, TextureSceneEntity texture) {
return FloatTexture::Create(texture.name, renderFromTexture, texDict,
&texture.loc, alloc, Options->useGPU);
};
floatTextureFutures[name] = RunAsync(create, texture);
floatTextureJobs[name] = RunAsync(create, texture);
}
void BasicScene::AddSpectrumTexture(std::string name, TextureSceneEntity texture) {
......@@ -967,7 +968,7 @@ void BasicScene::AddSpectrumTexture(std::string name, TextureSceneEntity texture
SpectrumType::Albedo, &texture.loc, alloc,
Options->useGPU);
};
spectrumTextureFutures[name] = RunAsync(create, texture);
spectrumTextureJobs[name] = RunAsync(create, texture);
}
void BasicScene::AddLight(LightSceneEntity light) {
......@@ -984,7 +985,7 @@ void BasicScene::AddLight(LightSceneEntity light) {
GetCamera().GetCameraTransform(), lightMedium, &light.loc,
threadAllocators.Get());
};
lightFutures.push_back(RunAsync(create));
lightJobs.push_back(RunAsync(create));
}
int BasicScene::AddAreaLight(SceneEntity light) {
......@@ -1082,11 +1083,11 @@ void BasicScene::CreateMaterials(const NamedTextures &textures,
std::vector<pbrt::Material> *materialsOut) {
LOG_VERBOSE("Starting to consume normal map futures");
std::lock_guard<std::mutex> lock(materialMutex);
for (auto &fut : normalMapFutures) {
CHECK(normalMaps.find(fut.first) == normalMaps.end());
normalMaps[fut.first] = fut.second.Get();
for (auto &job : normalMapJobs) {
CHECK(normalMaps.find(job.first) == normalMaps.end());
normalMaps[job.first] = job.second->Get();
}
normalMapFutures.clear();
normalMapJobs.clear();
LOG_VERBOSE("Finished consuming normal map futures");
// Named materials
......@@ -1143,12 +1144,12 @@ NamedTextures BasicScene::CreateTextures() {
// active when CreateTextures() is called, but valgrind doesn't know
// that...
textureMutex.lock();
for (auto &tex : floatTextureFutures)
textures.floatTextures[tex.first] = tex.second.Get();
floatTextureFutures.clear();
for (auto &tex : spectrumTextureFutures)
textures.albedoSpectrumTextures[tex.first] = tex.second.Get();
spectrumTextureFutures.clear();
for (auto &tex : floatTextureJobs)
textures.floatTextures[tex.first] = tex.second->Get();
floatTextureJobs.clear();
for (auto &tex : spectrumTextureJobs)
textures.albedoSpectrumTextures[tex.first] = tex.second->Get();
spectrumTextureJobs.clear();
textureMutex.unlock();
LOG_VERBOSE("Finished consuming texture futures");
......@@ -1303,8 +1304,8 @@ std::vector<Light> BasicScene::CreateLights(
LOG_VERBOSE("Starting to consume non-area light futures");
std::lock_guard<std::mutex> lock(lightMutex);
for (auto &fut : lightFutures)
lights.push_back(fut.Get());
for (auto &job : lightJobs)
lights.push_back(job->Get());
LOG_VERBOSE("Finished consuming non-area light futures");
return lights;
......
......@@ -8,7 +8,6 @@
#include <pbrt/pbrt.h>
#include <pbrt/cameras.h>
#include <pbrt/cpu/primitive.h>
#include <pbrt/paramdict.h>
#include <pbrt/parser.h>
#include <pbrt/util/containers.h>
......@@ -30,6 +29,7 @@
namespace pbrt {
class Integrator;
class Primitive;
// SceneEntity Definition
struct SceneEntity {
......@@ -278,24 +278,24 @@ class BasicScene {
void Done();
Camera GetCamera() {
cameraFutureMutex.lock();
cameraJobMutex.lock();
while (!camera) {
pstd::optional<Camera> c = cameraFuture.TryGet(&cameraFutureMutex);
pstd::optional<Camera> c = cameraJob->TryGet(&cameraJobMutex);
if (c)
camera = *c;
}
cameraFutureMutex.unlock();
cameraJobMutex.unlock();
return camera;
}
Sampler GetSampler() {
samplerFutureMutex.lock();
samplerJobMutex.lock();
while (!sampler) {
pstd::optional<Sampler> s = samplerFuture.TryGet(&samplerFutureMutex);
pstd::optional<Sampler> s = samplerJob->TryGet(&samplerJobMutex);
if (s)
sampler = *s;
}
samplerFutureMutex.unlock();
samplerJobMutex.unlock();
return sampler;
}
......@@ -337,26 +337,26 @@ class BasicScene {
void startLoadingNormalMaps(const ParameterDictionary &parameters);
// BasicScene Private Members
Future<Sampler> samplerFuture;
AsyncJob<Sampler> *samplerJob = nullptr;
mutable ThreadLocal<Allocator> threadAllocators;
Camera camera;
Film film;
std::mutex cameraFutureMutex;
Future<Camera> cameraFuture;
std::mutex samplerFutureMutex;
std::mutex cameraJobMutex;
AsyncJob<Camera> *cameraJob = nullptr;
std::mutex samplerJobMutex;
Sampler sampler;
std::mutex mediaMutex;
std::map<std::string, Future<Medium>> mediumFutures;
std::map<std::string, AsyncJob<Medium> *> mediumJobs;
std::map<std::string, Medium> mediaMap;
std::mutex materialMutex;
std::map<std::string, Future<Image *>> normalMapFutures;
std::map<std::string, AsyncJob<Image *> *> normalMapJobs;
std::map<std::string, Image *> normalMaps;
std::vector<std::pair<std::string, SceneEntity>> namedMaterials;
std::vector<SceneEntity> materials;
std::mutex lightMutex;
std::vector<Future<Light>> lightFutures;
std::vector<AsyncJob<Light> *> lightJobs;
std::mutex areaLightMutex;
std::vector<SceneEntity> areaLights;
......@@ -366,8 +366,8 @@ class BasicScene {
std::vector<std::pair<std::string, TextureSceneEntity>> serialSpectrumTextures;
std::vector<std::pair<std::string, TextureSceneEntity>> asyncSpectrumTextures;
std::set<std::string> loadingTextureFilenames;
std::map<std::string, Future<FloatTexture>> floatTextureFutures;
std::map<std::string, Future<SpectrumTexture>> spectrumTextureFutures;
std::map<std::string, AsyncJob<FloatTexture> *> floatTextureJobs;
std::map<std::string, AsyncJob<SpectrumTexture> *> spectrumTextureJobs;
int nMissingTextures = 0;
std::mutex shapeMutex, animatedShapeMutex;
......
......@@ -94,10 +94,8 @@ void ThreadPool::WorkOrWait(std::unique_lock<std::mutex> *lock, bool isEnqueuing
DCHECK(!lock->owns_lock());
lock->lock();
job->activeWorkers--;
if (job->Finished()) {
if (job->Finished())
jobListCondition.notify_all();
job->Cleanup();
}
} else
// Wait for new work to arrive or the job to finish
......@@ -134,10 +132,8 @@ bool ThreadPool::WorkOrReturn() {
DCHECK(!lock.owns_lock());
lock.lock();
job->activeWorkers--;
if (job->Finished()) {
if (job->Finished())
jobListCondition.notify_all();
job->Cleanup();
}
return true;
}
......@@ -195,7 +191,7 @@ std::string ThreadPool::ToString() const {
}
bool DoParallelWork() {
CHECK(ParallelJob::threadPool && ParallelJob::threadPool->size());
CHECK(ParallelJob::threadPool);
// lock should be held when this is called...
return ParallelJob::threadPool->WorkOrReturn();
}
......
......@@ -293,8 +293,6 @@ class ParallelJob {
virtual std::string ToString() const = 0;
virtual void Cleanup() {}
// ParallelJob Public Members
static ThreadPool *threadPool;
......@@ -349,47 +347,6 @@ class ThreadPool {
bool DoParallelWork();
// Future Definition
template <typename T>
class Future {
public:
// Future Public Methods
Future() = default;
Future(std::future<T> &&f) : fut(std::move(f)) {}
Future &operator=(std::future<T> &&f) {
fut = std::move(f);
return *this;
}
T Get() {
Wait();
return fut.get();
}
pstd::optional<T> TryGet(std::mutex *mutex) {
if (IsReady())
return Get();
mutex->unlock();
DoParallelWork();
mutex->lock();
return {};
}
void Wait() {
while (!IsReady() && DoParallelWork())
;
fut.wait();
}
bool IsReady() const {
return fut.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
}
private:
std::future<T> fut;
};
// AsyncJob Definition
template <typename T>
class AsyncJob : public ParallelJob {
......@@ -403,22 +360,64 @@ class AsyncJob : public ParallelJob {
threadPool->RemoveFromJobList(this);
started = true;
lock->unlock();
work();
T r = work();
std::unique_lock<std::mutex> l(mutex);
result = r;
cv.notify_all();
}
bool IsReady() const {
std::lock_guard<std::mutex> lock(mutex);
return result.has_value();
}
void Cleanup() { delete this; }
void Wait() {
while (!IsReady() && DoParallelWork())
;
Future<T> GetFuture() { return work.get_future(); }
std::unique_lock<std::mutex> lock(mutex);
if (!result.has_value())
cv.wait(lock, [this]() { return result.has_value(); });
}
T Get() {
Wait();
std::lock_guard<std::mutex> lock(mutex);
return *result;
}
pstd::optional<T> TryGet(std::mutex *extMutex) {
{
std::lock_guard<std::mutex> lock(mutex);
if (result)
return result;
}
extMutex->unlock();
DoParallelWork();
extMutex->lock();
return {};
}
std::string ToString() const {
return StringPrintf("[ AsyncJob started: %s ]", started);
}
void DoWork() { work(); }
void DoWork() {
T r = work();
std::unique_lock<std::mutex> l(mutex);
CHECK(!result.has_value());
result = r;
cv.notify_all();
}
private:
bool started = false;
std::packaged_task<T(void)> work;
std::function<T(void)> work;
pstd::optional<T> result;
mutable std::mutex mutex;
std::condition_variable cv;
};
void ForEachThread(std::function<void(void)> func);
......@@ -441,7 +440,7 @@ inline auto RunAsync(F func, Args &&...args) {
else
lock = ParallelJob::threadPool->AddToJobList(job);
return job->GetFuture();
return job;
}
} // namespace pbrt
......
......@@ -64,16 +64,16 @@ TEST(ThreadLocal, Consistency) {
busywork(index);
});
std::vector<Future<int>> futures;
std::vector<AsyncJob<int> *> jobs;
for (int i = 0; i < 100; ++i) {
futures.push_back(RunAsync([&]() {
jobs.push_back(RunAsync([&]() {
EXPECT_EQ(std::this_thread::get_id(), tids.Get());
busywork(i);
return i;
}));
}
for (int i = 0; i < 100; ++i)
(void)futures[i].Get();
(void)jobs[i]->Get();
ParallelFor(0, 1000, [&](int64_t index) {
EXPECT_EQ(std::this_thread::get_id(), tids.Get());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册