Spark + Spring Boot 实例:写入 MySQL(blog 表),以及从 MySQL 读取 student 表并写入 student2 表(2022-10-24)

阅读 167

2022-11-29

1、pom.xml
      <!-- Spring Boot core starter. No <version> is given here; presumably it is managed by a
           parent POM / BOM that this snippet does not show (TODO confirm). -->
      <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
      </dependency>
      <!-- Spark core 1.6.0 built for Scala 2.11. Its transitive netty-all is excluded and
           replaced by the explicit netty-all dependency declared right below
           (presumably to avoid a Netty version conflict; confirm). -->
      <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_2.11</artifactId>
          <version>1.6.0</version>
        <exclusions>
        <exclusion>
          <groupId>io.netty</groupId>
          <artifactId>netty-all</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <!-- Explicitly pinned netty-all that stands in for the one excluded from spark-core. -->
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
      <version>4.1.17.Final</version>
    </dependency>
    <!-- Hadoop client libraries, version 2.6.2. -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.6.2</version>
    </dependency>
    <!-- MySQL JDBC driver, runtime scope only. No <version> is given; presumably managed
         by the same parent POM / BOM as the Spring Boot starter (TODO confirm). -->
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <scope>runtime</scope>
    </dependency>
    <!-- Spark SQL 1.6.0 for Scala 2.11; same Spark and Scala versions as spark-core above. -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>1.6.0</version>
    </dependency>
    

2、RDDtoMysql.scala
//写入到mysql 表newtest.blog

 

 

/**
 * Demo Spark job that writes a small in-memory RDD of `(name, count)` pairs
 * into the MySQL table `newtest.blog`, one JDBC connection per partition.
 */
object RDDtoMysql {

  // Imported inside the object so that this snippet compiles on its own;
  // no import precedes this object in the file.
  import java.sql.DriverManager
  import org.apache.spark.{SparkConf, SparkContext}
  import scala.util.control.NonFatal

  /** Row shape of the `blog` table. Not referenced by the job itself. */
  case class Blog(name: String, count: Int)

  // Connection settings: UTF-8 encoding, SSL disabled, server time zone GMT.
  private val JdbcUrl =
    "jdbc:mysql://localhost:3306/newtest" +
      "?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT"
  private val InsertSql = "insert into blog(name,count) values(?,?)"

  /**
   * Inserts every `(name, count)` pair of one partition into the `blog` table.
   *
   * A single connection and a single prepared statement are used for the whole
   * partition, and both are closed even when an insert fails. Non-fatal errors
   * are printed and otherwise ignored (best effort, as in the original code);
   * fatal errors such as `OutOfMemoryError` are allowed to propagate.
   *
   * @param iterator the rows of one RDD partition
   */
  def myFun(iterator: Iterator[(String, Int)]): Unit = {
    try {
      val conn = DriverManager.getConnection(JdbcUrl, "root", "root")
      try {
        // Prepare once and rebind per row; preparing inside the loop leaked
        // every statement except the last one.
        val ps = conn.prepareStatement(InsertSql)
        try {
          iterator.foreach { case (name, count) =>
            ps.setString(1, name)
            ps.setInt(2, count)
            ps.executeUpdate()
          }
        } finally {
          ps.close()
        }
      } finally {
        // Runs even if ps.close() throws, so the connection is never leaked.
        conn.close()
      }
    } catch {
      case NonFatal(e) => e.printStackTrace()
    }
  }

  /**
   * Entry point: builds a local SparkContext, parallelizes the sample data and
   * writes each partition to MySQL through [[myFun]].
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RDDToMysql").setMaster("local")
    val sc = new SparkContext(conf)
    try {
      // Sample data to be written.
      val data = sc.parallelize(List(("www", 10), ("itblog", 20), ("com", 30),
        ("com2", 40), ("com3", 50), ("com4", 60), ("com5", 60), ("com6", 70)))
      data.foreachPartition(myFun)
    } finally {
      // Release the context even when the job fails.
      sc.stop()
    }
  }

}

 

spark springboot 实例写入mysql(写入blog表),mysql(读取student、写入student2)20221024_spark

 

 

3、RDDtoMysql2.scala
  //写入到mysql 表newtest.student
  import java.sql.{Connection, DriverManager, PreparedStatement}
  import org.apache.spark.{SparkConf, SparkContext}

  /**
   * Demo Spark job that writes a small in-memory RDD of
   * `(id, name, gender, age)` tuples into the MySQL table `newtest.student`,
   * one JDBC connection per partition.
   */
  object RDDtoMysql2 {

    import scala.util.control.NonFatal

    /** Row shape of the `student` table. Not referenced by the job itself. */
    case class Student(id: Int, name: String, gender: String, age: Int)

    // Connection settings: UTF-8 encoding, SSL disabled, server time zone GMT.
    private val JdbcUrl =
      "jdbc:mysql://localhost:3306/newtest" +
        "?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT"
    private val InsertSql = "insert into student(id,name,gender,age) values(?,?,?,?)"

    /**
     * Inserts every `(id, name, gender, age)` tuple of one partition into the
     * `student` table.
     *
     * A single connection and a single prepared statement are used for the
     * whole partition, and both are closed even when an insert fails.
     * Non-fatal errors are printed and otherwise ignored (best effort, as in
     * the original code); fatal errors are allowed to propagate.
     *
     * @param iterator the rows of one RDD partition
     */
    def myFun2(iterator: Iterator[(Int, String, String, Int)]): Unit = {
      try {
        val conn = DriverManager.getConnection(JdbcUrl, "root", "root")
        try {
          // Prepare once and rebind per row; preparing inside the loop leaked
          // every statement except the last one.
          val ps = conn.prepareStatement(InsertSql)
          try {
            iterator.foreach { case (id, name, gender, age) =>
              ps.setInt(1, id)
              ps.setString(2, name)
              ps.setString(3, gender)
              ps.setInt(4, age)
              ps.executeUpdate()
            }
          } finally {
            ps.close()
          }
        } finally {
          // Runs even if ps.close() throws, so the connection is never leaked.
          conn.close()
        }
      } catch {
        case NonFatal(e) => e.printStackTrace()
      }
    }

    /**
     * Entry point: builds a local SparkContext, parallelizes the sample data
     * and writes each partition to MySQL through [[myFun2]].
     *
     * @param args command-line arguments (unused)
     */
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setAppName("RDDToMysql").setMaster("local")
      val sc = new SparkContext(conf)
      try {
        // Sample data to be written.
        val data = sc.parallelize(List((2, "test2", "g", 20), (3, "test3", "g", 30),
          (4, "test4", "g", 14), (5, "test5", "g", 25), (6, "test6", "b", 60),
          (7, "test7", "b", 17), (8, "test8", "b", 28), (9, "test9", "g", 39)))
        data.foreachPartition(myFun2)
      } finally {
        // Release the context even when the job fails.
        sc.stop()
      }
    }
  }

 

spark springboot 实例写入mysql(写入blog表),mysql(读取student、写入student2)20221024_sql_02

 

 

 

 

4、RDDFromMysqlToMysql.java
  //从 MySQL 表 newtest.student 读取记录,
  //并写入到表 newtest.student2

  import org.apache.spark.SparkConf;
  import org.apache.spark.api.java.JavaSparkContext;
  import org.apache.spark.sql.SQLContext;
  import org.apache.spark.sql.SaveMode;
  import org.apache.spark.sql.DataFrame;
  import java.io.Serializable;
  import java.util.Properties;

  public class RDDFromMysqlToMysql implements Serializable {
    private SparkConf sparkConf;
    private JavaSparkContext javaSparkContext;
    private SQLContext sqlContext;

    /**
      * 创建sparkContext
    */
    private void sparkContext(){
      String warehouseLocation=System.getProperty("user.dir");
      sparkConf=new SparkConf().setAppName("FromToMysql").setMaster("local");
      javaSparkContext=new JavaSparkContext(sparkConf);
    }

    /*
      * 创建sqlContext
      * 用于读写MySQL中的数据
    * */
    private void initSQLContext() {
      sqlContext = new SQLContext(javaSparkContext);
    }

    /*
      * 初始化Load
      * 创建sparkContext, sqlContext
    * */
  public RDDFromMysqlToMysql() {
    sparkContext();
    initSQLContext();
  }

 

  /*
    * 使用spark-sql从student中读取数据, 处理后再回写到student2
  * */
  public void db2db() {
    String url = "jdbc:mysql://localhost:3306/newtest?characterEncoding=UTF-8&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT";
    String fromTable = "student";
    String toTable = "student2";
    Properties props = new Properties();
    props.setProperty("driver","com.mysql.cj.jdbc.Driver");
    props.put("user", "root");
    props.put("password", "root");
    //读取表student记录
    DataFrame rows = sqlContext.read().jdbc(url, fromTable, props);//.where("count < 1000");
    //显示表student记录
    rows.show();
    //表student记录 添加到表student2
    rows.write().mode(SaveMode.Append).jdbc(url, toTable, props);
  }

  public static void main(String[] args){
    RDDFromMysqlToMysql db=new RDDFromMysqlToMysql();
    System.out.println(" ---------------------- start db ------------------------");
    db.db2db();
    System.out.println(" ---------------------- end db ------------------------");
  }
}

 

spark springboot 实例写入mysql(写入blog表),mysql(读取student、写入student2)20221024_mysql_03

 

 

spark springboot 实例写入mysql(写入blog表),mysql(读取student、写入student2)20221024_sql_04

 

精彩评论(0)

0 0 举报