提交 76d3808e 编写于 作者: D Derek Murray 提交者: TensorFlower Gardener

Updated the distributed runtime how-to to use the new API in `tf.train.*`.

Change: 119589456
上级 a77499c8
......@@ -15,86 +15,84 @@ can test your installation by starting a local server as follows:
$ python
>>> import tensorflow as tf
>>> c = tf.constant("Hello, distributed TensorFlow!")
>>> server = tf.GrpcServer.new_local_server()
>>> server = tf.train.Server.create_local_server()
>>> sess = tf.Session(server.target)
>>> sess.run(c)
'Hello, distributed TensorFlow!'
The `tf.GrpcServer.new_local_server()` method creates a single-process cluster.
method creates a single-process cluster.
## Create a cluster
To create a cluster with multiple processes or machines:
Most clusters have multiple tasks, divided into one or more jobs. To create a
cluster with multiple processes or machines:
1. **Create a cluster specification dictionary**. All servers in the cluster share the
1. **For each process or machine** in the cluster, run a TensorFlow program to
do the following:
1. **For each process or machine** in the cluster, run a TensorFlow program to:
1. **Create a `tf.train.ClusterSpec`**, which describes all of the tasks
in the cluster. This should be the same in each process.
1. **Create a `ClusterSpec`**, passing the dictionary to the constructor.
1. **Create a `tf.train.Server`**, passing the `tf.train.ClusterSpec` to
the constructor, and identifying the local process with a job name
and task index.
1. **Create a `tf.ServerDef`** that identifies itself with one of the
tasks in the `ClusterSpec`.
1. **Create a `tf.GrpcServer`**, passing the `tf.ServerDef` to the
### Create a `tf.train.ClusterSpec` to describe the cluster
### Create the cluster specification dictionary and `ClusterSpec` instances.
The cluster specification dictionary maps job names to lists
of network adresses. Pass this dictionary to the `tf.ClusterSpec` constructor.
For example:
The cluster specification dictionary maps job names to lists of network
adresses. Pass this dictionary to the `tf.train.ClusterSpec` constructor. For
<tr><th><code>tf.ClusterSpec</code> construction</th><th>Available tasks</th>
<tr><th><code>tf.train.ClusterSpec</code> construction</th><th>Available tasks</th>
tf.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
"trainer": [
"worker": [
"params": [
"ps": [
### Create `ServerDef` and `GrpcServer` instances
### Create a `tf.train.Server` instance in each process
A `ServerDef` stores a job name and task index that uniquely identify one of
the tasks defined in the `tf.ClusterSpec`. The `GrpcServer` constructor uses
this information to start a server.
A [`tf.train.Server`](../../api_docs/python/train.md#Server) object contains a
set of local devices, and a
[`tf.Session`](../../api_docs/python/client.md#Session) target that can
participate in a distributed computation. Each server belongs to a particular
cluster (specified by a `tf.train.ClusterSpec`), and corresponds to a particular
task in a named job. The server can communicate with any other server in the
same cluster.
For example, to define and instantiate servers running on `localhost:2222` and
`localhost:2223`, run the following snippets in different processes:
# In task 0:
server_def = tf.ServerDef(
"local": ["localhost:2222", "localhost:2223"]}).as_cluster_def(),
job_name="local", task_index=0)
server = tf.GrpcServer(server_def)
cluster = tf.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
server = tf.GrpcServer(cluster, job_name="local", task_index=0)
# In task 1:
server_def = tf.ServerDef(
"local": ["localhost:2222", "localhost:2223"]}).as_cluster_def(),
job_name="local", task_index=1)
server = tf.GrpcServer(server_def)
cluster = tf.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
server = tf.GrpcServer(cluster, job_name="local", task_index=1)
**Note:** Manually specifying these cluster specifications can be tedious,
......@@ -126,7 +124,7 @@ with tf.device("/job:worker/task:7"):
# ...
train_op = ...
with tf.Session("grpc://worker7:2222") as sess:
with tf.Session("grpc://worker7.example.com:2222") as sess:
for _ in range(10000):
......@@ -174,6 +172,111 @@ replicated model. Possible approaches include:
`tf.Session` objects can all be created in a single Python client, or you can
use multiple Python clients to better distribute the trainer load.
### Putting it all together: example trainer program
The following code shows the skeleton of a distributed trainer program. It
includes the code for the parameter server and worker processes.
import tensorflow as tf
# Flags for defining the tf.train.ClusterSpec
tf.app.flags.DEFINE_string("ps_hosts", "",
"Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", "",
"Comma-separated list of hostname:port pairs")
# Flags for defining the tf.train.Server
tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
FLAGS = tf.app.flags.FLAGS
def main(_):
ps_hosts = FLAGS.ps_hosts.split(",")
worker_hosts = FLAGS.worker_hosts(",")
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
server = tf.train.Server(cluster,
if FLAGS.job_name == "ps":
elif FLAGS.job_name == "worker":
# Assigns ops to the local worker by default.
with tf.device(tf.train.replica_device_setter(
worker_device="/job:worker/task:%d" % FLAGS.task_index,
# Build model...
loss = ...
global_step = tf.Variable(0)
train_op = tf.train.AdagradOptimizer(0.01).minimize(
loss, global_step=global_step)
saver = tf.train.Saver()
summary_op = tf.merge_all_summaries()
init_op = tf.initialize_all_variables()
# Create a "supervisor", which oversees the training process.
sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
# The supervisor takes care of session initialization and restoring from
# a checkpoint.
sess = sv.prepare_or_wait_for_session(server.target)
# Start queue runners for the input pipelines (if any).
# Loop until the supervisor shuts down (or 1000000 steps have completed).
step = 0
while not sv.should_stop() and step < 1000000:
# Run a training step asynchronously.
# See `tf.train.SyncReplicasOptimizer` for additional details on how to
# perform *synchronous* training.
_, step = sess.run([train_op, global_step])
if __name__ == "__main__":
To start the trainer with two parameter servers and two workers, use the
following command line (assuming the script is called `trainer.py`):
# On ps0.example.com:
$ python trainer.py \
--ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
--worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
--job_name=ps --task_index=0
# On ps1.example.com:
$ python trainer.py \
--ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
--worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
--job_name=ps --task_index=1
# On worker0.example.com:
$ python trainer.py \
--ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
--worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
--job_name=worker --task_index=0
# On worker1.example.com:
$ python trainer.py \
--ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
--worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
--job_name=worker --task_index=1
## Glossary
......@@ -215,7 +318,7 @@ replicated model. Possible approaches include:
<dt>TensorFlow server</dt>
A process running a <code>tf.GrpcServer</code> instance, which is a
A process running a <code>tf.train.Server</code> instance, which is a
member of a cluster, and exports a "master service" and "worker service".
<dt>Worker service</dt>
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册