1. Cluster startup
The cluster has three nodes: min01, min02, and min03.
Network (bridged mode): disable bridge 2 and keep only bridge 3.
On node min01, run sh start.sh to start HDFS, YARN, and Spark.
Run sh zk.sh to start the ZooKeeper cluster (on all three nodes at the same time).
Run sh kafka.sh to start the Kafka cluster (on all three nodes at the same time).
Basic Kafka commands:

Produce messages:
/root/apps/kafka_2.11-0.11.0.2/bin/kafka-console-producer.sh --broker-list min01:9092,min02:9092,min03:9092 --topic spark_streaming_test

Consume messages:
/root/apps/kafka_2.11-0.11.0.2/bin/kafka-console-consumer.sh --zookeeper min01:2181,min02:2181,min03:2181 --topic spark_streaming_test --from-beginning

Create a topic:
/root/apps/kafka_2.11-0.11.0.2/bin/kafka-topics.sh --zookeeper min01:2181,min02:2181,min03:2181 --topic sst --create --replication-factor 2 --partitions 3
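The same topic can also be created programmatically; a sketch against the AdminUtils API bundled with the 0.11 broker (the timeouts and object name are illustrative):

import java.util.Properties
import kafka.admin.AdminUtils
import kafka.utils.ZkUtils

object CreateTopicSketch {
  def main(args: Array[String]): Unit = {
    val zkUtils = ZkUtils("min01:2181,min02:2181,min03:2181", 30000, 30000, isZkSecurityEnabled = false)
    if (!AdminUtils.topicExists(zkUtils, "sst")) {
      // 3 partitions, replication factor 2 -- same as the CLI command above
      AdminUtils.createTopic(zkUtils, "sst", 3, 2, new Properties())
    }
    zkUtils.close()
  }
}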
Utility class: ZooKeeper client

import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkMarshallingError
import org.I0Itec.zkclient.serialize.ZkSerializer

/**
 * @Author: 唐
 * @Date: 2020/3/25 20:19
 */
object ZKUtil {
  // Build a ZkClient whose znode data is (de)serialized as UTF-8 strings
  def initZKClient(zkServers: String, sessionTimeout: Int, connectionTimeout: Int): ZkClient = {
    new ZkClient(zkServers, sessionTimeout, connectionTimeout, new ZkSerializer {
      override def serialize(data: scala.Any): Array[Byte] = {
        try {
          data.toString.getBytes("UTF-8")
        } catch {
          case _: ZkMarshallingError => null
        }
      }
      override def deserialize(bytes: Array[Byte]): AnyRef = {
        try {
          new String(bytes, "UTF-8")
        } catch {
          case _: ZkMarshallingError => null
        }
      }
    })
  }
}
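A quick usage sketch of ZKUtil; the znode path /demo and the timeouts are illustrative only:

val zkClient = ZKUtil.initZKClient("min01:2181,min02:2181,min03:2181", 120000, 60000)
// With the UTF-8 serializer above, znode data round-trips as plain strings
zkClient.createPersistent("/demo", true)    // create the node (and parents) if missing
zkClient.writeData("/demo", "hello")
println(zkClient.readData[String]("/demo")) // prints: hello
zkClient.close()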
Utility class: JDBC connection pool

import java.sql.{Connection, DriverManager}
import java.util.concurrent.ConcurrentLinkedDeque

/**
 * @Author: 唐
 * @Date: 2020/3/25 23:28
 */
object ConnectionPool {
  private var queue: ConcurrentLinkedDeque[Connection] = _

  // Load the MySQL driver once, when the object is first referenced
  Class.forName("com.mysql.jdbc.Driver")

  def getConnection(): Connection = {
    // Note: this lazy initialization is not thread-safe; two tasks in the same
    // executor JVM could race on the first call and each create a deque
    if (queue == null) queue = new ConcurrentLinkedDeque[Connection]()
    if (queue.isEmpty) {
      // Refill the pool with a batch of 10 connections
      for (_ <- 1 to 10) {
        val conn = DriverManager.getConnection(
          "jdbc:mysql://localhost:3306/syllabus?useUnicode=true&characterEncoding=utf8",
          "root", "123456")
        conn.setAutoCommit(false) // callers commit explicitly
        queue.offer(conn)
      }
    }
    queue.poll()
  }

  def returnConnection(conn: Connection): Unit = {
    queue.offer(conn)
  }
}
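One caveat with this pool: if an exception is thrown between getConnection() and returnConnection(), the connection is lost. A usage sketch with try/finally (the SELECT 1 query is just a placeholder):

val conn = ConnectionPool.getConnection()
try {
  val rs = conn.prepareStatement("SELECT 1").executeQuery()
  while (rs.next()) println(rs.getInt(1))
  // writes would need conn.commit() here, since the pool disables auto-commit
} finally {
  ConnectionPool.returnConnection(conn) // always return the connection, even on failure
}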
Main program logic
import java.sql.PreparedStatement

import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo, TopicMetadataRequest}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

/**
 * @Author: 唐松怀
 * @Date: 2020/3/25 22:32
 */
object test01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("Chapter8_4_5")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Durations.seconds(10))

    val topics = Set("spark_streaming_test")
    val kafkaParams = mutable.Map[String, String]()
    kafkaParams.put("bootstrap.servers", "min01:9092,min02:9092,min03:9092")
    kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaParams.put("session.timeout.ms", "30000")
    kafkaParams.put("enable.auto.commit", "false")
    kafkaParams.put("max.poll.records", "100")
    kafkaParams.put("kafka.topics", "spark_streaming_test") // not a Kafka setting, just a lookup key used below
    kafkaParams.put("group.id", "g_spark_test")

    val zkHost = "min01:2181,min02:2181,min03:2181"
    val sessionTimeout = 120000
    val connectionTimeout = 60000
    val zkClient = ZKUtil.initZKClient(zkHost, sessionTimeout, connectionTimeout)

    val zkTopic = "spark_streaming_test"
    val zkConsumerGroupId = "g_spark_test"
    val zkTopicDir = new ZKGroupTopicDirs(zkConsumerGroupId, zkTopic)
    val zkTopicPath = zkTopicDir.consumerOffsetDir
    // One child znode per partition; > 0 means offsets were committed on a previous run
    val childrenCount = zkClient.countChildren(zkTopicPath)

    var fromOffsets: Map[TopicAndPartition, Long] = Map()

    val kafkaStream: InputDStream[(String, String)] = if (childrenCount > 0) {
      // Offsets exist in ZooKeeper: find each partition's leader, then resume from the saved offsets
      val req = new TopicMetadataRequest(topics.toList, 0)
      val leaderConsumer = new SimpleConsumer("min01", 9092, 10000, 10000, "StreamingOffsetObserver")
      val res = leaderConsumer.send(req)
      val topicMetaOption = res.topicsMetadata.headOption
      val partitions = topicMetaOption match {
        case Some(tm) =>
          tm.partitionsMetadata.map(pm => (pm.partitionId, pm.leader.get.host)).toMap[Int, String]
        case None =>
          Map[Int, String]()
      }
      for (partition <- 0 until childrenCount) {
        val partitionOffset = zkClient.readData[String](zkTopicPath + "/" + partition)
        val tp = TopicAndPartition(kafkaParams("kafka.topics"), partition)
        // Ask the partition leader for its earliest available offset; if the saved offset
        // is older than that (log segments already deleted), start from the earliest instead
        val requestMin = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
        val consumerMin = new SimpleConsumer(partitions(partition), 9092, 10000, 10000, "getMinOffset")
        val curOffsets = consumerMin.getOffsetsBefore(requestMin).partitionErrorAndOffsets(tp).offsets
        var nextOffset = partitionOffset.toLong
        if (curOffsets.nonEmpty && nextOffset < curOffsets.head) {
          nextOffset = curOffsets.head
        }
        fromOffsets += (tp -> nextOffset)
      }
      val messageHandler = (mam: MessageAndMetadata[String, String]) => (mam.key, mam.message)
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
        ssc, kafkaParams.toMap, fromOffsets, messageHandler)
    } else {
      // No saved offsets: start from the default position
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams.toMap, topics)
    }

    var offsetRanges: Array[OffsetRange] = null
    val kafkaInputDStream = kafkaStream.transform { rdd =>
      // Capture this batch's offset ranges on the driver before the
      // partition-to-offset mapping is lost to downstream transformations
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }

    val kafkaValues = kafkaInputDStream.map(_._2)
    val kafkaSplits = kafkaValues.map(_.split(",")).filter(_.length == 4)
    val results = kafkaSplits.map(_.mkString(","))

    results.foreachRDD(rdd => {
      // Executed on the driver
      rdd.foreachPartition(p => {
        // Executed on the executors; if the results are saved to a database,
        // the connection should be obtained here
        p.foreach(result => {
          val fields = result.split(",")
          val car = fields(0)
          val longitude = fields(1)
          val latitude = fields(2)
          val timestamp = fields(3)
          val conn = ConnectionPool.getConnection()
          val sql = "INSERT INTO syllabus.t_car_position (plate_num, longitude, latitude, timestamp) VALUES (?, ?, ?, ?)"
          val statement: PreparedStatement = conn.prepareStatement(sql)
          statement.setString(1, car)
          statement.setString(2, longitude)
          statement.setString(3, latitude)
          statement.setString(4, timestamp)
          statement.addBatch()
          statement.executeBatch()
          conn.commit()
          ConnectionPool.returnConnection(conn)
          println(result)
        })
      })
      // ZkUtils is not serializable, so offsets are committed here on the driver.
      // Save the end offset of each range so a restart resumes after the data
      // that was just processed, rather than replaying the batch
      for (o <- offsetRanges) {
        ZkUtils.updatePersistentPath(zkClient, zkTopicDir.consumerOffsetDir + "/" + o.partition, o.untilOffset.toString)
        println("Offset state after this batch was consumed: " + o)
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
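ZKGroupTopicDirs resolves the commit path to /consumers/&lt;group&gt;/offsets/&lt;topic&gt;/&lt;partition&gt;, so with the group and topic above the offsets land under /consumers/g_spark_test/offsets/spark_streaming_test/. A small sketch to inspect them after a run, reusing ZKUtil (the partition count of 3 is an assumption):

val zk = ZKUtil.initZKClient("min01:2181,min02:2181,min03:2181", 120000, 60000)
for (partition <- 0 until 3) { // assumes 3 partitions
  val path = s"/consumers/g_spark_test/offsets/spark_streaming_test/$partition"
  if (zk.exists(path)) // skip partitions that have never been committed
    println(s"partition $partition -> offset ${zk.readData[String](path)}")
}
zk.close()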
MySQL: a sample insert into the target table, for reference:

INSERT INTO syllabus.t_car_position (plate_num, longitude, latitude, timestamp) VALUES (20, 23.3, 65.0, 56);
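These notes never show the DDL for t_car_position. A plausible definition, created through the same pool; the VARCHAR types are guesses, chosen because the Spark job binds every column with setString:

val ddl =
  """CREATE TABLE IF NOT EXISTS syllabus.t_car_position (
    |  plate_num VARCHAR(32),
    |  longitude VARCHAR(64),
    |  latitude  VARCHAR(64),
    |  timestamp VARCHAR(64)
    |)""".stripMargin
val conn = ConnectionPool.getConnection()
try {
  conn.createStatement().execute(ddl)
  conn.commit() // auto-commit is off in the pool
} finally {
  ConnectionPool.returnConnection(conn)
}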
Test data for consumption (one record per line):
京A12345,1144444444446.46,3555555555559.92,15550666666666666731094000
京A666,886.46,99.93,15550731094001
京A90,1991.46,696.93,15550731094003
A12345,116.46,39.93,15550731094003
N,45,89,100
A,1,2,3,4
B,3,4,5,6
C,5,6,7,8
D,7,8,9,4
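The records above were typed into the console producer by hand; an equivalent programmatic producer is sketched below against the 0.11 kafka-clients API (the object name is illustrative and only a few of the records are included):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object TestDataProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "min01:9092,min02:9092,min03:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    val records = Seq(
      "京A666,886.46,99.93,15550731094001",
      "京A90,1991.46,696.93,15550731094003",
      "A,1,2,3,4" // 5 fields, so the stream's length == 4 filter drops it
    )
    records.foreach(r => producer.send(new ProducerRecord[String, String]("spark_streaming_test", r)))
    producer.close() // close() flushes any buffered sends
  }
}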