大数据时代,基于单机的建模很难满足企业不断增长的数据量级的需求,开发者需要使用分布式的开发方式,在集群上进行建模。而单机和分布式的开发代码有一定的区别,本文就将为开发者们介绍,基于TensorFlow进行分布式开发的两种方式,帮助开发者在实践的过程中,更好地选择模块的开发方向。
<人力资源/>基于TensorFlow原生的分布式开发
分布式开发会涉及到更新梯度的方式,有同步和异步的两个方案,同步更新的方式在模型的表现上能更快地进行收敛,而异步更新时,迭代的速度则会更加快。两种更新方式的图示如下:
异步更新流程 继而更新梯度。 (1)同步更新梯度: (2)异步更新梯度: 最后,使用tf.train。主管进行真的迭代 另外,开发者还要注意,如果是同步更新梯度,则还需要加入如下代码: 需要注意的是,上述异步的方式需要自行指定集群IP和端口,不过,开发者们也可以借助TensorFlowOnSpark,使用纱进行管理。 作为个推面向开发者服务的移动应用数据统计分析产品,个数所具有的用户行为预测功能模块,便是基于TensorFlowOnSpark这种分布式来实现的。基于TensorFlowOnSpark的分布式开发使其可以在屏蔽了端口和机器IP的情况下,也能够做到较好的资源申请和分配。而在多个千万级应用同时建模的情况下,集群也有良好的表现,在sparkUI中也能看到相对应的资源和进程的情况。最关键的是,TensorFlowOnSpark可以在单机过度到分布式的情况下,使代码方便修改,且容易部署。 首先,需要使用spark-submit来提交任务,同时指定火花需要运行的参数(-num-executors 6等)、模型代码,模型超参等,同样需要接受外部参数: 之后,准备好参数和训练数据(DataFrame),调用模型的API进行启动。 其中,soft_dist.map_fun是要调起的方法,后面均是模型训练的参数。 接下来是soft_dist定义一个map_fun (args, ctx)的方法: 之后,可以使用tf.train.MonitoredTrainingSession高级API,进行模型训练和预测。 基于TensorFlow的分布式开发大致就是本文中介绍的两种情况,第二种方式可以用于实际的生产环境,稳定性会更高。
(图片TensorFlow:大规模机器学习% FLAGS.task_index,
集群=集群):代码>
<代码> rep_op=tf.train.SyncReplicasOptimizer(优化器,
replicas_to_aggregate=len (worker_hosts),
replica_id=FLAGS.task_index,
total_num_replicas=len (worker_hosts),
use_locking=True)
train_op=rep_op.apply_gradients (grads_and_vars global_step=global_step)
init_token_op=rep_op.get_init_tokens_op ()
chief_queue_runner=rep_op.get_chief_queue_runner() 代码>
<代码> train_op=optimizer.apply_gradients (grads_and_vars global_step=global_step) 代码>
<代码> sv。start_queue_runners(税[chief_queue_runner])
sess.run (init_token_op) 代码>
基于TensorFlowOnSpark的分布式开发
基于TensorFlowOnSpark的分布式开发的具体流程如下:
<代码>解析器=argparse.ArgumentParser ()
解析器。add_argument(“我”,“——跟踪”,帮助=笆菁肪丁?
args=parser.parse_args() 代码>
<代码>估计量=TFEstimator (soft_dist。map_fun, args) \
.setInputMapping({“跟踪”:“跟踪”,“标签”:“标签”})\
.setModelDir args.model \
.setExportDir args.serving \
.setClusterSize args.cluster_size \
.setNumPS num_ps \
.setEpochs args.epochs \
.setBatchSize args.batch_size \
.setSteps (args.max_steps)
模型=estimator.fit (df) 代码>
<代码> def map_fun (args, ctx):
…
worker_num=ctx。worker_num #工人数量
job_name=ctx。job_name #工作名
task_index=ctx。task_index #任务索引
如果job_name==皃s”: # ps节点(主节点)
时间。睡眠((worker_num + 1) * 5)=TFNode集群服务器。start_cluster_server (ctx 1 args.rdma)
num_workers=len (cluster.as_dict()(“工人”))
如果job_name==皃s”:
server.join ()
elif job_name==肮と恕?
tf.device (tf.train.replica_device_setter (worker_device="/工作:工作/任务:% d % task_index,集群=集群)):代码>
总结
TensorFlow分布式实践