摘要:本文基于官方教程,實(shí)踐了分布式搭建的過程。一般將任務(wù)分為兩類一類叫參數(shù)服務(wù)器,,簡(jiǎn)稱為,用于存儲(chǔ)一類就是普通任務(wù),稱為,用于執(zhí)行具體的計(jì)算。參數(shù)服務(wù)器是一套分布式存儲(chǔ),用于保存參數(shù),并提供參數(shù)更新的操作。
簡(jiǎn)介
TensorFlow支持使用多臺(tái)機(jī)器的設(shè)備進(jìn)行計(jì)算。本文基于官方教程,實(shí)踐了分布式TensorFlow搭建的過程。
TensorFlow入門教程
基本概念 TensorFlow集群A TensorFlow "cluster" is a set of "tasks" that participate in the distributed execution of a TensorFlow graph. Each task is associated with a TensorFlow "server", which contains a "master" that can be used to create sessions, and a "worker" that executes operations in the graph.
從上面的定義可以看出,所謂的TensorFlow集群就是一組任務(wù),每個(gè)任務(wù)就是一個(gè)服務(wù)。服務(wù)由兩個(gè)部分組成,第一部分是master,用于創(chuàng)建session,第二部分是worker,用于執(zhí)行具體的計(jì)算。
TensorFlow一般將任務(wù)分為兩類job:一類叫參數(shù)服務(wù)器,parameter server,簡(jiǎn)稱為ps,用于存儲(chǔ)tf.Variable;一類就是普通任務(wù),稱為worker,用于執(zhí)行具體的計(jì)算。
首先來理解一下參數(shù)服務(wù)器的概念。一般而言,機(jī)器學(xué)習(xí)的參數(shù)訓(xùn)練過程可以劃分為兩個(gè)類別:第一個(gè)是根據(jù)參數(shù)算算梯度,第二個(gè)是根據(jù)梯度更新參數(shù)。對(duì)于小規(guī)模訓(xùn)練,數(shù)據(jù)量不大,參數(shù)數(shù)量不多,一個(gè)CPU就足夠了,兩類任務(wù)都交給一個(gè)CPU來做。對(duì)于普通的中等規(guī)模的訓(xùn)練,數(shù)據(jù)量比較大,參數(shù)數(shù)量不多,計(jì)算梯度的任務(wù)負(fù)荷較重,參數(shù)更新的任務(wù)負(fù)荷較輕,所以將第一類任務(wù)交給若干個(gè)CPU或GPU去做,第二類任務(wù)交給一個(gè)CPU即可。對(duì)于超大規(guī)模的訓(xùn)練,數(shù)據(jù)量大、參數(shù)多,不僅計(jì)算梯度的任務(wù)要部署到多個(gè)CPU或GPU上,而且更新參數(shù)的任務(wù)也要部署到多個(gè)CPU。如果計(jì)算量足夠大,一臺(tái)機(jī)器能搭載的CPU和GPU數(shù)量有限,就需要多臺(tái)機(jī)器來進(jìn)行計(jì)算能力的擴(kuò)展了。參數(shù)服務(wù)器是一套分布式存儲(chǔ),用于保存參數(shù),并提供參數(shù)更新的操作。
我們來看一下怎么創(chuàng)建一個(gè)TensorFlow集群。每個(gè)任務(wù)用一個(gè)ip:port表示。TensorFlow用tf.train.ClusterSpec表示一個(gè)集群信息,舉例如下:
import tensorflow as tf # Configuration of cluster ps_hosts = [ "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo" ] worker_hosts = [ "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo" ] cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
上面的語句提供了一個(gè)TensorFlow集群信息,集群有兩類任務(wù),稱為job,一個(gè)job是ps,一個(gè)job是worker;ps由2個(gè)任務(wù)組成,worker由3個(gè)任務(wù)組成。
定義完集群信息后,使用tf.train.Server創(chuàng)建每個(gè)任務(wù):
tf.app.flags.DEFINE_string("job_name", "worker", "One of "ps", "worker"") tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job") FLAGS = tf.app.flags.FLAGS def main(_): server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index) server.join() if __name__ == "__main__": tf.app.run()
對(duì)于本例而言,我們需要在ip:port對(duì)應(yīng)的機(jī)器上運(yùn)行每個(gè)任務(wù),共需執(zhí)行五次代碼,生成五個(gè)任務(wù)。
python worker.py --job_name=ps --task_index=0 python worker.py --job_name=ps --task_index=1 python worker.py --job_name=worker --task_index=0 python worker.py --job_name=worker --task_index=1 python worker.py --job_name=worker --task_index=2
我們找到集群的某一臺(tái)機(jī)器,執(zhí)行下面的代碼:
# -*- coding=utf-8 -*- import tensorflow as tf import numpy as np train_X = np.random.rand(100).astype(np.float32) train_Y = train_X * 0.1 + 0.3 # 選擇變量存儲(chǔ)位置和op執(zhí)行位置,這里全部放在worker的第一個(gè)task上 with tf.device("/job:worker/task:0"): X = tf.placeholder(tf.float32) Y = tf.placeholder(tf.float32) w = tf.Variable(0.0, name="weight") b = tf.Variable(0.0, name="reminder") y = w * X + b loss = tf.reduce_mean(tf.square(y - Y)) init_op = tf.global_variables_initializer() train_op = tf.train.GradientDescentOptimizer(0.01).minimize(loss) # 選擇創(chuàng)建session使用的master with tf.Session("grpc://xx.xxx.xx.xxxx:oooo") as sess: sess.run(init_op) for i in range(500): sess.run(train_op, feed_dict={X: train_Y, Y: train_Y}) if i % 50 == 0: print i, sess.run(w), sess.run(b) print sess.run(w) print sess.run(b)
執(zhí)行結(jié)果如下:
0 0.00245265 0.00697793 50 0.0752466 0.213145 100 0.0991397 0.279267 150 0.107308 0.30036 200 0.110421 0.306972 250 0.111907 0.308929 300 0.112869 0.309389 350 0.113663 0.309368 400 0.114402 0.309192 450 0.115123 0.308967 0.115824 0.30873
其實(shí)ps和worker本質(zhì)上是一個(gè)東西,就是名字不同,我們將上例中的with tf.device("/job:worker/task:0"):改為with tf.device("/job:psr/task:0"):,一樣能夠執(zhí)行。之所以在創(chuàng)建集群時(shí)要分為兩個(gè)類別的任務(wù),是因?yàn)門ensorFlow提供了一些工具函數(shù),會(huì)根據(jù)名字不同賦予task不同的任務(wù),ps的用于存儲(chǔ)變量,worker的用于計(jì)算。
同步與異步更新同步更新:將數(shù)據(jù)拆分成多份,每份基于參數(shù)計(jì)算出各自部分的梯度;當(dāng)每一份的部分梯度計(jì)算完成后,收集到一起算出總梯度,再用總梯度去更新參數(shù)。
異步更新:同步更新模式下,每次都要等各個(gè)部分的梯度計(jì)算完后才能進(jìn)行參數(shù)更新操作,處理速度取決于計(jì)算梯度最慢的那個(gè)部分,其他部分存在大量的等待時(shí)間浪費(fèi);異步更新模式下,所有的部分只需要算自己的梯度,根據(jù)自己的梯度更新參數(shù),不同部分之間不存在通信和等待。
分布式訓(xùn)練案例import tensorflow as tf import numpy as np # Configuration of cluster ps_hosts = [ "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo" ] worker_hosts = [ "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo" ] cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts}) tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job") FLAGS = tf.app.flags.FLAGS def main(_): with tf.device(tf.train.replica_device_setter( worker_device="/job:worker/task:%d" % FLAGS.task_index, cluster=cluster)): x_data = tf.placeholder(tf.float32, [100]) y_data = tf.placeholder(tf.float32, [100]) W = tf.Variable(tf.random_uniform([1], -1.0, 1.0)) b = tf.Variable(tf.zeros([1])) y = W * x_data + b loss = tf.reduce_mean(tf.square(y - y_data)) global_step = tf.Variable(0, name="global_step", trainable=False) optimizer = tf.train.GradientDescentOptimizer(0.1) train_op = optimizer.minimize(loss, global_step=global_step) tf.summary.scalar("cost", loss) summary_op = tf.summary.merge_all() init_op = tf.global_variables_initializer() # The StopAtStepHook handles stopping after running given steps. hooks = [ tf.train.StopAtStepHook(last_step=1000000)] # The MonitoredTrainingSession takes care of session initialization, # restoring from a checkpoint, saving to a checkpoint, and closing when done # or an error occurs. with tf.train.MonitoredTrainingSession(master="grpc://" + worker_hosts[FLAGS.task_index], is_chief=(FLAGS.task_index==0), # 我們制定task_index為0的任務(wù)為主任務(wù),用于負(fù)責(zé)變量初始化、做checkpoint、保存summary和復(fù)原 checkpoint_dir="/tmp/tf_train_logs", save_checkpoint_secs=None, hooks=hooks) as mon_sess: while not mon_sess.should_stop(): # Run a training step asynchronously. # See `tf.train.SyncReplicasOptimizer` for additional details on how to # perform *synchronous* training. # mon_sess.run handles AbortedError in case of preempted PS. train_x = np.random.rand(100).astype(np.float32) train_y = train_x * 0.1 + 0.3 _, step, loss_v, weight, biase = mon_sess.run([train_op, global_step, loss, W, b], feed_dict={x_data: train_x, y_data: train_y}) if step % 100 == 0: print "step: %d, weight: %f, biase: %f, loss: %f" %(step, weight, biase, loss_v) print "Optimization finished." if __name__ == "__main__": tf.app.run()
代碼中,tf.train.replica_device_setter()會(huì)根據(jù)job名,將with內(nèi)的Variable op放到ps tasks,將其他計(jì)算op放到worker tasks。默認(rèn)分配策略是輪詢。
在屬于集群的一臺(tái)機(jī)器中執(zhí)行上面的代碼,屏幕會(huì)開始輸出每輪迭代的訓(xùn)練參數(shù)和損失
python train.py --task_index=0
在另一臺(tái)機(jī)器上執(zhí)行下面你的代碼,再啟動(dòng)一個(gè)任務(wù),會(huì)看到屏幕開始輸出每輪迭代的訓(xùn)練參數(shù)和損失,注意,step不再是從0開始,而是在啟動(dòng)時(shí)刻上一個(gè)啟動(dòng)任務(wù)的step后繼續(xù)。此時(shí)觀察兩個(gè)任務(wù),會(huì)發(fā)現(xiàn)他們同時(shí)在對(duì)同一參數(shù)進(jìn)行更新。
python train.py --task_index=2思考
分布式TensorFlow與Spark對(duì)比:
分布式的級(jí)別不同:TensorFlow的Tensor、Variable和Op不是分布式的,分布式執(zhí)行的是subgraph. Spark的op和變量都是構(gòu)建在RDD上,RDD本身是分布式的。
異步訓(xùn)練:TensorFlow支持同步和異步的分布式訓(xùn)練;Spark原生的API只支持同步訓(xùn)練
分布式存儲(chǔ):Spark在底層封裝好了worker和分布式數(shù)據(jù)之間的關(guān)系;TensorFlow需要自行維護(hù)。
Parameter Server:TensorFlow支持,Spark暫不支持。
TF分布式部署起來還是比較繁瑣的,需要定義好每個(gè)任務(wù)的ip:port,手工啟動(dòng)每個(gè)task,不提供一個(gè)界面可以對(duì)集群進(jìn)行維護(hù)。
參考資料
白話tensorflow分布式部署和開發(fā)
理解和實(shí)現(xiàn)分布式TensorFlow集群完整教程
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://www.ezyhdfw.cn/yun/38424.html
摘要:貢獻(xiàn)者飛龍版本最近總是有人問我,把這些資料看完一遍要用多長(zhǎng)時(shí)間,如果你一本書一本書看的話,的確要用很長(zhǎng)時(shí)間。為了方便大家,我就把每本書的章節(jié)拆開,再按照知識(shí)點(diǎn)合并,手動(dòng)整理了這個(gè)知識(shí)樹。 Special Sponsors showImg(https://segmentfault.com/img/remote/1460000018907426?w=1760&h=200); 貢獻(xiàn)者:飛龍版...
摘要:本文的目的是聚焦于數(shù)據(jù)操作能力,講述中比較重要的一些,幫助大家實(shí)現(xiàn)各自的業(yè)務(wù)邏輯。傳入輸入值,指定輸出的基本數(shù)據(jù)類型。 引言 用TensorFlow做好一個(gè)機(jī)器學(xué)習(xí)項(xiàng)目,需要具備多種代碼能力: 工程開發(fā)能力:怎么讀取數(shù)據(jù)、怎么設(shè)計(jì)與運(yùn)行Computation Graph、怎么保存與恢復(fù)變量、怎么保存統(tǒng)計(jì)結(jié)果、怎么共享變量、怎么分布式部署 數(shù)據(jù)操作能力:怎么將原始數(shù)據(jù)一步步轉(zhuǎn)化為模型需...
閱讀 2499·2021-11-11 11:01
閱讀 3395·2021-10-11 10:57
閱讀 2746·2021-09-30 09:46
閱讀 3559·2021-07-26 23:38
閱讀 1635·2019-08-29 12:22
閱讀 713·2019-08-29 11:28
閱讀 2431·2019-08-26 14:04
閱讀 3137·2019-08-23 18:34