Idea-spark消费kafka数据写入es

1.maven配置

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>SparkToES</groupId>
  <artifactId>SparkToES</artifactId>
  <version>1.0-SNAPSHOT</version>
  <inceptionYear>2008</inceptionYear>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
    <java.version>1.8</java.version>
    <spark.version>2.2.3</spark.version>
    <scala.version>2.11.8</scala.version>
    <elasticsearch.version>6.1.2</elasticsearch.version>
  </properties>

  <repositories>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
  </repositories>

  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch-spark-20_2.11</artifactId>
      <version>${elasticsearch.version}</version>
          <scope>provided</scope>
    </dependency>
    <!-- spark end -->

    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.56</version>
    </dependency>
    <dependency>
      <groupId>org.specs</groupId>
      <artifactId>specs</artifactId>
      <version>1.2.5</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <!--将所有依赖打成一个jar包-->
      <plugin >
        <artifactId >maven-assembly-plugin</artifactId >
        <configuration >
          <descriptorRefs >
            <descriptorRef >jar-with-dependencies </descriptorRef >
          </descriptorRefs >
          <archive >
            <manifest >
              <mainClass ></mainClass >
            </manifest >
          </archive >
        </configuration >
        <executions >
          <execution >
            <id >make-assembly </id >
            <phase >package </phase >
            <goals >
              <goal >single </goal >
            </goals >
          </execution >
        </executions >
      </plugin >
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-target:jvm-1.5</arg>
          </args>
        </configuration>
      </plugin>

    </plugins>
  </build>
  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>

</project>

2.简单的过滤后数据写入es的demo

package test1

import java.text.SimpleDateFormat
import java.util.{Calendar, Date}

import com.alibaba.fastjson.JSON
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.rdd.EsSpark

object WafLogToES {

  def main(args: Array[String]): Unit = {

    val conf=new SparkConf().setMaster("yarn-client").setAppName("test").set("es.nodes", "ip1").set("es.port", "9200")

    //val conf=new SparkConf().setMaster("yarn-client").setAppName("sparktoes").set("es.nodes", "ip1").set("es.port", "9200")
    //val conf=new SparkConf().setMaster("local[3]").setAppName("scalaSpark08").set("es.nodes", "ip1").set("es.port", "9200")

    val sc=new SparkContext(conf)
    val ssc=new StreamingContext(sc,Seconds(3))

    //val topics=scala.Iterable("topic1")
    val topics=scala.Iterable("topic1")

   
    val kafkaParams=scala.collection.mutable.Map("bootstrap.servers"->"ip3,ip2,ip4",
      "group.id"->"kafkaConsumeGroup-01",
      "key.deserializer"->classOf[StringDeserializer])
    kafkaParams +=("value.deserializer"->classOf[StringDeserializer])
    kafkaParams +=("auto.offset.reset"->"latest")
    kafkaParams +=("enable.auto.commit"->true)
    kafkaParams +=("max.poll.records"->500)

    val data: InputDStream[ConsumerRecord[String, String]]=
      KafkaUtils.createDirectStream[String,String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String,String](topics,kafkaParams))





    data.foreachRDD{rdd=>
      val now=new Date().getTime
      val dateFormat=new SimpleDateFormat("yyyyMM")
      val cal:Calendar=Calendar.getInstance()
      cal.add(Calendar.DATE,-1)
      val date=dateFormat.format(cal.getTime)
      //val index="index_"+date+"/doc"
      //索引
      val index="index_"+date+"/doc"
      val json=rdd.map{  consumer =>
        val v1: String=consumer.value
        val obj:com.alibaba.fastjson.JSONObject=JSON.parseObject(v1)
        val logStr : String =obj.getString("message")
        val log:Array[String]=logStr.replace("<14>", "").split(";\]")
        //过滤value为空的key
        val result=log.filter{x=> x.lastIndexOf("=") != -1 && (x.indexOf("=")+1 !=x.length) }
          .map{x=>
            val resultTemp = x.split("=", 2)
            if (resultTemp.apply(0)=="CTime") {
              resultTemp(1)=resultTemp.apply(1).replace("T"," ")
            }
            if (resultTemp.apply(0)=="LastUTime"){
              resultTemp(1)=resultTemp.apply(1).replace("T"," ")
            }
            if (resultTemp.apply(1) == "[]"){
              (resultTemp.apply(0).toLowerCase(), "")
            } else {
              (resultTemp.apply(0).toLowerCase, resultTemp.apply(1))
            }
          }.toMap
          result
      }.filter(x=>x.contains("a") && x.contains("b") && x.contains("c") && x.contains("d")&&x.contains("e") )
        .filter(x=>x.apply("f").matches("""d*-d*-d* d*:d*:d*""") && x.apply("f")!="")
        .filter(x=>x.apply("g").length>0 && x.apply("h").length>0 && x.apply("i").length>0 && x.apply("j").length>0)
        .map{x=>
          import org.json4s.JsonDSL._
          import org.json4s.NoTypeHints
          import org.json4s.jackson.JsonMethods._
          import org.json4s.jackson.Serialization
          implicit val formats = Serialization.formats(NoTypeHints)
          val resultjson = compact(render(x))

          resultjson
        }
      EsSpark.saveJsonToEs(json, index)
    }
    ssc.start()
    ssc.awaitTermination()
  }
}

3.spark执行命令

export HADOOP_USER_NAME=队列1
export JAVA_HOME=/application/jdk1.8.0_111
export SPARK_HOME=/application/spark-2.2.1
export SPARK_CONF_DIR=/application/spark-2.2.1/conf

spark-submit 
--master yarn 
--deploy-mode client 
--driver-memory 2G 
--executor-memory 4G 
--executor-cores 2 
--num-executors 6 
--queue queue_8018_01 
--conf spark.serialize="org.apache.spark.serializer.KryoSerializer" 
--conf spark.yarn.executor.memoryOverhead=3096 
--conf spark.yarn.am.memory=2048 
--conf "spark.executorEnv.JAVA_HOME=/application/jdk1.8.0_111" 
--conf "spark.yarn.appMasterEnv.JAVA_HOME=/application/jdk1.8.0_111" 
--conf "spark.yarn.appMasterEnv.SPARK_HOME"="/application/spark-2.2.1" 
--conf "spark.yarn.appMasterEnv.SPARK_CONF_DIR"="/application/spark-2.2.1-config"  
--conf "spark.executorEnv.SPARK_HOME"="/application/spark-2.2.1"
--conf "spark.executorEnv.SPARK_CONF_DIR"="/application/spark-2.2.1-config"
--conf "spark.executorEnv.SCALA_HOME=/application/scala-2.11"
--conf "spark.yarn.appMasterEnv.SCALA_HOME=/application/scala-2.11"
--conf spark.executor.extraJavaOptions="-XX:InitiatingHeapOccupancyPercent=30 -XX:G1HeapRegionSize=16m"
--class com.test.SparkToES
--jars /application/spark_es/jars/spark-streaming-kafka-0-10_2.11-2.2.1.jar,
/application/spark_es/jars/elasticsearch-spark-20_2.11-6.0.1.jar,
/application/spark_es/jars/fastjson-1.2.56.jar,/application/jars/kafka-clients-2.0.1.jar
/application/waf_spark_es_001/ARS.jar 

  

原文地址:https://www.cnblogs.com/xinyumuhe/p/12186015.html