/*
 * 1. 基于服务器 log 的热门页面浏览量统计
 *    每隔 5 秒，输出最近 10 分钟内访问量最多的前 N 个 URL
 */
package com.chuangyan.network35
import java.lang
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.time.Duration
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.collection.mutable.ListBuffer
/**
 * One parsed line of the access log.
 *
 * The field descriptions below follow how `NetworkFlow.main` fills this class
 * from a log line split on single spaces.
 *
 * @param ip        field 0 of the log line
 * @param userId    field 1 of the log line
 * @param eventTime epoch milliseconds obtained by parsing field 3 with the
 *                  pattern `dd/MM/yyyy:HH:mm:ss`; used as the event timestamp
 * @param method    field 5 of the log line
 * @param url       field 6 of the log line; the key the page views are counted by
 */
case class ApacheLogEvent(
  ip: String,
  userId: String,
  eventTime: Long,
  method: String,
  url: String
)
/**
 * Hot-page statistics job: every 5 seconds it prints the 5 most requested URLs
 * observed during the preceding 10 minutes of an access log.
 */
object NetworkFlow {

  /** Log file that is read when no path is passed on the command line. */
  private val DefaultLogPath: String =
    "D:\\study\\Code\\UserBehavior\\NetworkFlowAnalysis\\src\\main\\resources\\apache.log"

  /**
   * Builds the pipeline and runs it.
   *
   * @param args optional; when present, `args(0)` is used as the path of the
   *             log file instead of the built-in default
   */
  def main(args: Array[String]): Unit = {
    // Imported locally so that this block stays self-contained; the assigner
    // replaces the deprecated `timeWindow(size, slide)` shortcut.
    import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC)
    env.setParallelism(1)

    val inputPath: String = args.headOption.getOrElse(DefaultLogPath)
    val source: DataStream[String] = env.readTextFile(inputPath)

    val dataStream: DataStream[ApacheLogEvent] = source.map(line => {
      val split = line.split(" ")
      val ip = split(0)
      val userId = split(1)
      // SimpleDateFormat is not thread-safe, so a fresh instance is created per record.
      val format = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
      val eventTime = format.parse(split(3)).getTime
      val method = split(5)
      val url = split(6)
      ApacheLogEvent(ip, userId, eventTime, method, url)
    })
      // Event time comes from the parsed log timestamp; tolerate 2 seconds of disorder.
      .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2))
        .withTimestampAssigner(new SerializableTimestampAssigner[ApacheLogEvent] {
          override def extractTimestamp(element: ApacheLogEvent, recordTimestamp: Long): Long = element.eventTime
        }))

    // Every five seconds, output the top-N most visited URLs of the last ten minutes.
    dataStream.filter(_.url != "/")
      .keyBy(_.url)
      .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.seconds(5)))
      .aggregate(new CountAgg(), new WindowResultFunction())
      // Rank the per-URL counts that belong to the same window.
      .keyBy(_.windowEnd)
      .process(new TopNHotUrls(5))
      .print()

    env.execute("url job")
  }
}
/**
 * Incremental aggregate that counts the events it is given.
 *
 * The accumulator and the result are both the running count as a `Long`.
 */
class CountAgg extends AggregateFunction[ApacheLogEvent, Long, Long] {

  /** A new accumulator starts at zero. */
  override def createAccumulator(): Long = {
    0L
  }

  /** Each event adds one to the count; the event's content is not inspected. */
  override def add(in: ApacheLogEvent, acc: Long): Long = {
    1L + acc
  }

  /** The result is the accumulated count itself. */
  override def getResult(acc: Long): Long = {
    acc
  }

  /** Two partial counts are combined by summing them. */
  override def merge(acc: Long, acc1: Long): Long = {
    acc1 + acc
  }
}
/**
 * View count of a single URL within a single window.
 *
 * @param url       the URL the count belongs to
 * @param windowEnd end timestamp of the window the count was computed for
 * @param count     number of events counted for `url` in that window
 */
case class UrlViewCount(
  url: String,
  windowEnd: Long,
  count: Long
)
/**
 * Wraps the count produced for one key and window into a [[UrlViewCount]],
 * attaching the key as the URL and the window's end timestamp.
 */
class WindowResultFunction extends WindowFunction[Long, UrlViewCount, String, TimeWindow] {

  /**
   * Emits exactly one [[UrlViewCount]] per invocation.
   *
   * @param key    the URL the window is keyed by
   * @param window the window whose end timestamp is recorded
   * @param input  the window's values; only the first element is read
   * @param out    collector that receives the resulting record
   */
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[UrlViewCount]): Unit =
    out.collect(
      UrlViewCount(
        url = key,
        windowEnd = window.getEnd,
        count = input.iterator.next()
      )
    )
}
/**
 * Buffers the [[UrlViewCount]] records of the current key and, when the
 * event-time timer registered at `windowEnd + 1` fires, emits a formatted
 * ranking of the `size` records with the highest counts.
 *
 * @param size maximum number of URLs listed in each emitted ranking
 */
class TopNHotUrls(size: Int) extends KeyedProcessFunction[Long,UrlViewCount,String]{

  /** Keyed buffer of the counts received so far; initialised in `open`. */
  var listState: ListState[UrlViewCount] = _

  /** Obtains the keyed list state from the runtime context. */
  override def open(parameters: Configuration): Unit = {
    listState = getRuntimeContext.getListState(
      new ListStateDescriptor[UrlViewCount]("listState", classOf[UrlViewCount]))
  }

  /**
   * Stores the incoming count and schedules the ranking for its window.
   *
   * @param i         the per-URL count to buffer
   * @param context   gives access to the timer service
   * @param collector unused here; output is produced in `onTimer`
   */
  override def processElement(i: UrlViewCount, context: KeyedProcessFunction[Long, UrlViewCount, String]#Context, collector: Collector[String]): Unit = {
    // Add the record to the buffered state.
    listState.add(i)
    // Register a timer one millisecond past the window end. Flink keeps at most
    // one timer per key and timestamp, so repeating this for every record is harmless.
    context.timerService().registerEventTimeTimer(i.windowEnd + 1)
  }

  /**
   * Sorts the buffered counts, formats the top `size` entries and emits them
   * as a single string, then releases the buffered state.
   *
   * @param timestamp the timer's timestamp, i.e. `windowEnd + 1`
   * @param ctx       timer context (not used)
   * @param out       collector that receives the formatted ranking
   */
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, UrlViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
    val buffer = ListBuffer.empty[UrlViewCount]
    // ListState.get() is allowed to return null for empty state, hence the Option guard.
    Option(listState.get()).foreach { stored =>
      val it = stored.iterator()
      while (it.hasNext) {
        buffer += it.next()
      }
    }
    // The ranking for this key is produced exactly once, so the buffered records
    // are no longer needed; without this the state of every window would pile up.
    listState.clear()

    val urlViewCount = buffer.sortBy(_.count).reverse.take(size)

    // Format the ranking as a String so it is easy to print.
    val result: StringBuilder = new StringBuilder
    result.append("====================")
    // timestamp - 1 undoes the +1 applied when the timer was registered.
    result.append("时间:").append(new Timestamp(timestamp - 1)).append("\n")
    urlViewCount.zipWithIndex.foreach { case (currentURL, index) =>
      result.append("No.").append(index + 1).append(":")
        .append("URL=").append(currentURL.url)
        .append("浏览量=").append(currentURL.count).append("\n")
    }
    result.append("===================")

    // Deliberately throttles the output to imitate results scrolling in real time.
    // NOTE: this blocks the thread that runs the operator for one second per ranking.
    Thread.sleep(1000)
    out.collect(result.toString)
  }
}










