TensorFlow分布式实践

  

大数据时代,基于单机的建模很难满足企业不断增长的数据量级的需求,开发者需要使用分布式的开发方式,在集群上进行建模。而单机和分布式的开发代码有一定的区别,本文就将为开发者们介绍,基于TensorFlow进行分布式开发的两种方式,帮助开发者在实践的过程中,更好地选择模块的开发方向。

  <人力资源/>   

基于TensorFlow原生的分布式开发

  

分布式开发会涉及到更新梯度的方式,有同步和异步的两个方案,同步更新的方式在模型的表现上能更快地进行收敛,而异步更新时,迭代的速度则会更加快。两种更新方式的图示如下:

  

 TensorFlow分布式实践“> </p>
  <p> <em>同步更新流程</em> <br/> <em>(图片TensorFlow:大规模机器学习alt=

  

异步更新流程
(图片TensorFlow:大规模机器学习% FLAGS.task_index,   集群=集群):   

继而更新梯度。

  

(1)同步更新梯度:

  
 <代码> rep_op=tf.train.SyncReplicasOptimizer(优化器,
  replicas_to_aggregate=len (worker_hosts),
  replica_id=FLAGS.task_index,
  total_num_replicas=len (worker_hosts),
  use_locking=True)
  train_op=rep_op.apply_gradients (grads_and_vars global_step=global_step)
  init_token_op=rep_op.get_init_tokens_op ()
  chief_queue_runner=rep_op.get_chief_queue_runner()  
  

(2)异步更新梯度:

  
 <代码> train_op=optimizer.apply_gradients (grads_and_vars global_step=global_step)  
  

最后,使用tf.train。主管进行真的迭代

  

另外,开发者还要注意,如果是同步更新梯度,则还需要加入如下代码:

  
 <代码> sv。start_queue_runners(税[chief_queue_runner])
  sess.run (init_token_op)  
  

需要注意的是,上述异步的方式需要自行指定集群IP和端口,不过,开发者们也可以借助TensorFlowOnSpark,使用纱进行管理。

  

基于TensorFlowOnSpark的分布式开发

  

作为个推面向开发者服务的移动应用数据统计分析产品,个数所具有的用户行为预测功能模块,便是基于TensorFlowOnSpark这种分布式来实现的。基于TensorFlowOnSpark的分布式开发使其可以在屏蔽了端口和机器IP的情况下,也能够做到较好的资源申请和分配。而在多个千万级应用同时建模的情况下,集群也有良好的表现,在sparkUI中也能看到相对应的资源和进程的情况。最关键的是,TensorFlowOnSpark可以在单机过度到分布式的情况下,使代码方便修改,且容易部署。

  

基于TensorFlowOnSpark的分布式开发的具体流程如下:

  

首先,需要使用spark-submit来提交任务,同时指定火花需要运行的参数(-num-executors 6等)、模型代码,模型超参等,同样需要接受外部参数:

  
 <代码>解析器=argparse.ArgumentParser ()
  解析器。add_argument(“我”,“——跟踪”,帮助=笆菁肪丁?
  args=parser.parse_args()  
  

之后,准备好参数和训练数据(DataFrame),调用模型的API进行启动。

  

其中,soft_dist.map_fun是要调起的方法,后面均是模型训练的参数。

  
 <代码>估计量=TFEstimator (soft_dist。map_fun, args) \
  .setInputMapping({“跟踪”:“跟踪”,“标签”:“标签”})\
  .setModelDir args.model \
  .setExportDir args.serving \
  .setClusterSize args.cluster_size \
  .setNumPS num_ps \
  .setEpochs args.epochs \
  .setBatchSize args.batch_size \
  .setSteps (args.max_steps)
  模型=estimator.fit (df)  
  

接下来是soft_dist定义一个map_fun (args, ctx)的方法:

  
 <代码> def map_fun (args, ctx):
  …
  worker_num=ctx。worker_num #工人数量
  job_name=ctx。job_name #工作名
  task_index=ctx。task_index #任务索引
  如果job_name==皃s”: # ps节点(主节点)
  时间。睡眠((worker_num + 1) * 5)=TFNode集群服务器。start_cluster_server (ctx 1 args.rdma)
  num_workers=len (cluster.as_dict()(“工人”))
  如果job_name==皃s”:
  server.join ()
  elif job_name==肮と恕?
  tf.device (tf.train.replica_device_setter (worker_device="/工作:工作/任务:% d % task_index,集群=集群)): 
  

之后,可以使用tf.train.MonitoredTrainingSession高级API,进行模型训练和预测。

  

总结

  

基于TensorFlow的分布式开发大致就是本文中介绍的两种情况,第二种方式可以用于实际的生产环境,稳定性会更高。

TensorFlow分布式实践