es-09-spark集成

es和spark的集成比较简单, 直接使用内部封装的一些方法即可

版本设置说明: 

https://www.elastic.co/guide/en/elasticsearch/hadoop/current/requirements.html

maven依赖说明: 

https://www.elastic.co/guide/en/elasticsearch/hadoop/current/install.html

1, maven配置: 

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven module for the Spark / Elasticsearch integration examples.
  Inherits from the xiaoniubigdata parent POM and pulls in Spark core, SQL and
  streaming plus the elasticsearch-hadoop Spark connector.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>xiaoniubigdata</artifactId>
        <groupId>com.wenbronk</groupId>
        <version>1.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>spark06-es</artifactId>

    <properties>
        <spark.version>2.3.1</spark.version>
        <!-- Scala binary version; used as the artifact suffix of every Scala dependency below. -->
        <spark.scala.version>2.11</spark.scala.version>
        <scala.version>2.11.12</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${spark.scala.version}</artifactId>
            <version>${spark.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${spark.scala.version}</artifactId>
            <version>${spark.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${spark.scala.version}</artifactId>
            <version>${spark.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <!--
          elasticsearch-hadoop connector for Spark 2.x. The Scala suffix follows the same
          property as the Spark artifacts so the binary versions cannot drift apart.
        -->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-20_${spark.scala.version}</artifactId>
            <version>6.3.2</version>
        </dependency>


    </dependencies>

    <build>

        <plugins>
            <!-- Compiles the Scala main and test sources. -->
            <!-- NOTE(review): org.scala-tools:maven-scala-plugin is the legacy plugin; consider
                 migrating to its successor net.alchim31.maven:scala-maven-plugin. -->
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- This example module is never deployed to a repository. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-deploy-plugin</artifactId>
                <version>2.8.2</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

2, RDD的使用

1), read

package com.wenbronk.spark.es.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Reads documents from Elasticsearch as an RDD, counts how often each `mac`
  * value occurs, prints the counts and dumps the raw documents to a text file.
  */
object ReadMain {

  /**
    * Entry point.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    // A SparkSession is used only as a convenient way to configure the SparkContext;
    // the same settings could be placed on a plain SparkConf instead.
    val sparkSession = SparkSession.builder()
      .appName("read-es-rdd")
      .master("local[4]")
      .config("es.index.auto.create", true)
      .config("es.nodes", "10.124.147.22")
      .config("es.port", 9200)
      .getOrCreate()

    // try/finally guarantees the session is released even if a job fails.
    try {
      val spark = sparkSession.sparkContext

      // Brings the esRDD extension methods on SparkContext into scope.
      import org.elasticsearch.spark._

      // Each element is (document id, document fields). The RDD is consumed by two
      // actions below, so it is cached to avoid scanning Elasticsearch twice.
      val esreadRdd: RDD[(String, collection.Map[String, AnyRef])] = spark.esRDD("macsearch_fileds/mac",
        """
          |{
          |  "query": {
          |    "match_all": {}
          |  }
          |}
        """.stripMargin).cache()

      // Count documents per `mac` value (None when the field is absent), ascending by count.
      val value: RDD[(Option[AnyRef], Int)] = esreadRdd
        .map { case (_, fields) => (fields.get("mac"), 1) }
        .reduceByKey(_ + _)
        .sortBy(_._2)

      val tuples: Array[(Option[AnyRef], Int)] = value.collect()

      tuples.foreach(println)

      esreadRdd.saveAsTextFile("/Users/bronkwen/work/IdeaProjects/xiaoniubigdata/spark06-es/target/json")
    } finally {
      sparkSession.close()
    }
  }

}

2), readJson

package com.wenbronk.spark.es.rdd

import org.apache.spark.sql.SparkSession

import scala.util.parsing.json.JSON

/** Reads documents from Elasticsearch as raw JSON strings and stores them as text. */
object ReadJsonMain {

  /**
    * Entry point.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {

    val resource = "macsearch_fileds/mac"
    val outputDir = "/Users/bronkwen/work/IdeaProjects/xiaoniubigdata/spark06-es/target/json"

    // Query handed to Elasticsearch together with the read request.
    val matchAllQuery =
      """
      {
        "query": {
          "match_all": {}
        }
      }
      """.stripMargin

    val sparkSession = SparkSession.builder()
      .appName("read-es-rdd")
      .master("local[4]")
      .config("es.index.auto.create", true)
      .config("es.nodes", "10.124.147.22")
      .config("es.port", 9200)
      .getOrCreate()

    // Brings the esJsonRDD extension method on SparkContext into scope.
    import org.elasticsearch.spark._

    // Elements are pairs; only the second component (the JSON text) is written out.
    val jsonDocs = sparkSession.sparkContext
      .esJsonRDD(resource, matchAllQuery)
      .map { case (_, json) => json }

    jsonDocs.saveAsTextFile(outputDir)

    sparkSession.close()
  }
}

3), write

package com.wenbronk.spark.es.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.rdd.EsSpark

/** Loads JSON lines from a local directory and indexes them into Elasticsearch. */
object WriteMain {

  /**
    * Entry point.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {

    val inputDir = "/Users/bronkwen/work/IdeaProjects/xiaoniubigdata/spark06-es/target/json"

    val session = SparkSession.builder()
      .master("local[4]")
      .appName("write-spark-es")
      .config("es.index.auto.create", true)
      .config("es.nodes", "10.124.147.22")
      .config("es.port", 9200)
      .getOrCreate()

    // One element per line of the input files; each line is handed over as a JSON document.
    val jsonLines: RDD[String] = session.sparkContext.textFile(inputDir)

    // The lines are already serialized JSON, hence saveJsonToEs rather than saveToEs.
    EsSpark.saveJsonToEs(jsonLines, "spark/json")

    session.close()
  }

}

4), 写入多个index中

package com.wenbronk.spark.es.rdd

import org.apache.spark.sql.SparkSession

/**
  * Writes documents to several indices at once: the target index name is
  * resolved per document from its `media_type` field.
  */
object WriteMultiIndex {

  /**
    * Entry point.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("es-spark-multiindex")
      // Fixed key: the original "es.es.index.auto.create" is not a recognised setting.
      .config("es.index.auto.create", true)
      .config("es.nodes", "10.124.147.22")
      .config("es.port", 9200)
      .getOrCreate()

    try {
      val sc = spark.sparkContext

      val game = Map("media_type" -> "game", "title" -> "FF VI", "year" -> "1994")
      val book = Map("media_type" -> "book", "title" -> "Harry Potter", "year" -> "2010")
      val cd = Map("media_type" -> "music", "title" -> "Surfing With The Alien")

      import org.elasticsearch.spark._
      // The RDD holds (id, document) pairs, so it must be written with saveToEsWithMeta:
      // the key becomes the document id and the value is the document body. Plain
      // saveToEs would treat the whole tuple as the document.
      sc.makeRDD(Seq((1, game), (2, book), (3, cd)))
        .saveToEsWithMeta("my-collection-{media_type}/doc")
    } finally {
      spark.close()
    }

  }

}

2, streaming

1), write

package com.wenbronk.spark.es.stream

import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.spark.rdd.EsSpark
import org.elasticsearch.spark.streaming.EsSparkStreaming

import scala.collection.mutable

/** Writes a queue-backed DStream of Map documents to Elasticsearch. */
object WriteStreamingMain {

  /**
    * Entry point. Blocks in `awaitTermination()` until the streaming context stops.
    *
    * @param args command-line arguments (unused)
    */
  def main (args: Array[String]): Unit = {

    // SparkConf values are always strings, so the port is passed as "9200".
    val conf = new SparkConf()
      .setAppName("es-spark-streaming-write")
      .setMaster("local[4]")
      .set("es.index.auto.create", "true")
      .set("es.nodes", "10.124.147.22")
      .set("es.port", "9200")

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(1))

    val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
    val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")

    // A single micro-batch holding both documents.
    val rdd = sc.makeRDD(Seq(numbers, airports))
    val microbatches = mutable.Queue(rdd)

    val dstream: InputDStream[Map[String, Any]] = ssc.queueStream(microbatches)

    // The stream carries Map documents, so they are serialized by the connector via
    // saveToEs. saveJsonToEs is only for streams whose elements are already JSON
    // (String or Array[Byte]); passing Maps to it is not supported.
    EsSparkStreaming.saveToEs(dstream, "sparkstreaming/doc")

    // Equivalent implicit form:
    //   import org.elasticsearch.spark.streaming._
    //   dstream.saveToEs("sparkstreaming/doc")
    //
    // Taking the document id from a field of each document:
    //   EsSparkStreaming.saveToEs(dstream, "spark/docs", Map("es.mapping.id" -> "id"))

    ssc.start()
    ssc.awaitTermination()

  }

}

2), 写入带有meta的, rdd也适用

package com.wenbronk.spark.es.stream

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

/**
  * Writes DStream documents to Elasticsearch together with per-document metadata.
  * The stream elements are (metadata, document) pairs.
  */
object WriteStreamMeta {

  /** Builds the streaming context shared by both examples (1-second batches). */
  private def newStreamingContext(): StreamingContext = {
    // SparkConf values are always strings, so the port is passed as "9200".
    val conf = new SparkConf()
      .setAppName("es-spark-streaming-write")
      .setMaster("local[4]")
      .set("es.index.auto.create", "true")
      .set("es.nodes", "10.124.147.22")
      .set("es.port", "9200")

    new StreamingContext(new SparkContext(conf), Seconds(1))
  }

  /**
    * Uses the pair key as the document id.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val ssc = newStreamingContext()
    val sc = ssc.sparkContext

    val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
    val muc = Map("iata" -> "MUC", "name" -> "Munich")
    val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

    val airportsRDD = sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo)))
    val microbatches = mutable.Queue(airportsRDD)

    import org.elasticsearch.spark.streaming._
    ssc.queueStream(microbatches).saveToEsWithMeta("airports/2015")

    ssc.start()
    ssc.awaitTermination()
  }


  /**
    * Uses several kinds of metadata per document.
    *
    * @param args command-line arguments (unused)
    */
  def main1(args: Array[String]): Unit = {
    // Metadata keys must be the connector's Metadata enum constants; plain strings
    // such as "id" are not recognised as metadata.
    import org.elasticsearch.spark.rdd.Metadata._

    val ssc = newStreamingContext()
    val sc = ssc.sparkContext

    val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
    val muc = Map("iata" -> "MUC", "name" -> "Munich")
    val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

    // The metadata maps do not need to carry the same set of keys.
    // NOTE(review): TTL may not be accepted by newer Elasticsearch versions - confirm
    // against the target cluster.
    val otpMeta = Map(ID -> 1, TTL -> "3h")
    val mucMeta = Map(ID -> 2, VERSION -> "23")
    val sfoMeta = Map(ID -> 3)

    val airportsRDD = sc.makeRDD(Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo)))
    val microbatches = mutable.Queue(airportsRDD)

    import org.elasticsearch.spark.streaming._
    ssc.queueStream(microbatches).saveToEsWithMeta("airports/2015")
    ssc.start()
    ssc.awaitTermination()
  }

}

3, sql的使用

1), read

package com.wenbronk.spark.es.sql

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * Reads an Elasticsearch index as a DataFrame, aggregates it with Spark SQL
  * and saves the raw rows as JSON files.
  */
object ESSqlReadMain {

  /**
    * Entry point.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("es-sql-read")
      .config("es.index.auto.create", true)
      // Translate Spark SQL filters into Elasticsearch Query DSL (key was misspelled "pushown").
      // NOTE(review): es-hadoop documents `pushdown` as a data-source option; confirm it is
      // honoured when set on the session config.
      .config("pushdown", true)
      .config("es.nodes", "10.124.147.22")
      .config("es.port", 9200)
      .getOrCreate()

    // try/finally guarantees the session is stopped even if a job fails.
    try {
      // Reading the whole index without a query:
      //   val df: DataFrame = spark.read.format("es").load("macsearch_fileds/mac")
      import org.elasticsearch.spark.sql._
      // The query must be well-formed JSON; the original was missing a closing brace.
      val df = spark.esDF("macsearch_fileds/mac",
        """
          |{
          |  "query": {
          |    "match_all": {}
          |  }
          |}
        """.stripMargin)

      // Show the inferred schema and expose the data to SQL.
      df.printSchema()
      df.createOrReplaceTempView("macseach_fileds")

      // Number of documents per mac, most frequent first.
      val dfSql: DataFrame = spark.sql(
        """
          |select
          |  mac,
          |  count(mac) con
          |from macseach_fileds
          |group by mac
          |order by con desc
        """.stripMargin)

      dfSql.show()

      // Persist the raw rows to local JSON files.
      df.write.json("/Users/bronkwen/work/IdeaProjects/xiaoniubigdata/spark06-es/target/sql/json")
    } finally {
      spark.stop()
    }
  }

}

2), write

package com.wenbronk.spark.es.sql

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.elasticsearch.spark.sql.EsSparkSQL

/** Loads JSON files into a DataFrame and indexes the rows into Elasticsearch. */
object ESSqlWriteMain {

  /**
    * Entry point.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {

    val inputDir = "/Users/bronkwen/work/IdeaProjects/xiaoniubigdata/spark06-es/target/sql/json"

    val session = SparkSession.builder()
      .master("local[4]")
      .appName("es-sql-write")
      .config("es.index.auto.create", true)
      .config("es.nodes", "10.124.147.22")
      .config("es.port", 9200)
      .getOrCreate()

    val people: DataFrame = session.read.json(inputDir)

    people.show()

    // Explicit call; with `import org.elasticsearch.spark.sql._` in scope the same
    // write can be spelled people.saveToEs("spark/people").
    EsSparkSQL.saveToEs(people, "spark/people")

    session.close()
  }

}

4, structStream

对结构化流不太熟悉, 等熟悉了再看

package com.wenbronk.spark.es.structstream

import org.apache.spark.sql.SparkSession

/** Streams JSON files from a directory into Elasticsearch with Structured Streaming. */
object StructStreamWriteMain {

  /**
    * Entry point. Blocks until the streaming query terminates.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("structstream-es-write")
      .master("local[4]")
      .config("es.index.auto.create", true)
      .config("es.nodes", "10.124.147.22")
      .config("es.port", 9200)
      // Streaming file sources need a schema; no explicit schema is supplied here,
      // so schema inference is switched on.
      .config("spark.sql.streaming.schemaInference", true)
      .getOrCreate()

    try {
      val df = spark.readStream
        .format("json")
        .load("/Users/bronkwen/work/IdeaProjects/xiaoniubigdata/spark06-es/target/json")

      // The Elasticsearch sink needs a target resource, passed to start().
      // NOTE(review): "spark/structstream" is a placeholder index/type - adjust as needed.
      val query = df.writeStream
        .option("checkpointLocation", "/save/location")
        .format("es")
        .start("spark/structstream")

      // Keep the driver alive; closing the session right after start() would stop
      // the query before it processes anything.
      query.awaitTermination()
    } finally {
      spark.close()
    }
  }

}
原文地址:https://www.cnblogs.com/wenbronk/p/9406669.html