Spring Boot integration with Spark

Reference: https://my.oschina.net/woter/blog/1843755

Dependencies (pom.xml):

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.edurt.ssi</groupId>
    <artifactId>springboot-scala-integration</artifactId>
    <packaging>jar</packaging>
    <version>1.0.0</version>

    <name>springboot-scala-integration</name>
    <description>SpringBoot Scala Integration is an open source Spring Boot / Scala integration example.</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <!-- dependency config -->
        <dependency.scala.version>2.11.12</dependency.scala.version>
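        <!-- NOTE: the Scala binary version here (2.11) must match the _2.11 suffix on the Spark artifacts below -->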
        <!-- plugin config -->
        <plugin.maven.scala.version>3.1.3</plugin.maven.scala.version>
        <spark.version>2.4.5</spark.version>
    </properties>

    <dependencies>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${dependency.scala.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
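            <!-- exclude logback-classic: Spark already brings the slf4j-log4j12 binding,
                 and two SLF4J bindings on the classpath would conflict -->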
            <exclusions>
                <exclusion>
                    <groupId>ch.qos.logback</groupId>
                    <artifactId>logback-classic</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- mustache: a template engine more lightweight than FreeMarker -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-mustache</artifactId>
        </dependency>
        <!-- data jpa and db -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.8</version>
            <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>


        <!-- test -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>junit</groupId>
                    <artifactId>junit</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.mockito</groupId>
                    <artifactId>mockito-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <!-- Uncomment if Scala sources live under src/main/scala and src/test/scala:
        <sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>
        <testSourceDirectory>${project.basedir}/src/test/scala</testSourceDirectory>
        -->
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>${plugin.maven.scala.version}</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
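
The POM packages a standard Spring Boot application; below is a minimal entry-point sketch (the class name SparkIntegrationApplication is illustrative, not taken from the original project):

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SparkIntegrationApplication {
    public static void main(String[] args) {
        SpringApplication.run(SparkIntegrationApplication.class, args);
    }
}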

log4j.properties (on the classpath, e.g. under src/main/resources):

#log4j.rootLogger=debug,console,fileAppender,dailyRollingFile,ROLLING_FILE,MAIL,DATABASE
log4j.rootLogger=INFO,console,dailyRollingFile
logpath=D:/MyworkPlace/scala-java
### console appender ###
log4j.appender.console = org.apache.log4j.ConsoleAppender
log4j.appender.console.Target = System.out
log4j.appender.console.layout = org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern = %d{ABSOLUTE} %5p %c{1}:%L - %m%n

### plain file appender ###
log4j.appender.fileAppender = org.apache.log4j.FileAppender
log4j.appender.fileAppender.File = logs/log.log
log4j.appender.fileAppender.Append = true
log4j.appender.fileAppender.Threshold = DEBUG
log4j.appender.fileAppender.layout = org.apache.log4j.PatternLayout
log4j.appender.fileAppender.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %m%n

### daily rolling file appender (one new file per day) ###
log4j.appender.dailyRollingFile = org.apache.log4j.DailyRollingFileAppender
log4j.appender.dailyRollingFile.File = ${logpath}/log.log
log4j.appender.dailyRollingFile.Append = true
log4j.appender.dailyRollingFile.Threshold = DEBUG
log4j.appender.dailyRollingFile.layout = org.apache.log4j.PatternLayout
log4j.appender.dailyRollingFile.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %m%n

### size-based rolling file appender (rolls once the file reaches MaxFileSize) ###
log4j.appender.ROLLING_FILE=org.apache.log4j.RollingFileAppender 
log4j.appender.ROLLING_FILE.Threshold=ERROR 
log4j.appender.ROLLING_FILE.File=rolling.log 
log4j.appender.ROLLING_FILE.Append=true 
log4j.appender.ROLLING_FILE.MaxFileSize=10KB 
log4j.appender.ROLLING_FILE.MaxBackupIndex=1 
log4j.appender.ROLLING_FILE.layout=org.apache.log4j.PatternLayout 
log4j.appender.ROLLING_FILE.layout.ConversionPattern=[framework] %d - %c -%-4r [%t] %-5p %c %x - %m%n

### SMTP (mail) appender ###
log4j.appender.MAIL=org.apache.log4j.net.SMTPAppender
log4j.appender.MAIL.Threshold=FATAL
log4j.appender.MAIL.BufferSize=10
log4j.appender.MAIL.From=chenyl@yeqiangwei.com
log4j.appender.MAIL.SMTPHost=mail.hollycrm.com
log4j.appender.MAIL.Subject=Log4J Message
log4j.appender.MAIL.To=chenyl@yeqiangwei.com
log4j.appender.MAIL.layout=org.apache.log4j.PatternLayout
log4j.appender.MAIL.layout.ConversionPattern=[framework] %d - %c -%-4r [%t] %-5p %c %x - %m%n
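
With this file on the classpath, code logs through the plain log4j 1.x API that Spark 2.4 itself uses; a minimal usage sketch (the class name LogDemo is illustrative):

import org.apache.log4j.Logger;

public class LogDemo {
    private static final Logger log = Logger.getLogger(LogDemo.class);

    public static void main(String[] args) {
        log.debug("suppressed: the root logger level is INFO");
        log.info("goes to the console and the daily rolling file");
        // ROLLING_FILE and MAIL are defined above but not attached to the
        // root logger, so they stay inactive unless added to log4j.rootLogger.
        log.error("also reaches the two active appenders");
    }
}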

Usage:

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.Test;

import scala.Tuple2;

public class SparkSqlTest { // wrapper class name is illustrative

    public static String sparkHome = ".";

    public static String appName = "sparkTest";

    public static String master = "local[6]";

    // Spark SQL usage
    @Test
    public void testSparkSql() throws AnalysisException {
        System.out.println("start --------");
        SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
        JavaSparkContext javaSparkContext = new JavaSparkContext(spark.sparkContext());

        Tuple2<List<User>, List<Address>> predata = predata();
        List<User> userlist = predata._1();
        List<Address> addresslist = predata._2();
        // lists to RDDs
        JavaRDD<User> userrdd = javaSparkContext.parallelize(userlist);
        JavaRDD<Address> addressrdd = javaSparkContext.parallelize(addresslist);
        // convert to DataFrames (the original passed userrdd for both; fixed to addressrdd)
        Dataset<Row> userdf = spark.createDataFrame(userrdd, User.class);
        Dataset<Row> addressdf = spark.createDataFrame(addressrdd, Address.class);
        // create temporary views for SQL
        userdf.createTempView("user");
        addressdf.createTempView("address");
//        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMM");
        // register a UDF that converts a timestamp into a year-month value
//        spark.udf.register("changeDate", (x) -> simpleDateFormat.format(new Date(x * 1000L)).toInt());
        // query by SQL
        // the multi-table join case is not handled yet
//        Dataset<Row> select_uid_from_user = spark.sql("SELECT us.uid,us.name,ad.addr FROM user as us LEFT JOIN address as ad ON us.addr=ad.addr");
        Dataset<Row> select_uid_from_user = spark.sql("select uid,name,age from user");
        select_uid_from_user.show(100);
        javaSparkContext.close();
        spark.close();
        System.out.println("end --------");
    }
}
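
The test depends on a predata() helper and User/Address beans that the original post never shows. Below is a minimal sketch under the assumption that they are plain serializable JavaBeans, with field names (uid, name, age, addr) inferred from the SQL above; the sample rows are made up for illustration. Spark derives the DataFrame schema from the getters, so every queried column needs a matching getter/setter pair.

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

import scala.Tuple2;

// Inside SparkSqlTest: builds the two in-memory lists that the test parallelizes.
private Tuple2<List<User>, List<Address>> predata() {
    List<User> users = Arrays.asList(
            new User(1L, "zhangsan", 30, "beijing"),
            new User(2L, "lisi", 25, "shanghai"));
    List<Address> addresses = Arrays.asList(
            new Address("beijing"),
            new Address("shanghai"));
    return new Tuple2<>(users, addresses);
}

// Hypothetical bean backing the "user" view (one class per source file).
public class User implements Serializable {
    private long uid;
    private String name;
    private int age;
    private String addr;

    public User() { }
    public User(long uid, String name, int age, String addr) {
        this.uid = uid; this.name = name; this.age = age; this.addr = addr;
    }
    public long getUid() { return uid; }
    public void setUid(long uid) { this.uid = uid; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
    public String getAddr() { return addr; }
    public void setAddr(String addr) { this.addr = addr; }
}

// Hypothetical bean backing the "address" view.
public class Address implements Serializable {
    private String addr;

    public Address() { }
    public Address(String addr) { this.addr = addr; }
    public String getAddr() { return addr; }
    public void setAddr(String addr) { this.addr = addr; }
}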
Original article: https://www.cnblogs.com/lshan/p/12842519.html