Statistics over user behavior logs: an archive of the Java MapReduce and the Scala Spark code...





Take two input lines as an example:

123 456 123
123 456

After the map stage, each line becomes an array of (key, 1) pairs:

Array[Array[(K,V)]] = {
  Array[(K,V)] = { (123,1), (456,1), (123,1) }
  Array[(K,V)] = { (123,1), (456,1) }
}

After flattening, all pairs end up in a single collection, ready to be grouped and summed by key:

Array[(K,V)] = { (123,1), (456,1), (123,1), (123,1), (456,1) }
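
A minimal Spark sketch of the same idea (assumptions: a SparkContext named sc is already available, and the word-count logic here is purely illustrative, not part of the job below):

// Sketch only: turn the two example lines into (key, 1) pairs and sum them.
val lines = sc.parallelize(Seq("123 456 123", "123 456"))

// map: one Array[(K, V)] per line -> RDD[Array[(String, Int)]]
val perLine = lines.map(line => line.split(" ").map(k => (k, 1)))

// flatMap: flatten all pairs into a single RDD[(String, Int)]
val pairs = lines.flatMap(line => line.split(" ").map(k => (k, 1)))

// reduceByKey: sum the 1s per key -> (123,3), (456,2)
val counts = pairs.reduceByKey(_ + _)
counts.collect().foreach(println)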


Note: inside a Spark map, do not casually return null; it can make the job fail at runtime. Return an empty object of the expected type instead.
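
For instance, a hedged sketch of that rule in a map function (the tab-separated field layout here is made up for illustration and is not the real log format):

// Sketch only: return an empty pair instead of null when a line cannot be parsed.
def safeMap(line: String): (String, String) = {
  if (line == null || line.trim.isEmpty) ("", "")   // empty object of the pair type, not null
  else {
    val parts = line.split("\t")
    if (parts.length < 2) ("", "")                  // malformed line -> empty pair, not null
    else (parts(0), parts(1))
  }
}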



Java MapReduce:

package com.news.rec.monitor;

import com.newsRec.model.UserActionLog;
import com.sohu.newsRec.parser.UserLogParser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
* Created by jacobzhou on 2016/9/1.
*/
public class UserActiveCount extends Configured implements Tool {
    private static String DELIMA = "\t";

    public static class MRMapper extends Mapper<Object, Text, Text, Text> {
        private UserLogParser userActorParser = new UserLogParser();

        // Extract the numeric readTime field from the raw log line.
        private long getReadTime(String line) {
            String readTime = "";
            int index = line.indexOf("readTime") + 9;
            while (line.charAt(index) >= '0' && line.charAt(index) <= '9') {
                readTime += line.charAt(index);
                index++;
            }
            if (!readTime.equals(""))
                return Long.parseLong(readTime);
            else
                return 0;
        }

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            UserActionLog userLog = userActorParser.parseKV(line);
            String act = userLog.getAct();
            long gbCode = userLog.getgbCode();
            long pvNum = 0;
            long expoNum = 0;
            long tmNum = 0;
            long readTime = getReadTime(line);
            // Discard implausible reading times.
            if (readTime < 4 || readTime > 3000)
                readTime = 0;
            if (act.equals("expo")) expoNum = 1;
            else if (act.equals("pv")) pvNum = 1;
            else if (act.equals("tm")) {
                tmNum = 1;
                if (readTime == 0)
                    return;
            }
            String net = userLog.getNet();
            if (net == null || net.trim().equals("")) {
                net = "blank";
            }
            String wKey = "net" + DELIMA + net + DELIMA + "gbCode" + DELIMA + gbCode;
            String wValue = expoNum + DELIMA + pvNum + DELIMA + tmNum + DELIMA + readTime;
            context.write(new Text(wKey), new Text(wValue));
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {}
    }

    public static class MRReducer extends Reducer<Text, Text, Text, Text> {
        // Sum the per-record counters for each (net, gbCode) key.
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String sKey[] = key.toString().split(DELIMA);
            long expoNum, pvNum, tmNum, readTime;
            String result;
            expoNum = pvNum = tmNum = readTime = 0;
            for (Text val : values) {
                String data[] = val.toString().split(DELIMA);
                expoNum += Long.parseLong(data[0]);
                pvNum += Long.parseLong(data[1]);
                tmNum += Long.parseLong(data[2]);
                readTime += Long.parseLong(data[3]);
            }
            result = expoNum + DELIMA + pvNum + DELIMA + tmNum + DELIMA + readTime;
            context.write(key, new Text(result));
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        conf.set("mapreduce.job.queuename", "datacenter");
        conf.set("mapred.max.map.failures.percent", "5");
        int reduceTasksMax = 10;
        Job job = Job.getInstance(conf);
        job.setJobName("userActiveStatistic job");
        job.setNumReduceTasks(reduceTasksMax);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(MRMapper.class);
        job.setReducerClass(MRReducer.class);
        job.setJarByClass(UserActiveCount.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        TextOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        try {
            System.out.println("start run job!");
            int ret = ToolRunner.run(new UserActiveCount(), args);
            System.exit(ret);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Scala Spark, map: turn each line of data into exactly one (key, value) pair

package zzy

import com.newsRec.parser.UserLogParser
import org.apache.spark.{SparkConf, SparkContext}

/**
* Created by jacobzhou on 2016/10/11.
*/
object newsMonitor {
  private val DELIMA: String = "\t"
  private val userActorParser = new UserLogParser
  var num = 0

  // Turn one raw log line into a single (key, value) pair.
  def mapData(line: String): (String, String) = {
    // Print the first few lines for debugging.
    if (num < 100) {
      println(line)
      num = num + 1
    }
    val userLog = UserLogParser.parseKV(line)
    val act: String = userLog.getAct
    val gbCode: Long = userLog.getgbCode
    var pvNum: Long = 0
    var expoNum: Long = 0
    var tmNum: Long = 0
    if (act == "expo") expoNum = 1
    else if (act == "pv") pvNum = 1
    else if (act == "tm") tmNum = 1
    var net: String = userLog.getNet
    if (net == null || net.trim == "") net = "blank"
    val wKey: String = "net" + DELIMA + net + DELIMA + "gbCode" + DELIMA + gbCode
    val wValue: String = expoNum + DELIMA + pvNum + DELIMA + tmNum
    (wKey, wValue)
  }

  // Sum two tab-separated counter strings field by field.
  def reduceData(a: String, b: String): String = {
    val dataA: Array[String] = a.split(DELIMA)
    val dataB: Array[String] = b.split(DELIMA)
    val expoNum: Long = dataA(0).toLong + dataB(0).toLong
    val pvNum: Long = dataA(1).toLong + dataB(1).toLong
    val tmNum: Long = dataA(2).toLong + dataB(2).toLong
    expoNum + DELIMA + pvNum + DELIMA + tmNum
  }

  def main(args: Array[String]): Unit = {
    println("Running")
    val conf = new SparkConf()
    conf.setAppName("SparkTest")
    val input = args(0)
    val output = args(1)
    val sc = new SparkContext(conf)
    val inData = sc.textFile(input)
    val tmp = inData.map(line => mapData(line)).reduceByKey((x, y) => reduceData(x, y)) //.collect().foreach(println)
    tmp.saveAsTextFile(output)
  }
}


Scala Spark, flatMap: more extensible. For example, when one line of data has to be split into several (key, value) pairs, you first assemble them into a List[(key, value)] (or a Map) and then expand it with flatMap; a sketch of such a variant follows the program below.

package zzy

import com.newsRec.parser.UserLogParser
import org.apache.spark.{SparkConf, SparkContext}

/**
* Created by jacobzhou on 2016/9/18.
*/
object newsMonitor {
  private val DELIMA: String = "\t"
  private val userActorParser = new UserLogParser
  var num = 0

  // Turn one raw log line into a Map with a single (key, value) entry;
  // flatMap then expands the Map back into individual pairs.
  def mapData(line: String): Map[String, String] = {
    // Print the first few lines for debugging.
    if (num < 100) {
      println(line)
      num = num + 1
    }
    val userLog = UserLogParser.parseKV(line)
    val act: String = userLog.getAct
    val gbCode: Long = userLog.getgbCode
    var pvNum: Long = 0
    var expoNum: Long = 0
    var tmNum: Long = 0
    if (act == "expo") expoNum = 1
    else if (act == "pv") pvNum = 1
    else if (act == "tm") tmNum = 1
    var net: String = userLog.getNet
    if (net == null || net.trim == "") net = "blank"
    val wKey: String = "net" + DELIMA + net + DELIMA + "gbCode" + DELIMA + gbCode
    val wValue: String = expoNum + DELIMA + pvNum + DELIMA + tmNum
    Map(wKey -> wValue)
  }

  // Sum two tab-separated counter strings field by field.
  def reduceData(a: String, b: String): String = {
    val dataA: Array[String] = a.split(DELIMA)
    val dataB: Array[String] = b.split(DELIMA)
    val expoNum: Long = dataA(0).toLong + dataB(0).toLong
    val pvNum: Long = dataA(1).toLong + dataB(1).toLong
    val tmNum: Long = dataA(2).toLong + dataB(2).toLong
    expoNum + DELIMA + pvNum + DELIMA + tmNum
  }

  def main(args: Array[String]): Unit = {
    println("Running")
    val conf = new SparkConf()
    conf.setAppName("SparkTest")
    val input = args(0)
    val output = args(1)
    val sc = new SparkContext(conf)
    val inData = sc.textFile(input)
    val tmp = inData.flatMap(line => mapData(line)).reduceByKey((x, y) => reduceData(x, y)) //.collect().foreach(println)
    tmp.saveAsTextFile(output)
  }
}
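
As referenced above, the advantage of flatMap shows up when one line should yield several pairs. Here is a minimal sketch of such a variant, assumed to sit inside the same newsMonitor object so DELIMA and UserLogParser are in scope; the extra per-act key is purely illustrative and not part of the real log schema:

// Sketch only: emit one overall pair plus one per-act pair for each line,
// then let flatMap expand the List into individual (key, value) pairs.
def mapDataMulti(line: String): List[(String, String)] = {
  val userLog = UserLogParser.parseKV(line)
  val act: String = userLog.getAct
  val gbCode: Long = userLog.getgbCode
  var net: String = userLog.getNet
  if (net == null || net.trim == "") net = "blank"
  val baseKey = "net" + DELIMA + net + DELIMA + "gbCode" + DELIMA + gbCode
  List(
    (baseKey, "1"),                                      // counted per (net, gbCode)
    (baseKey + DELIMA + "act" + DELIMA + act, "1")       // counted per (net, gbCode, act), illustrative
  )
}

// Usage sketch: inData.flatMap(mapDataMulti).reduceByKey((x, y) => (x.toLong + y.toLong).toString)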


Example script for launching the Spark job:

output=zeyangzhou/count
input=zeyangzhou/data
hadoop fs -rmr $output
jar=/opt/develop/zeyangzhou/zzy-1.0-SNAPSHOT-jar-with-dependencies.jar
SPARK=/usr/lib/spark/bin/spark-submit
${SPARK} --queue datacenter \
--class zzy.newsMonitor \
--executor-memory 15g \
--master yarn-cluster \
--driver-memory 20g \
--num-executors 30 \
--executor-cores 15 \
$jar $input $output
