Kafka Utility Class (Scala)

1. The configuration file config.properties

# Kafka configuration
kafka.broker.list=hadoop300:9092,hadoop301:9092,hadoop302:9092
# Redis configuration
redis.host=hadoop300
redis.port=6379

2. Reading the properties file

PropertiesUtil loads config.properties from the classpath (so the file must live somewhere like src/main/resources), and every consumer of the configuration goes through this single loader:

package com.duoduo.realtime.utils

import java.io.InputStreamReader
import java.util.Properties

/**
 * Author z
 * Date 2020-08-27 10:04:21
 */
object PropertiesUtil {
  def main(args: Array[String]): Unit = {
    val properties: Properties = PropertiesUtil.load("config.properties")

    println(properties.getProperty("kafka.broker.list"))
  }

  // Loads a properties file from the classpath, reading it as UTF-8.
  def load(propertiesName: String): Properties = {
    val p = new Properties()
    p.load(new InputStreamReader(
      Thread.currentThread().getContextClassLoader
        .getResourceAsStream(propertiesName),
      "UTF-8"))
    p
  }
}
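
The redis.host and redis.port entries from section 1 have no consumer anywhere in this post. Below is a minimal sketch of one way they might be used, assuming the jedis dependency from the POM in section 3; the RedisUtil name and the pool sizing are illustrative, not part of the original code:

package com.duoduo.realtime.utils

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

// Hypothetical helper: builds a Jedis connection pool from config.properties.
object RedisUtil {
  private val properties = PropertiesUtil.load("config.properties")
  private val host = properties.getProperty("redis.host")
  private val port = properties.getProperty("redis.port").toInt

  private val config = new JedisPoolConfig()
  config.setMaxTotal(100) // illustrative pool sizing
  config.setMaxIdle(20)

  private lazy val pool = new JedisPool(config, host, port)

  // Callers must close the returned client to give it back to the pool.
  def getJedisClient: Jedis = pool.getResource
}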
 

3. POM dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>dw-stream</artifactId>
        <groupId>com.duoduo</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>dw-realtime</artifactId>
    <properties>
        <spark.version>2.4.0</spark.version>
        <scala.version>2.11.8</scala.version>
        <kafka.version>1.0.0</kafka.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.56</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-spark</artifactId>
            <version>4.14.2-HBase-1.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>io.searchbox</groupId>
            <artifactId>jest</artifactId>
            <version>5.3.3</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>net.java.dev.jna</groupId>
            <artifactId>jna</artifactId>
            <version>4.5.2</version>
        </dependency>

        <dependency>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>commons-compiler</artifactId>
            <version>2.7.8</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <!-- This plugin compiles the Scala sources into class files -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <!-- Bind these goals to Maven's compile phase -->
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

4. The Kafka utility class

package com.duoduo.realtime.utils

import java.util.Properties

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, ConsumerStrategy, KafkaUtils, LocationStrategies}

/**
 * Author z
 * Date 2020-08-27 10:02:21
 */
object KafkaUtil {
  private val properties: Properties = PropertiesUtil.load("config.properties")
  val broker_list = properties.getProperty("kafka.broker.list")
  var kafkaParam = collection.mutable.Map(
    // Bootstrap addresses used for the initial connection to the cluster
    "bootstrap.servers" -> broker_list,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    // Identifies which consumer group this consumer belongs to
    "group.id" -> "gmall_consumer_group",
    // Used when there is no initial offset, or the current offset no longer
    // exists on the server; "latest" resets to the latest available offset
    "auto.offset.reset" -> "latest",
    // If true, the consumer's offsets are committed automatically in the
    // background, which risks losing data if Kafka goes down; if false,
    // offsets must be maintained manually
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  // Subscribe with the default consumer group.
  def getKafkaStream(topic: String, ssc: StreamingContext)
  : InputDStream[ConsumerRecord[String, String]] = {
    val dStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils
      .createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParam)
      )
    dStream
  }

  // Subscribe with an explicit consumer group.
  def getKafkaStream(topic: String, ssc: StreamingContext, groupid: String)
  : InputDStream[ConsumerRecord[String, String]] = {
    kafkaParam("group.id") = groupid
    val dStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils
      .createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParam)
      )
    dStream
  }

  // Subscribe with an explicit consumer group, starting from saved offsets.
  def getKafkaStream(topic: String,
                     ssc: StreamingContext,
                     offsets: Map[TopicPartition, Long],
                     groupid: String)
  : InputDStream[ConsumerRecord[String, String]] = {
    kafkaParam("group.id") = groupid
    val consumerStrategy: ConsumerStrategy[String, String] = ConsumerStrategies
      .Subscribe[String, String](Array(topic), kafkaParam, offsets)
    KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      consumerStrategy
    )
  }
}
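
Because enable.auto.commit is false, the driver has to commit offsets itself. A minimal sketch of how these helpers might be wired into a streaming job follows; the app, topic, and group names are illustrative, while HasOffsetRanges and CanCommitOffsets are the standard spark-streaming-kafka-0-10 APIs for reading and committing offset ranges:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

object KafkaUtilDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("KafkaUtilDemo")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Illustrative topic and consumer group names
    val stream = KafkaUtil.getKafkaStream("gmall_start", ssc, "demo_group")

    stream.foreachRDD { rdd =>
      // Capture the offset ranges from the source RDD before any shuffle
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      rdd.map(_.value()).foreach(println)

      // Manual commit, required because enable.auto.commit is false
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}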
Original article: https://www.cnblogs.com/hyunbar/p/13570109.html