查看: 1434|回复: 0

如何创建 TensorFlow Server 集群

[复制链接]
  • TA的每日心情
    开心
    2019-11-4 13:48
  • 签到天数: 14 天

    连续签到: 1 天

    [LV.3]偶尔看看II

    发表于 2019-2-14 14:42:12 | 显示全部楼层 |阅读模式
    分享到:
    今天将介绍如何创建 TensorFlow Server 集群,及如何在该集群中发布计算图。在这里,我们假设您熟悉编写低级 TensorFlow 程序的基本概念 basic concepts。

    你好,分布式 TensorFlow !
    想要查看正在运行的简单 TensorFlow 集群,请执行以下操作:

    1. # Start a TensorFlow server as a single-process "cluster".
    2. $ python
    3. >>> import tensorflow as tf
    4. >>> c = tf.constant("Hello, distributed TensorFlow!")
    5. >>> server = tf.train.Server.create_local_server()
    6. >>> sess = tf.Session(server.target)  # Create a session on the server.
    7. >>> sess.run(c)
    8. 'Hello, distributed TensorFlow!'
    复制代码

    tf.train.Server.create_local_server 方法使用进程内 Server 创建单进程集群。

    创建一个集群
    TensorFlow “集群” 是一组参与 TensorFlow 图的分布式执行的 “任务”。每个任务都与 TensorFlow“Server” 相关联,该 server 包含可用于创建会话的 “master 主干”,以及在图中执行操作的 “worker”。群集还可以划分为一个或多个 “作业”,其中每个作业包含一个或多个任务。

    要创建群集,请在群集中的每个任务中启动一个 TensorFlow Server。每个任务通常在不同的计算机上运行,但您可以在同一台计算机上运行多个任务(例如,控制不同的 GPU 设备)。在每项任务中,执行以下操作:
    • 创建一个描述集群中所有任务的 tf.train.ClusterSpec 每项任务都应该相同
    • 创建一个 tf.train.Server,将 tf.train.ClusterSpec 传递给构造函数,并使用作业名称和任务索引标识本地任务

    创建一个 tf.train.ClusterSpec 来描述集群
    集群规范字典将作业名称映射到网络地址列表。将此字典传递给 tf.train.ClusterSpec 构造函数。例如:

    1.jpg

    在每个任务中创建一个 tf.train.Server 实例   
    tf.train.Server 对象包含一组本地设备,一组与 tf.train.ClusterSpec 中其他任务的连接,以及一个可以使用它们执行分布式计算的 tf.Session。每个 server 都是特定命名作业的成员,并且在该作业中具有任务索引。一个 server 可以与群集中的任何其他服务器通信。

    例如,要启动在 localhost:2222 和 localhost:2223 上运行的两个服务器的集群,请在本地计算机上的两个不同进程中运行以下代码段:

    1. # In task 0:
    2. cluster = tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
    3. server = tf.train.Server(cluster, job_name="local", task_index=0)
    复制代码
    1. # In task 1:
    2. cluster = tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
    3. server = tf.train.Server(cluster, job_name="local", task_index=1)
    复制代码


    注意:手动指定这些群集规范可能会很繁琐,尤其是对于大型群集。我们正在开发以编程方式启动任务的工具,例如:使用像 Kubernetes 这样的集群管理器。如果您希望获得某些特定集群管理器的技术支持,请在 GitHub issue 提出。

    在模型中指定分布式设备  
    要对特定进程执行操作,可以使用相同的 tf.device 函数来指定操作是在 CPU 还是 GPU 上运行。例如:

    1. with tf.device("/job:ps/task:0"):
    2.   weights_1 = tf.Variable(...)
    3.   biases_1 = tf.Variable(...)

    4. with tf.device("/job:ps/task:1"):
    5.   weights_2 = tf.Variable(...)
    6.   biases_2 = tf.Variable(...)

    7. with tf.device("/job:worker/task:7"):
    8.   input, labels = ...
    9.   layer_1 = tf.nn.relu(tf.matmul(input, weights_1) + biases_1)
    10.   logits = tf.nn.relu(tf.matmul(layer_1, weights_2) + biases_2)
    11.   # ...
    12.   train_op = ...

    13. with tf.Session("grpc://worker7.example.com:2222") as sess:
    14.   for _ in range(10000):
    15.     sess.run(train_op)
    复制代码

    在上面的示例中,变量是在 ps 作业中的两个任务上创建的,而模型的计算密集型部分是在 worker 作业中创建的。 TensorFlow 将在作业之间插入适当的数据传输(从 ps 到 worker 用于正向传递,从 worker 到 ps 用于应用渐变)。


    复制训练   
    一个常见的训练配置,被称为 “数据并行”,它涉及在不同的小批量数据上训练相同的模型额 worker 作业中的多个任务,更新在 ps 作业中的一个或多个任务中托管的共享参数。所有任务通常在不同的机器上运行。在 TensorFlow 中有很多方法可以指定这个结构,我们正在构建库,这将简化指定复制模型的工作。可能有效的方法包括:


    • 图形内复制。在这种方法中,客户端构建一个包含一组参数的 tf.Graph (在 tf.Variable 节点固定到 / job:ps);以及模型中计算密集型部分的多个副本,每个副本都固定在 / job:worker 中的不同任务中
    • 图之间的复制。在这种方法中,每个 / job:worker 任务都有一个单独的客户端,通常与 worker 任务在同一个进程中。每个客户端构建一个包含参数的类似图形(固定到 / job:psas,然后使用 tf.train.replica_device_setter 将它们确定性地映射到相同的任务);以及模型的计算密集型部分的单个副本,固定到 / job:worker 中的本地任务。
    • 异步训练。在这种方法中,图的每个副本都有一个独立的训练循环,无需协调即可执行。它与上述两种复制形式兼容。
    • 同步培训。在此方法中,所有副本都为当前参数读取相同的值,并行计算梯度,然后一起应用。它与图形内复制兼容(例如,如在 CIFAR-10 multi-GPU trainer 中使用梯度平均),以及图之间复制(例如,使用 tf.train.SyncReplicasOptimizer)均兼容。


    综合起来: 示例 trainer 计划
    以下代码显示了分布式 trainer 程序的框架,实现了图形间复制和异步训练。它包括参数 server 和 worker 任务的代码。

    1. import argparse
    2. import sys

    3. import tensorflow as tf

    4. FLAGS = None


    5. def main(_):
    6.   ps_hosts = FLAGS.ps_hosts.split(",")
    7.   worker_hosts = FLAGS.worker_hosts.split(",")

    8.   # Create a cluster from the parameter server and worker hosts.
    9.   cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

    10.   # Create and start a server for the local task.
    11.   server = tf.train.Server(cluster,
    12.                            job_name=FLAGS.job_name,
    13.                            task_index=FLAGS.task_index)

    14.   if FLAGS.job_name == "ps":
    15.     server.join()
    16.   elif FLAGS.job_name == "worker":

    17.     # Assigns ops to the local worker by default.
    18.     with tf.device(tf.train.replica_device_setter(
    19.         worker_device="/job:worker/task:%d" % FLAGS.task_index,
    20.         cluster=cluster)):

    21.       # Build model...
    22.       loss = ...
    23.       global_step = tf.contrib.framework.get_or_create_global_step()

    24.       train_op = tf.train.AdagradOptimizer(0.01).minimize(
    25.           loss, global_step=global_step)

    26.     # The StopAtStepHook handles stopping after running given steps.
    27.     hooks=[tf.train.StopAtStepHook(last_step=1000000)]

    28.     # The MonitoredTrainingSession takes care of session initialization,
    29.     # restoring from a checkpoint, saving to a checkpoint, and closing when done
    30.     # or an error occurs.
    31.     with tf.train.MonitoredTrainingSession(master=server.target,
    32.                                            is_chief=(FLAGS.task_index == 0),
    33.                                            checkpoint_dir="/tmp/train_logs",
    34.                                            hooks=hooks) as mon_sess:
    35.       while not mon_sess.should_stop():
    36.         # Run a training step asynchronously.
    37.         # See <a href="./../api_docs/python/tf/train/SyncReplicasOptimizer"><code>tf.train.SyncReplicasOptimizer</code></a> for additional details on how to
    38.         # perform *synchronous* training.
    39.         # mon_sess.run handles AbortedError in case of preempted PS.
    40.         mon_sess.run(train_op)


    41. if __name__ == "__main__":
    42.   parser = argparse.ArgumentParser()
    43.   parser.register("type", "bool", lambda v: v.lower() == "true")
    44.   # Flags for defining the tf.train.ClusterSpec
    45.   parser.add_argument(
    46.       "--ps_hosts",
    47.       type=str,
    48.       default="",
    49.       help="Comma-separated list of hostname:port pairs"
    50.   )
    51.   parser.add_argument(
    52.       "--worker_hosts",
    53.       type=str,
    54.       default="",
    55.       help="Comma-separated list of hostname:port pairs"
    56.   )
    57.   parser.add_argument(
    58.       "--job_name",
    59.       type=str,
    60.       default="",
    61.       help="One of 'ps', 'worker'"
    62.   )
    63.   # Flags for defining the tf.train.Server
    64.   parser.add_argument(
    65.       "--task_index",
    66.       type=int,
    67.       default=0,
    68.       help="Index of task within the job"
    69.   )
    70.   FLAGS, unparsed = parser.parse_known_args()
    71.   tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)
    复制代码
    要使用两个参数 server 和两个 worker 程序启动 trainer,请使用以下命令行(假设该脚本名为 trainer.py):
    1. # On ps0.example.com:
    2. $ python trainer.py \
    3.      --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
    4.      --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
    5.      --job_name=ps --task_index=0
    6. # On ps1.example.com:
    7. $ python trainer.py \
    8.      --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
    9.      --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
    10.      --job_name=ps --task_index=1
    11. # On worker0.example.com:
    12. $ python trainer.py \
    13.      --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
    14.      --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
    15.      --job_name=worker --task_index=0
    16. # On worker1.example.com:
    17. $ python trainer.py \
    18.      --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
    19.      --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
    20.      --job_name=worker --task_index=1
    复制代码


    词汇表


    客户端
    客户端通常是一个程序,用于建立一个 TensorFlow 图,并构造 tensorflow :: Session 以与集群交互。客户端主要使用 Python 或 C ++ 编写。一个单客户端进程可以直接与多个 TensorFlow Server 交互(请参阅上面的 “复制训练”),一个单 Server 也可以为多个客户端提供服务。

    集群
    一个 TensorFlow 集群包括一个或多个 “作业”,每个 “作业” 被划分为一个或多个 “任务” 列表。一个集群通常效力于一个特定的高级别的目标,例如多机并行训练一个神经网络,并行使用多台机器。集群由 tf.train.ClusterSpec 目标函数定义。

    作业
    一个作业由一系列 “任务” 组成,通常用于实现共同目的。 例如,名为 ps 的作业(用于 “参数服务器”)通常承载存储和更新变量参数的节点;而名为 worker 的作业通常托管执行计算密集型任务的无状态节点。作业中的任务通常在不同的计算机上运行。作业角色集是灵活的:例如,一个 worker 可以维持某种状态。

    核心服务
    RPC 服务为分布式设备之间提供远程访问,并充当会话目标的角色。核心设备实现 tensorflow :: Session 接口,负责协调跨一个或多个 “worker services” 之间的工作。所有 TensorFlow Servers 都实现核心服务。

    任务
    任务对应于特定的 TensorFlow Server,通常对应于单个进程。任务属于特定的 “作业”,并通过该作业的任务列表中的索引标识来区分。

    TensorFlow server 运行 tf.train.Server 实例的进程,该实例是集群的成员,并导出 “master service 主服务” 和 “worker service 工作服务”。

    工作服务
    RPC 服务,使用其本地设备执行 TensorFlow 图的部分。 Worker service 工作服务执行 worker_service.proto。所有的 TensorFlow Server 都执行 worker service 工作服务。


    转载自 TensorFlow 公众号
    回复

    使用道具 举报

    您需要登录后才可以回帖 注册/登录

    本版积分规则

    关闭

    站长推荐上一条 /1 下一条

    手机版|小黑屋|与非网

    GMT+8, 2024-4-25 06:34 , Processed in 0.130640 second(s), 16 queries , MemCache On.

    ICP经营许可证 苏B2-20140176  苏ICP备14012660号-2   苏州灵动帧格网络科技有限公司 版权所有.

    苏公网安备 32059002001037号

    Powered by Discuz! X3.4

    Copyright © 2001-2024, Tencent Cloud.