1. pom.xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>1.6.0</version>
    <exclusions>
        <exclusion>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.17.Final</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.2</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>1.6.0</version>
</dependency>
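Two notes on the dependency list. The netty-all that spark-core_2.11 1.6.0 pulls in transitively is excluded and re-declared at 4.1.17.Final, presumably to pin a single Netty version and avoid a classpath conflict with the other dependencies (a common issue when mixing Spark, Hadoop and Spring Boot). And mysql-connector-java is declared without a version, so its version is assumed to be managed by a Spring Boot parent POM or BOM that is not shown here.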
2. RDDtoMysql.scala
// Write to the MySQL table newtest.blog
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.{SparkConf, SparkContext}

object RDDtoMysql {
  case class Blog(name: String, count: Int)

  // Called once per partition with an iterator over that partition's records
  def myFun(iterator: Iterator[(String, Int)]): Unit = {
    var conn: Connection = null
    var ps: PreparedStatement = null
    val sql = "insert into blog(name, count) values (?, ?)"
    try {
      // Encoding and serverTimezone options: useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT
      conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/newtest" +
        "?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT", "root", "root")
      // Prepare the statement once and reuse it for every record in the partition
      ps = conn.prepareStatement(sql)
      iterator.foreach(data => {
        ps.setString(1, data._1)
        ps.setInt(2, data._2)
        ps.executeUpdate()
      })
    } catch {
      case e: Exception => e.printStackTrace()
      case _: Throwable => println("MySQL exception")
    } finally {
      if (ps != null) {
        ps.close()
      }
      if (conn != null) {
        conn.close()
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDDToMysql").setMaster("local")
    val sc = new SparkContext(conf)
    // The data to be written
    val data = sc.parallelize(List(("www", 10), ("itblog", 20), ("com", 30),
      ("com2", 40), ("com3", 50), ("com4", 60), ("com5", 60), ("com6", 70)))
    // One database connection per partition
    data.foreachPartition(myFun)
  }
}
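For larger partitions, batching the inserts cuts the number of round-trips to MySQL. Below is a minimal sketch of a batched variant of the per-partition writer (the method name myFunBatched is made up for illustration); it is meant to sit next to myFun inside RDDtoMysql and reuses that file's imports, connection URL and newtest.blog table. It disables auto-commit, queues rows with addBatch, and sends them with one executeBatch per partition.

  // Hypothetical batched variant of myFun: one PreparedStatement and one commit per partition
  def myFunBatched(iterator: Iterator[(String, Int)]): Unit = {
    var conn: Connection = null
    var ps: PreparedStatement = null
    try {
      conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/newtest" +
        "?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT", "root", "root")
      conn.setAutoCommit(false)   // commit the whole partition at once
      ps = conn.prepareStatement("insert into blog(name, count) values (?, ?)")
      iterator.foreach { case (name, count) =>
        ps.setString(1, name)
        ps.setInt(2, count)
        ps.addBatch()             // queue the row instead of executing it immediately
      }
      ps.executeBatch()           // send all queued rows in one batch
      conn.commit()
    } finally {
      if (ps != null) ps.close()
      if (conn != null) conn.close()
    }
  }

  // Usage inside main: data.foreachPartition(myFunBatched)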
3. RDDtoMysql2.scala
// Write to the MySQL table newtest.student
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.{SparkConf, SparkContext}

object RDDtoMysql2 {
  case class Student(id: Int, name: String, gender: String, age: Int)

  // Called once per partition with an iterator over that partition's records
  def myFun2(iterator: Iterator[(Int, String, String, Int)]): Unit = {
    var conn: Connection = null
    var ps: PreparedStatement = null
    val sql = "insert into student(id, name, gender, age) values (?, ?, ?, ?)"
    try {
      // Encoding and serverTimezone options: useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT
      conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/newtest" +
        "?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT", "root", "root")
      // Prepare the statement once and reuse it for every record in the partition
      ps = conn.prepareStatement(sql)
      iterator.foreach(data => {
        ps.setInt(1, data._1)
        ps.setString(2, data._2)
        ps.setString(3, data._3)
        ps.setInt(4, data._4)
        ps.executeUpdate()
      })
    } catch {
      case e: Exception => e.printStackTrace()
      case _: Throwable => println("MySQL exception")
    } finally {
      if (ps != null) {
        ps.close()
      }
      if (conn != null) {
        conn.close()
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDDToMysql").setMaster("local")
    val sc = new SparkContext(conf)
    // The data to be written
    val data = sc.parallelize(List((2, "test2", "g", 20), (3, "test3", "g", 30),
      (4, "test4", "g", 14), (5, "test5", "g", 25), (6, "test6", "b", 60),
      (7, "test7", "b", 17), (8, "test8", "b", 28), (9, "test9", "g", 39)))
    // One database connection per partition
    data.foreachPartition(myFun2)
  }
}
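The same student rows can also be written without hand-rolled JDBC by going through Spark SQL, since spark-sql_2.11 is already on the classpath. Below is a minimal sketch against the Spark 1.6 API, assuming the same newtest.student table and root/root credentials as above; the object name RDDtoMysqlViaDataFrame is made up for illustration. The tuples are mapped to the Student case class so the DataFrame columns match the table, then appended with DataFrameWriter.jdbc.

import java.util.Properties
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}

object RDDtoMysqlViaDataFrame {
  case class Student(id: Int, name: String, gender: String, age: Int)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDDToMysqlViaDataFrame").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._   // enables .toDF() on RDDs of case classes

    val url = "jdbc:mysql://localhost:3306/newtest" +
      "?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT"
    val props = new Properties()
    props.put("user", "root")
    props.put("password", "root")

    // Map the tuples to the case class so the column names match the table
    val df = sc.parallelize(List((2, "test2", "g", 20), (3, "test3", "g", 30)))
      .map { case (id, name, gender, age) => Student(id, name, gender, age) }
      .toDF()

    // Append the DataFrame to newtest.student; Spark generates the JDBC inserts
    df.write.mode(SaveMode.Append).jdbc(url, "student", props)
  }
}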
4. RDDFromMysqlToMysql.java
// Read the records of the MySQL table newtest.student
// and write them to newtest.student2
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.DataFrame;
import java.io.Serializable;
import java.util.Properties;

public class RDDFromMysqlToMysql implements Serializable {
    private SparkConf sparkConf;
    private JavaSparkContext javaSparkContext;
    private SQLContext sqlContext;

    /**
     * Create the SparkContext
     */
    private void sparkContext() {
        sparkConf = new SparkConf().setAppName("FromToMysql").setMaster("local");
        javaSparkContext = new JavaSparkContext(sparkConf);
    }

    /*
     * Create the SQLContext,
     * used to read and write the data in MySQL
     * */
    private void initSQLContext() {
        sqlContext = new SQLContext(javaSparkContext);
    }

    /*
     * Initialization:
     * create the SparkContext and the SQLContext
     * */
    public RDDFromMysqlToMysql() {
        sparkContext();
        initSQLContext();
    }

    /*
     * Read the data from student with Spark SQL, then write it back to student2
     * */
    public void db2db() {
        String url = "jdbc:mysql://localhost:3306/newtest?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT";
        String fromTable = "student";
        String toTable = "student2";
        Properties props = new Properties();
        props.setProperty("driver", "com.mysql.cj.jdbc.Driver");
        props.put("user", "root");
        props.put("password", "root");
        // Read the records of table student
        DataFrame rows = sqlContext.read().jdbc(url, fromTable, props); //.where("count < 1000");
        // Show the records of table student
        rows.show();
        // Append the records of table student to table student2
        rows.write().mode(SaveMode.Append).jdbc(url, toTable, props);
    }

    public static void main(String[] args) {
        RDDFromMysqlToMysql db = new RDDFromMysqlToMysql();
        System.out.println(" ---------------------- start db ------------------------");
        db.db2db();
        System.out.println(" ---------------------- end db ------------------------");
    }
}
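The commented-out .where(...) in db2db hints at filtering the rows during the copy instead of copying everything. Below is a minimal sketch of the same student-to-student2 copy with a filter applied, written against the Spark 1.6 Scala API; the object name FilteredDbToDb and the predicate on the age column are assumptions for illustration only.

import java.util.Properties
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}

object FilteredDbToDb {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("FilteredDbToDb").setMaster("local"))
    val sqlContext = new SQLContext(sc)

    val url = "jdbc:mysql://localhost:3306/newtest" +
      "?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT"
    val props = new Properties()
    props.put("user", "root")
    props.put("password", "root")

    // Read student, keep only the rows matching the predicate, and append them to student2
    val rows = sqlContext.read.jdbc(url, "student", props).where("age < 30")
    rows.write.mode(SaveMode.Append).jdbc(url, "student2", props)
  }
}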