大家好,我是不温卜火,是一名计算机学院大数据专业大二的学生,昵称来源于成语—不温不火,本意是希望自己性情温和。作为一名互联网行业的小白,博主写博客一方面是为了记录自己的学习过程,另一方面是总结自己所犯的错误希望能够帮助到很多和自己一样处于起步阶段的萌新。但由于水平有限,博客中难免会有一些错误出现,有纰漏之处恳请各位大佬不吝赐教!暂时只有这一个平台
项目代码博主已经打包到Github需要的可以自行下载:https://github.com/459804692/spark0729
目录
- 一. 需求简介
 - 二. 思路分析
 - 三. 具体代码实现
 - 四. 运行结果
 
一. 需求简介
计算页面单跳转化率,什么是页面单跳转换率,比如一个用户在一次 Session 过程中访问的页面路径 3,5,7,9,10,21,那么页面 3 跳到页面 5 叫一次单跳,7-9 也叫一次单跳,那么单跳转化率就是要统计页面点击的概率
比如:计算 3-5 的单跳转化率,先获取符合条件的 Session 对于页面 3 的访问次数(PV)为 A,然后获取符合条件的 Session 中访问了页面 3 又紧接着访问了页面 5 的次数为 B,那么 B/A 就是 3-5 的页面单跳转化率.

产品经理和运营总监,可以根据这个指标,去尝试分析,整个网站,产品,各个页面的表现怎么样,是不是需要去优化产品的布局;吸引用户最终可以进入最后的支付页面。
数据分析师,可以此数据做更深一步的计算和分析。
企业管理层,可以看到整个公司的网站,各个页面的之间的跳转的表现如何,可以适当调整公司的经营战略或策略。
在该模块中,需要根据查询对象中设置的 Session 过滤条件,先将对应得 Session 过滤出来,然后根据查询对象中设置的页面路径,计算页面单跳转化率,比如查询的页面路径为:3、5、7、8,那么就要计算 3-5、5-7、7-8 的页面单跳转化率。
需要注意的一点是,页面的访问时有先后的,要做好排序。
二. 思路分析
- 读取到规定的页面
 - 过滤出来规定页面的日志记录, 并统计出来每个页面的访问次数 countByKey 是行动算子 reduceByKey 是转换算子
 - 明确哪些页面需要计算跳转次数 1-2, 2-3, 3-4 …
 - 按照 session 统计所有页面的跳转次数, 并且需要按照时间升序来排序
 - 按照 session 分组, 然后并对每组内的 UserVisitAction 进行排序
 - 转换访问流水
 - 过滤出来和统计目标一致的跳转
 - 统计跳转次数
 - 计算跳转率
 

三. 具体代码实现
- 1. 业务代码
 
package com.buwenbuhuo.spark.core.project.app
import java.text.DecimalFormat
import com.buwenbuhuo.spark.core.project.bean.UserVisitAction
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
/**
 **
 *
 * @author 不温卜火
 *         *
 * @create 2020-07-30 15:19
 **
 *         MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object PageConversion {
    def statPageConversionRate(sc: SparkContext,
                               userVisitActionRDD: RDD[UserVisitAction],
                               pageString:String): Unit ={
      // 1. 做出来目标跳转流 1,2,3,4,5,6,7
      val pages: Array[String] = pageString.split(",")
      val prePages: Array[String] = pages.take(pages.length -1)
      val postPages: Array[String] = pages.takeRight(pages.length -1)
      val targetPageFlows: Array[String] = prePages.zip(postPages).map {
        case (pre, post) => s"$pre->$post"
      }
/*      // 1.1 把targetPages做广播变量,优化性能
      val targetPageFlowsBC: Broadcast[Array[String]] = sc.broadcast(targetPageFlows)*/
//    println(targetPageFlows.toList)
      // 2. 计算分母,计算需要页面的点击量
      val pageAndCount = userVisitActionRDD
        .filter(action => prePages.contains(action.page_id.toString))
        .map(action => (action.page_id, 1))
        .countByKey()
//      println(pageAndCount)    // 没问题
      // 3. 计算分子
      // 3.1 按照sessionID分组,不能先对需要的页面做过滤,否则会应用调整的逻辑
      val sessionIdGrouped: RDD[(String, Iterable[UserVisitAction])] = userVisitActionRDD.groupBy(_ .session_id)
      val pageFlowsRDD: RDD[String] = sessionIdGrouped.flatMap {
        case (sid, actionIt) =>
          // 每个session的行为做一个按照时间排序
          val actions: List[UserVisitAction] = actionIt.toList.sortBy(_.action_time)
          val preActions: List[UserVisitAction] = actions.take(actions.length -1)
          val postActions: List[UserVisitAction] = actions.takeRight(actions.length -1)
          preActions.zip(postActions).map {
            case (preAction, postAction) => s"${preAction.page_id}->${postAction.page_id}"
          }.filter(flow => targetPageFlows.contains(flow))
//            .filter(flow => targetPageFlowsBC.value.contains(flow)) // 使用广播变量 本人使用有错误
      }
//      pageFlowsRDD.collect.foreach(println)
      // 3.2 聚合
      val pageFlowsAndCount = pageFlowsRDD.map((_, 1)).countByKey()
      // 序列化
      val f: DecimalFormat = new DecimalFormat(".00%")
      // 4. 计算调整率
      val result = pageFlowsAndCount.map {
        // pageAndCount 分母
        // 1-> 2 count/1的点击量
        case (flow, count) =>
          val rate = count.toDouble / pageAndCount(flow.split("->")(0).toLong)
          (flow,f.format(rate))
      }
      println(result)
    }
}
/*
1,2,3,4,5,6,7 计算他们的转换率
1. 想办法做出来跳转流
        “ 1->2 ”,“ 2->3 ”,“ 3->4 ”  ...
2. 计算跳转率
    1 -> 2 调整率
          分子
              “1->2” 跳转流的个数
                    如何计算?
                        1. 保证是同一session才能计算,其实就是按照session进行分组
                        2. 按照时间排序
                        3. RDD[“1->2”,“2->3”,“3->4”] map() reduceByKey
                           RDD[UserVisitAction] map
                           RDD[1,2,3,4,5,6,7,8]
                           如果做跳转流:
                              rdd1 = RDD[1,2,3,4,5,6]
                              rdd2 = RDD[2,3,4,5,6,7,8]
                              rdd3 = rdd1.zip(zip).map(...)
                              过滤出来目标跳转流,然后再聚合
           分母
              页数1的点击数
 */- 2. 主项目代码
 
package com.buwenbuhuo.spark.core.project.app
import com.buwenbuhuo.spark.core.project.bean.{CategoryCountInfo, UserVisitAction}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
  **
*@author 不温卜火
  **
  * @create 2020-07-29 12:18
  **
  *         MyCSDN :https://buwenbuhuo.blog.csdn.net/
  */
object ProjectApp {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("ProjectAPP").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)
    // 把数据从文件读出来
    val sourceRDD: RDD[String] = sc.textFile("D:/user_visit_action.txt")
    // 把数据封装好(封装到样例类中)
//    sourceRDD.collect.foreach(println)
    val userVisitActionRDD: RDD[UserVisitAction] = sourceRDD.map(line => {
    val fields: Array[String] = line.split("_")
      UserVisitAction(
        fields(0),
        fields(1).toLong,
        fields(2),
        fields(3).toLong,
        fields(4),
        fields(5),
        fields(6).toLong,
        fields(7).toLong,
        fields(8),
        fields(9),
        fields(10),
        fields(11),
        fields(12).toLong)
    })
    // 三
    PageConversion.statPageConversionRate(sc,userVisitActionRDD,"1,2,3,4,5,6,7,8")
    // 关闭项目(sc)
    sc.stop()
  }
}四. 运行结果

本次的分享就到这里了,
  好书不厌读百回,熟读课思子自知。而我想要成为全场最靓的仔,就必须坚持通过学习来获取更多知识,用知识改变命运,用博客见证成长,用行动证明我在努力。
   如果我的博客对你有帮助、如果你喜欢我的博客内容,请“点赞” “评论”“收藏”一键三连哦!听说点赞的人运气不会太差,每一天都会元气满满呦!如果实在要白嫖的话,那祝你开心每一天,欢迎常来我博客看看。
   码字不易,大家的支持就是我坚持下去的动力。点赞后不要忘了关注我哦!











