一、引入相关依赖(版本很重要)
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.5.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.5.3</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.58.Final</version>
</dependency>
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<version>3.1.6</version>
</dependency>
二、实现Spark编码
/**
 * Data-transfer object for one row of the product sales ranking:
 * the SKU id, the product name, the aggregated quantity sold and the
 * aggregated sales amount produced by the Spark job.
 */
public class ProductRanking {
    private Long skuId;
    private String goodsName;
    // NOTE(review): quantity/sales are carried as Double to match the Spark
    // aggregation output; for exact monetary values BigDecimal would be
    // preferable — confirm precision requirements before changing.
    private Double totalQuantity;
    private Double totalSales;

    /**
     * No-arg constructor so bean-based frameworks (JSON deserializers,
     * mappers) can instantiate and populate via setters.
     */
    public ProductRanking() {
    }

    /**
     * Creates a fully-populated ranking row.
     *
     * @param skuId         SKU identifier of the product
     * @param goodsName     display name of the product
     * @param totalQuantity aggregated quantity (sum of net_weight)
     * @param totalSales    aggregated sales amount (sum of net_weight * goods_price)
     */
    public ProductRanking(Long skuId, String goodsName, Double totalQuantity, Double totalSales) {
        this.skuId = skuId;
        this.goodsName = goodsName;
        this.totalQuantity = totalQuantity;
        this.totalSales = totalSales;
    }

    public Long getSkuId() {
        return skuId;
    }

    public void setSkuId(Long skuId) {
        this.skuId = skuId;
    }

    public String getGoodsName() {
        return goodsName;
    }

    public void setGoodsName(String goodsName) {
        this.goodsName = goodsName;
    }

    public Double getTotalQuantity() {
        return totalQuantity;
    }

    public void setTotalQuantity(Double totalQuantity) {
        this.totalQuantity = totalQuantity;
    }

    public Double getTotalSales() {
        return totalSales;
    }

    public void setTotalSales(Double totalSales) {
        this.totalSales = totalSales;
    }

    @Override
    public String toString() {
        return "ProductRanking{skuId=" + skuId
                + ", goodsName='" + goodsName + '\''
                + ", totalQuantity=" + totalQuantity
                + ", totalSales=" + totalSales + '}';
    }
}
/**
 * Local Spark job that reads order items from MySQL over JDBC and
 * computes the top-10 product sales ranking.
 */
public class SparkJob {

    /**
     * Aggregates {@code nasi_mts_trade_order_item} by SKU and returns the
     * top 10 products ordered by total quantity sold (descending).
     *
     * @return the top-10 ranking rows (total_quantity = sum of net_weight,
     *         total_sales = sum of net_weight * goods_price)
     */
    public List<ProductRanking> getRanking() {
        SparkSession spark = SparkSession.builder()
                .appName("Order Item Ranking")
                .master("local[*]")
                .getOrCreate();
        try {
            // Fixed: the original URL ended in "characterEncoding=utf8false" — a garbled
            // merge of "characterEncoding=utf8" and "useSSL=false" with the '&' separator
            // lost, which hands the driver an invalid encoding name.
            String jdbcUrl = "jdbc:mysql://localhost:3306/atest?useUnicode=true&characterEncoding=utf8&useSSL=false";
            Properties jdbcProps = new Properties();
            jdbcProps.put("user", "root");
            jdbcProps.put("password", "123456");
            // NOTE(review): "com.mysql.jdbc.Driver" is the legacy Connector/J 5.x class name;
            // Connector/J 8.x keeps it as a deprecated alias of "com.mysql.cj.jdbc.Driver".
            // Kept as-is since the connector version on the classpath is not visible here.
            jdbcProps.put("driver", "com.mysql.jdbc.Driver");
            // Sub-query pushdown: the alias is mandatory when passing a query as the
            // "table" argument of DataFrameReader.jdbc.
            String query = "(SELECT * FROM nasi_mts_trade_order_item ) AS order_item_filtered";
            Dataset<Row> orderItemDF = spark.read()
                    .jdbc(jdbcUrl, query, jdbcProps);
            Dataset<Row> productRankingDF = orderItemDF.groupBy("sku_id", "goods_name")
                    .agg(
                            functions.sum("net_weight").alias("total_quantity"),
                            functions.sum(functions.col("net_weight")
                                    .multiply(functions.col("goods_price"))).alias("total_sales")
                    )
                    .orderBy(functions.col("total_quantity").desc())
                    .limit(10);
            List<Row> rows = productRankingDF.collectAsList();
            List<ProductRanking> ranking = rows.stream()
                    .map(row -> new ProductRanking(
                            row.getAs("sku_id"),
                            row.getAs("goods_name"),
                            row.getAs("total_quantity"),
                            row.getAs("total_sales")))
                    .collect(Collectors.toList());
            log.info(JSONObject.toJSONString(ranking));
            return ranking;
        } finally {
            // Fixed: stop() was previously skipped on any failure above, leaking the
            // local SparkSession; always release it.
            spark.stop();
        }
    }
}