第94课:SparkStreaming实现广告计费系统中在线黑名单过滤实战

  

本期课程内容:

<李>

在线黑名单过滤实现解析

<李>

SparkStreaming实现在线黑名单过滤,


广告计费系统,是电商必不可少的一个功能点。为了防止恶意的广告点击(假设商户A和B同时在某电商做了广告,A和B为竞争对手,那么如果一个使用点击机器人进行对B的广告的恶意点击,那么B的广告费用将很快被用完),必须对广告点击进行黑名单过滤。

可以使用leftOuterJoin对目标数据和黑名单数据进行关联,将命中黑名单的数据过滤掉。


package  com.dt.spark.sparkapps.streaming      import  org.apache.spark.SparkConf   import  org.apache.spark.streaming。{秒,StreamingContext}/* *   ,*使用Scala开发集群运行的Spark 在线黑名单过滤程序   ,* Created  by  Limaoran 提醒2016/5/2。   ,*新浪微博:http://weibo.com/ilovepains/,*   ,*背景描述:在广告点击计费系统中,我们在线过滤掉黑名单的点击,进而保护广告商的利益,   ,*只进行有效的广告点击计费或者在防刷评分(或者流量)系统,过滤掉无效的投票或者评分或者流量;   ,*实现技术:使用transform  Api直接基于抽样编程,进行加入操作   ,*/object  OnlineBlackListFilter  {   def 才能;主要(args:数组[String]), {   ,,,/* *   ,,,,*,第1步:创建火花的配置对象SparkConf,设置火花程序的运行时的配置信息。   ,,,,*,例如说通过setMaster来设置程序要链接的火花集群的主人的URL,如果设置   ,,,,*,为当地,则代表火花程序在本地运行,特别适合于机器配置条件非常差(例如   ,,,,*,只有1 g的内存)的初学者,,,,,,   ,,,*/,,,val  conf =, new  SparkConf(),,//创建SparkConf对象   ,,,conf.setAppName (“OnlineBlackListFilter”),,//设置应用程序的名称,在程序运行的监控界面可以看到名称   ,,,conf.setMaster(“火花://主:7077”),//此时,程序在火花集群      ,,,val  ssc =, new  StreamingContext(参看,秒(30))      ,,,/* *   ,,,,*,黑名单数据准备,实际上黑名单一般都是动态的,例如在复述,或者数据库中,黑名单的生成往往有复杂的业务   ,,,,*,逻辑,具体情况算法不同,但是在Spark 流进行处理的时候每次都能工访问完整的信息   ,,,*/,,,val  blackList =,数组((“hadoop”,真的),(“mahout”,真正的))   ,,,val  blackListRDD =, ssc.sparkContext.parallelize(黑名单,8)      ,,,val  adsClickStream =, ssc.socketTextStream(“大师”,,9999)   ,,,/* *   ,,,,*,此处模拟的广告点击的每条数据的格式为:时间、名字   ,,,,*,此处地图操作的结果是名字,(时间、名称)的格式   ,,,*/,,,val  adsClientStreamFormated =, adsClickStream.map(广告=祝辞(ads.split(", ")(1)、广告))   ,,,adsClientStreamFormated.transform (userClickRDD =祝辞,{   ,,,,,//通过leftOuterJoin操作既保留了左侧用户广告点击内容的抽样的所有内容,又获得了相应点击内容是否在黑名单中   ,,,,,val  joinedBlackListRDD =, userClickRDD.leftOuterJoin (blackListRDD)   ,,,,,/* *   ,,,,,,*,进行滤过滤的时候,其输入元素是一个元组:(名称,((时间、名称),布尔型))   ,,,,,,*,其中第一个元素是黑名单的名称,第二元素的第二个元素是进行leftOuterJoin的时候是否存在在值   ,,,,,,*,如果存在的话,表面当前广告点击是黑名单,需要过滤掉,否则的话则是有效点击内容;   ,,,,,*/,,,,,val  validClicked =, joinedBlackListRDD.filter (joinedItem=祝辞{   ,,,,,,,如果(joinedItem._2._2.getOrElse(假)){   ,,,,,,,,,假的   ,,,,,,,其他}{   ,,,,,,,,,真的   ,,,,,,,}   ,,,,,})   ,,,,,validClicked.map (validClick =祝辞,{validClick._2._1})   ,,,}).print ()   ,,,/* *   ,,,,*,计算后的有效数据一般都会写入卡夫卡中,下游的计费系统会从卡夫卡中拉到有效数据进行计费   ,,,*/,,,ssc.start ()   ,,,ssc.awaitTermination ()   ,,}   }

将程序打包,并上传至火花集群


在spark-master节点,启动数控

root@spark-master: ~ #, nc  -lk  9999年

运行OnlineBlacklistFilter程序

root@spark-master: ~ #,/usr/地方/spark-1.6.0/bin/spark-submit ——class  com.dt.spark.sparkapps.streaming.OnlineBlackListFilter ——master 火花://主:7077。/sparkApps。jar

第94课:SparkStreaming实现广告计费系统中在线黑名单过滤实战