1、pom.xml
                <dependency>
                      <groupId>org.springframework.boot</groupId>
                      <artifactId>spring-boot-starter</artifactId>
                </dependency>
                <dependency>
                      <groupId>org.apache.spark</groupId>
                      <artifactId>spark-core_2.11</artifactId>
                      <version>1.6.0</version>
                      <!-- Exclude Spark's bundled netty-all so the 4.1.17.Final version below wins -->
                      <exclusions>
                            <exclusion>
                                  <groupId>io.netty</groupId>
                                  <artifactId>netty-all</artifactId>
                            </exclusion>
                      </exclusions>
                </dependency>
                <dependency>
                      <groupId>io.netty</groupId>
                      <artifactId>netty-all</artifactId>
                      <version>4.1.17.Final</version>
                </dependency>
                <dependency>
                      <groupId>org.apache.hadoop</groupId>
                      <artifactId>hadoop-client</artifactId>
                      <version>2.6.2</version>
                </dependency>
                <dependency>
                      <groupId>mysql</groupId>
                      <artifactId>mysql-connector-java</artifactId>
                      <scope>runtime</scope>
                </dependency>
                <dependency>
                      <groupId>org.apache.spark</groupId>
                      <artifactId>spark-sql_2.11</artifactId>
                      <version>1.6.0</version>
                </dependency>
              
2、RDDtoMysql.scala
// Write records into the MySQL table newtest.blog
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.{SparkConf, SparkContext}
object RDDtoMysql {
    case class Blog(name: String, count: Int)

    // Called once per partition; the iterator holds that partition's records
    def myFun(iterator: Iterator[(String, Int)]): Unit = {
        var conn: Connection = null
        var ps: PreparedStatement = null
        val sql = "insert into blog(name, count) values (?, ?)"
        try {
            // useUnicode/characterEncoding set the encoding; serverTimezone=GMT sets the server time zone
            conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/newtest" +
              "?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT", "root", "root")
            // Prepare the statement once and reuse it for every record in the partition
            ps = conn.prepareStatement(sql)
            iterator.foreach(data => {
                ps.setString(1, data._1)
                ps.setInt(2, data._2)
                ps.executeUpdate()
            })
        } catch {
            case e: Exception => e.printStackTrace()
        } finally {
            if (ps != null) {
                ps.close()
            }
            if (conn != null) {
                conn.close()
            }
        }
    }
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("RDDToMysql").setMaster("local")
        val sc = new SparkContext(conf)
        // Sample (name, count) records to write
        val data = sc.parallelize(List(("www", 10), ("itblog", 20), ("com", 30),
          ("com2", 40), ("com3", 50), ("com4", 60), ("com5", 60), ("com6", 70)))
        data.foreachPartition(myFun)
        sc.stop()
    }
}
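
Note: the version above calls executeUpdate once per record, which costs one database round trip per row. Where throughput matters, a batched variant is a common alternative. Below is a minimal sketch, assuming the same blog table and root/root credentials as above; it reuses one PreparedStatement per partition and flushes through JDBC's addBatch/executeBatch (the batch size of 500 is an arbitrary choice):

import java.sql.{Connection, DriverManager, PreparedStatement}

object RDDtoMysqlBatched {
    // Batched variant of myFun: same (name, count) records, fewer round trips
    def myFunBatched(iterator: Iterator[(String, Int)]): Unit = {
        var conn: Connection = null
        var ps: PreparedStatement = null
        try {
            conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/newtest" +
              "?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT", "root", "root")
            ps = conn.prepareStatement("insert into blog(name, count) values (?, ?)")
            var pending = 0
            iterator.foreach { case (name, count) =>
                ps.setString(1, name)
                ps.setInt(2, count)
                ps.addBatch()
                pending += 1
                if (pending >= 500) { // flush every 500 rows; 500 is an arbitrary choice
                    ps.executeBatch()
                    pending = 0
                }
            }
            if (pending > 0) ps.executeBatch() // flush the remainder
        } finally {
            if (ps != null) ps.close()
            if (conn != null) conn.close()
        }
    }
}

It is used exactly like before: data.foreachPartition(RDDtoMysqlBatched.myFunBatched).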

3、RDDtoMysql2.scala
  // Write records into the MySQL table newtest.student
  import java.sql.{Connection, DriverManager, PreparedStatement}
  import org.apache.spark.{SparkConf, SparkContext}
object RDDtoMysql2 {
      case class Student(id: Int, name: String, gender: String, age: Int)

      // Called once per partition; the iterator holds that partition's records
      def myFun2(iterator: Iterator[(Int, String, String, Int)]): Unit = {
          var conn: Connection = null
          var ps: PreparedStatement = null
          val sql = "insert into student(id, name, gender, age) values (?, ?, ?, ?)"
          try {
              // useUnicode/characterEncoding set the encoding; serverTimezone=GMT sets the server time zone
              conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/newtest" +
                "?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT", "root", "root")
              // Prepare the statement once and reuse it for every record in the partition
              ps = conn.prepareStatement(sql)
              iterator.foreach(data => {
                  ps.setInt(1, data._1)
                  ps.setString(2, data._2)
                  ps.setString(3, data._3)
                  ps.setInt(4, data._4)
                  ps.executeUpdate()
              })
          } catch {
              case e: Exception => e.printStackTrace()
          } finally {
              if (ps != null) {
                  ps.close()
              }
              if (conn != null) {
                  conn.close()
              }
          }
      }
      def main(args: Array[String]): Unit = {
          val conf = new SparkConf().setAppName("RDDToMysql2").setMaster("local")
          val sc = new SparkContext(conf)
          // Sample (id, name, gender, age) records to write
          val data = sc.parallelize(List((2, "test2", "g", 20), (3, "test3", "g", 30),
              (4, "test4", "g", 14), (5, "test5", "g", 25), (6, "test6", "b", 60),
              (7, "test7", "b", 17), (8, "test8", "b", 28), (9, "test9", "g", 39)))
          data.foreachPartition(myFun2)
          sc.stop()
      }
}
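
To read the rows back at the RDD level (for example, to verify the inserts above), Spark 1.6 ships org.apache.spark.rdd.JdbcRDD. Below is a minimal sketch assuming the same newtest.student table and root/root credentials; the id bounds 2 and 9 match the sample data, and the query must contain exactly two '?' placeholders, which JdbcRDD fills with each partition's id range:

import java.sql.DriverManager
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.JdbcRDD

object StudentReadBack {
    def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("StudentReadBack").setMaster("local"))
        val rows = new JdbcRDD(
            sc,
            () => DriverManager.getConnection("jdbc:mysql://localhost:3306/newtest" +
              "?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT", "root", "root"),
            "select id, name, gender, age from student where id >= ? and id <= ?",
            2, 9, 2, // lower bound, upper bound, number of partitions
            rs => (rs.getInt(1), rs.getString(2), rs.getString(3), rs.getInt(4))
        )
        rows.collect().foreach(println)
        sc.stop()
    }
}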

4、RDDFromMysqlToMysql.java
  // Read records from the MySQL table newtest.student
  // and append them to newtest.student2
  import org.apache.spark.SparkConf;
  import org.apache.spark.api.java.JavaSparkContext;
  import org.apache.spark.sql.SQLContext;
  import org.apache.spark.sql.SaveMode;
  import org.apache.spark.sql.DataFrame;
  import java.io.Serializable;
  import java.util.Properties;
  public class RDDFromMysqlToMysql implements Serializable {
        private SparkConf sparkConf;
        private JavaSparkContext javaSparkContext;
        private SQLContext sqlContext;
        /**
         * Create the SparkConf and JavaSparkContext
         */
        private void sparkContext() {
              sparkConf = new SparkConf().setAppName("FromToMysql").setMaster("local");
              javaSparkContext = new JavaSparkContext(sparkConf);
        }
        /**
         * Create the SQLContext used to read from and write to MySQL
         */
        private void initSQLContext() {
              sqlContext = new SQLContext(javaSparkContext);
        }
      /**
       * Constructor: initialize the SparkContext and SQLContext
       */
      public RDDFromMysqlToMysql() {
            sparkContext();
            initSQLContext();
      }
      /**
       * Read the student table with spark-sql and append the rows to student2
       */
      public void db2db() {
            String url = "jdbc:mysql://localhost:3306/newtest?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT";
            String fromTable = "student";
            String toTable = "student2";
            Properties props = new Properties();
            props.setProperty("driver", "com.mysql.cj.jdbc.Driver");
            props.put("user", "root");
            props.put("password", "root");
            // Read the rows of table student
            DataFrame rows = sqlContext.read().jdbc(url, fromTable, props);//.where("count < 1000");
            // Show the rows read from student
            rows.show();
            // Append the rows to table student2
            rows.write().mode(SaveMode.Append).jdbc(url, toTable, props);
      }
      }
      public static void main(String[] args) {
            RDDFromMysqlToMysql db=new RDDFromMysqlToMysql();
            System.out.println(" ---------------------- start db ------------------------");
            db.db2db();
            System.out.println(" ---------------------- end db ------------------------");
      }
}
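
For comparison, the same db2db flow can be written in Scala against Spark 1.6's SQLContext. This is a minimal sketch assuming the same tables, URL, and credentials as the Java version; the where("age >= 18") filter is only an illustration of the commented-out filter hook above:

import java.util.Properties
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}

object DbToDb {
    def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("DbToDb").setMaster("local"))
        val sqlContext = new SQLContext(sc)
        val url = "jdbc:mysql://localhost:3306/newtest" +
          "?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT"
        val props = new Properties()
        props.setProperty("driver", "com.mysql.cj.jdbc.Driver")
        props.setProperty("user", "root")
        props.setProperty("password", "root")
        // Read student, optionally filter, then append into student2
        val rows = sqlContext.read.jdbc(url, "student", props).where("age >= 18") // filter is illustrative
        rows.show()
        rows.write.mode(SaveMode.Append).jdbc(url, "student2", props)
        sc.stop()
    }
}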
