大数据常用工具类

工具类
config.properties
# jdbc配置
jdbc.datasource.size=10
jdbc.url=jdbc:mysql://hadoop101:3306/database?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true
jdbc.user=root
jdbc.password=000000

# Kafka
kafka.broker.list=hadoop101:9092,hadoop102:9092,hadoop103:9092

# Redis配置
redis.host=hadoop101
redis.port=6379

# hive 的数据库名(选配)
hive.database=database
PropertiesUtil
import java.io.InputStreamReader
import java.util.Properties

object PropertiesUtil {

   /**
     * Loads a `.properties` file from the classpath, decoding it as UTF-8.
     *
     * The resource is looked up through the current thread's context class loader.
     * The underlying stream is always closed before this method returns.
     *
     * @param propertieName classpath-relative name of the properties file, e.g. "config.properties"
     * @return the populated [[java.util.Properties]]
     * @throws java.io.FileNotFoundException if no resource with that name is on the classpath
     * @throws java.io.IOException           if the resource cannot be read
     */
   def load(propertieName: String): Properties = {
       val stream = Thread.currentThread().getContextClassLoader.getResourceAsStream(propertieName)
       // getResourceAsStream signals "not found" with null; fail with a message that names
       // the missing resource instead of an anonymous NullPointerException.
       if (stream == null) {
           throw new java.io.FileNotFoundException(s"Resource not found on classpath: $propertieName")
       }

       val reader = new InputStreamReader(stream, "UTF-8")
       try {
           val prop = new Properties()
           prop.load(reader)
           prop
       } finally {
           // Closing the reader also closes the wrapped input stream.
           reader.close()
       }
   }

}
JdbcUtil
import com.alibaba.druid.pool.DruidDataSourceFactory
import java.sql.PreparedStatement
import java.util.Properties
import javax.sql.DataSource

/**
  * Thin JDBC helper backed by a Druid connection pool configured from `config.properties`.
  *
  * Every update runs in its own transaction: it is committed on success and rolled back on
  * failure, and the statement and connection are always released afterwards.
  */
object JdbcUtil {

   // Shared pool, built once when this object is first initialized.
   var dataSource: DataSource = init()

   /**
     * Builds the Druid data source from the `jdbc.*` entries of `config.properties`.
     *
     * @return the data source created by `DruidDataSourceFactory`
     */
   def init(): DataSource = {
       val prop = PropertiesUtil.load("config.properties")

       val properties = new Properties()
       properties.setProperty("driverClassName", "com.mysql.jdbc.Driver")
       properties.setProperty("url", prop.getProperty("jdbc.url"))
       properties.setProperty("username", prop.getProperty("jdbc.user"))
       properties.setProperty("password", prop.getProperty("jdbc.password"))
       properties.setProperty("maxActive", prop.getProperty("jdbc.datasource.size"))

       DruidDataSourceFactory.createDataSource(properties)
   }

   /**
     * Executes a single parameterized update, e.g. "insert into xxx values (?,?,?)".
     *
     * @param sql    SQL text with `?` placeholders
     * @param params values bound to the placeholders in order; may be null or empty
     * @return the affected row count, or 0 if the statement or its commit failed
     *         (the exception is printed and the transaction rolled back)
     */
   def executeUpdate(sql: String, params: Array[Any]): Int =
       inTransaction(sql, 0) { pstmt =>
           bindParams(pstmt, params)
           pstmt.executeUpdate()
       }

   /**
     * Executes one parameterized statement as a JDBC batch, one batch entry per parameter array.
     * Null or empty parameter arrays are skipped.
     *
     * @param sql        SQL text with `?` placeholders
     * @param paramsList one parameter array per row
     * @return the per-entry update counts, or null if the batch or its commit failed
     *         (the exception is printed and the transaction rolled back)
     */
   def executeBatchUpdate(sql: String, paramsList: Iterable[Array[Any]]): Array[Int] =
       inTransaction[Array[Int]](sql, null) { pstmt =>
           for (params <- paramsList if params != null && params.length > 0) {
               bindParams(pstmt, params)
               pstmt.addBatch()
           }
           pstmt.executeBatch()
       }

   /** Binds `params` to the statement's placeholders (JDBC indexes are 1-based). */
   private def bindParams(pstmt: PreparedStatement, params: Array[Any]): Unit =
       if (params != null) {
           for (i <- params.indices) {
               pstmt.setObject(i + 1, params(i))
           }
       }

   /** Closes a JDBC resource, printing (not propagating) any failure; null is ignored. */
   private def closeQuietly(resource: AutoCloseable): Unit =
       if (resource != null) {
           try resource.close()
           catch {
               case e: Exception => e.printStackTrace()
           }
       }

   /**
     * Runs `work` against a prepared statement inside a manual transaction.
     *
     * Commits when `work` succeeds. On any exception it prints the stack trace, rolls back
     * and yields `onFailure`. The statement and connection are closed in every case, so the
     * connection is handed back to the pool instead of leaking.
     */
   private def inTransaction[T](sql: String, onFailure: T)(work: PreparedStatement => T): T = {
       val connection = dataSource.getConnection
       var pstmt: PreparedStatement = null
       try {
           connection.setAutoCommit(false)
           pstmt = connection.prepareStatement(sql)
           val result = work(pstmt)
           connection.commit()
           result
       } catch {
           case e: Exception =>
               e.printStackTrace()
               // Undo any partial work so it cannot be committed by a later user of this connection.
               try connection.rollback()
               catch {
                   case rollbackError: Exception => rollbackError.printStackTrace()
               }
               onFailure
       } finally {
           closeQuietly(pstmt)
           closeQuietly(connection)
       }
   }

   // Manual smoke test
   def main(args: Array[String]): Unit = {
//       JdbcUtil.executeUpdate("insert into table_1 values(?,?,?,?,?)", Array("take100", "100", 100, 200,300))
       JdbcUtil.executeBatchUpdate("insert into table_1 values(?,?,?,?,?)",List(Array("take101", "100", 200, 200,200),Array("take102", "100", 300, 300,300)))
   }
}
MyRedisUtil
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

/**
  * Hands out Jedis connections from a single, lazily created `JedisPool` whose host and
  * port come from `config.properties`.
  */
object MyRedisUtil {

   var jedisPool: JedisPool = null

   /**
     * Borrows a connection from the shared pool, creating the pool on first use.
     *
     * Pool creation is guarded by this object's monitor so that concurrent first callers
     * cannot each build (and leak) their own pool.
     *
     * @return a Jedis connection obtained via `jedisPool.getResource`
     *         (NOTE(review): callers are presumably expected to close() it so it returns
     *         to the pool — confirm against the Jedis version in use)
     */
   def getJedisClient: Jedis = {
       this.synchronized {
           if (jedisPool == null) {
               println("开辟一个连接池")
               jedisPool = createPool()
           }
       }
       println(s"jedisPool.getNumActive = ${jedisPool.getNumActive}")
       println("获得一个连接")
       jedisPool.getResource
   }

   /** Builds the pool from the `redis.host` / `redis.port` entries of `config.properties`. */
   private def createPool(): JedisPool = {
       val prop = PropertiesUtil.load("config.properties")
       val host = prop.getProperty("redis.host")
       val port = prop.getProperty("redis.port").toInt

       val jedisPoolConfig = new JedisPoolConfig()
       jedisPoolConfig.setMaxTotal(100)             // maximum number of connections
       jedisPoolConfig.setMaxIdle(20)               // maximum idle connections
       jedisPoolConfig.setMinIdle(20)               // minimum idle connections
       jedisPoolConfig.setBlockWhenExhausted(true)  // wait when the pool is exhausted
       jedisPoolConfig.setMaxWaitMillis(500)        // how long to wait, in milliseconds
       jedisPoolConfig.setTestOnBorrow(true)        // validate each connection when borrowed

       new JedisPool(jedisPoolConfig, host, port)
   }

}
MyKafkaUtil
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

/**
  * Creates Spark Streaming direct streams over Kafka topics, using the broker list
  * configured under `kafka.broker.list` in `config.properties`.
  */
object MyKafkaUtil {

   private val config: Properties = PropertiesUtil.load("config.properties")
   val broker_list = config.getProperty("kafka.broker.list")

   // Kafka consumer configuration
   val kafkaParam = Map(
       // addresses used for the initial connection to the cluster
       "bootstrap.servers" -> broker_list,
       "key.deserializer" -> classOf[StringDeserializer],
       "value.deserializer" -> classOf[StringDeserializer],
       // identifies the consumer group this consumer belongs to
       "group.id" -> "gmall_consumer_group",
       // applies when there is no initial offset or the current offset no longer exists
       // on the server; "latest" resets the offset to the newest one
       "auto.offset.reset" -> "latest",
       // true: offsets are committed automatically in the background, which can lose data
       //       if Kafka goes down
       // false: offsets have to be maintained manually
       "enable.auto.commit" -> java.lang.Boolean.TRUE
   )

   /**
     * Creates a direct input stream that subscribes to one topic.
     *
     * Uses `LocationStrategies.PreferConsistent` as the location strategy and
     * `ConsumerStrategies.Subscribe` with [[kafkaParam]] as the consumer strategy.
     *
     * @param topic the Kafka topic to subscribe to
     * @param ssc   the streaming context the stream is attached to
     * @return the input stream of consumer records with String keys and values
     */
   def getKafkaStream(topic: String, ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {
       val topics = Array(topic)
       val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParam)
       KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, consumerStrategy)
   }
}
MyEsUtil
import io.searchbox.client.config.HttpClientConfig
import io.searchbox.client.{JestClient, JestClientFactory}
import io.searchbox.core.{Bulk, BulkResult, Index}
import java.util.Objects

/**
  * Elasticsearch helper built on the Jest HTTP client.
  *
  * The client factory is created lazily on first use and then reused; each call to
  * [[getClient]] asks that factory for a client, which the caller releases with [[close]].
  */
object MyEsUtil {
   private val ES_HOST = "http://hadoop101"
   private val ES_HTTP_PORT = 9200
   private var factory: JestClientFactory = null

   /**
     * Obtains a Jest client, building the shared factory on first use.
     *
     * Factory creation is guarded by this object's monitor so concurrent first callers
     * do not each build their own factory.
     *
     * @return a Jest client from the shared factory
     */
   def getClient: JestClient = {
       this.synchronized {
           if (factory == null) build()
       }
       factory.getObject
   }

   /**
     * Shuts the given client down. A null client is ignored, and any failure while
     * shutting down is printed rather than propagated.
     *
     * @param client the client to release; may be null
     */
   def close(client: JestClient): Unit = {
       if (client != null) {
           try client.shutdownClient()
           catch {
               case e: Exception =>
                   e.printStackTrace()
           }
       }
   }

   /**
     * Creates the client factory pointing at ES_HOST:ES_HTTP_PORT.
     */
   private def build(): Unit = {
       factory = new JestClientFactory
       factory.setHttpClientConfig(new HttpClientConfig.Builder(ES_HOST + ":" + ES_HTTP_PORT).multiThreaded(true)
          .maxTotalConnection(20) // total number of connections
          .connTimeout(10000).readTimeout(10000).build)
   }

   /**
     * Indexes all documents of `docList` into `indexName` with one bulk request.
     *
     * A null or empty list is a no-op, so no empty bulk request is sent. The client is
     * closed even when the request throws. A bulk response that reports failure is
     * written to stderr instead of being silently ignored.
     *
     * The default mapping type is "_doc", the same type [[main]] uses.
     * NOTE(review): this was "_ex"; Elasticsearch rejects type names that start with an
     * underscore other than "_doc" — confirm against the cluster version in use.
     *
     * @param indexName target index
     * @param docList   documents to index (anything `Index.Builder` accepts as a source)
     */
   def insertBulk(indexName: String, docList: List[Any]): Unit = {
       if (docList != null && docList.nonEmpty) {
           val jest: JestClient = getClient
           try {
               val bulkBuilder = new Bulk.Builder
               bulkBuilder.defaultIndex(indexName).defaultType("_doc")
               println(docList.mkString(" "))
               for (doc <- docList) {
                   val index: Index = new Index.Builder(doc).build()
                   bulkBuilder.addAction(index)
               }
               val result: BulkResult = jest.execute(bulkBuilder.build())
               if (!result.isSucceeded) {
                   System.err.println(s"ES bulk insert into $indexName failed: ${result.getErrorMessage}")
               }
               println(s"保存es= ${result.getItems.size()} 条")
           } finally {
               close(jest)
           }
       }
   }

   // Manual smoke test: indexes one document and releases the client.
   def main(args: Array[String]): Unit = {
       val jest: JestClient = getClient
       try {
           // Triple quotes keep the JSON's own double quotes from ending the Scala string.
           val doc = """{ "name":"yiyi", "age": 17 }"""
           val index: Index = new Index.Builder(doc).index("myesutil_test").`type`("_doc").build()
           jest.execute(index)
       } finally {
           close(jest)
       }
   }

}
 
---------------------
原文:https://blog.csdn.net/qq_31108141/article/details/88367058

原文地址:https://www.cnblogs.com/Bkxk/p/10563723.html