提交 5f4633d0 作者: Brennan Saeta 提交者: TensorFlower Gardener

Stop using the worker_cache on MasterEnv

In order to move to a more flexible ClusterSpec and distributed runtime,
it is important to stop referencing the singleton worker_cache on the
MasterEnv. This change refactors the internal distributed runtime code so
that it no longer references MasterEnv->worker_cache; the worker cache is
instead passed explicitly, independently of the process-wide singleton
MasterEnv.
Change: 150331649
上级 cf15d9eb
......@@ -113,8 +113,8 @@ class DeviceFinder {
public:
static Status GetRemoteDevices(
const protobuf::RepeatedPtrField<string>& device_filters, MasterEnv* env,
std::vector<Device*>* out_remote) {
DeviceFinder finder(device_filters, env);
WorkerCacheInterface* worker_cache, std::vector<Device*>* out_remote) {
DeviceFinder finder(device_filters, env, worker_cache);
finder.Start();
TF_RETURN_IF_ERROR(finder.Wait());
finder.GetRemoteDevices(env->local_devices, out_remote);
......@@ -123,15 +123,16 @@ class DeviceFinder {
static void GetRemoteWorkers(
const protobuf::RepeatedPtrField<string>& device_filters, MasterEnv* env,
std::vector<string>* workers) {
DeviceFinder finder(device_filters, env);
WorkerCacheInterface* worker_cache, std::vector<string>* workers) {
DeviceFinder finder(device_filters, env, worker_cache);
*workers = finder.targets_;
}
private:
explicit DeviceFinder(
const protobuf::RepeatedPtrField<string>& device_filters, MasterEnv* env)
: env_(env) {
const protobuf::RepeatedPtrField<string>& device_filters, MasterEnv* env,
WorkerCacheInterface* worker_cache)
: env_(env), worker_cache_(worker_cache) {
auto process_filter = [this](const string& filter) {
DeviceNameUtils::ParsedName parsed;
if (DeviceNameUtils::ParseFullName(filter, &parsed)) {
......@@ -146,7 +147,7 @@ class DeviceFinder {
// Enumerates the targets of all known workers. A target name is a
// prefix of a device name, e.g., /job:mnist/replica:0/task:10.
std::vector<string> workers;
env_->worker_cache->ListWorkers(&workers);
worker_cache->ListWorkers(&workers);
if (filters_.empty()) {
std::swap(workers, targets_);
} else {
......@@ -177,7 +178,7 @@ class DeviceFinder {
for (size_t i = 0; i < targets_.size(); ++i) {
// TODO(mrry): Propagate a timeout here, since `this->WhenFound()` may
// never be called.
NewRemoteDevices(env_->env, env_->worker_cache, targets_[i],
NewRemoteDevices(env_->env, worker_cache_, targets_[i],
std::bind(&ME::WhenFound, this, i, _1, _2));
}
}
......@@ -225,6 +226,7 @@ class DeviceFinder {
typedef DeviceFinder ME;
const MasterEnv* env_;
WorkerCacheInterface* worker_cache_;
std::vector<DeviceNameUtils::ParsedName> filters_;
mutex mu_;
......@@ -289,8 +291,9 @@ void Master::CreateSession(const CreateSessionRequest* req,
// Ping all the workers and build the list of devices that the
// session will use.
std::vector<Device*> remote_devices;
status = DeviceFinder::GetRemoteDevices(req->config().device_filters(),
env_, &remote_devices);
status =
DeviceFinder::GetRemoteDevices(req->config().device_filters(), env_,
env_->worker_cache, &remote_devices);
if (!status.ok()) {
done(status);
return;
......@@ -418,7 +421,8 @@ void Master::ListDevices(const ListDevicesRequest* req,
ListDevicesResponse* resp, MyClosure done) {
SchedClosure([this, req, resp, done]() {
std::vector<Device*> remote_devices;
Status s = DeviceFinder::GetRemoteDevices({}, env_, &remote_devices);
Status s = DeviceFinder::GetRemoteDevices({}, env_, env_->worker_cache,
&remote_devices);
if (s.ok()) {
for (Device* dev : env_->local_devices) {
*(resp->add_local_device()) = dev->attributes();
......@@ -434,7 +438,8 @@ void Master::ListDevices(const ListDevicesRequest* req,
void Master::CleanupWorkers(const ResetRequest& reset) {
std::vector<string> worker_names;
DeviceFinder::GetRemoteWorkers(reset.device_filters(), env_, &worker_names);
DeviceFinder::GetRemoteWorkers(reset.device_filters(), env_,
env_->worker_cache, &worker_names);
if (!worker_names.empty()) {
const int num_workers = worker_names.size();
std::vector<Notification> n(num_workers);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册