Spark之Streaming

1. socket消息发送

import java.net.ServerSocket
import java.io.PrintWriter
import scala.collection.mutable.ListBuffer
import java.util.Random



/**
 * Created by zzy on 8/28/15.
 */

/**
 * 模拟socket消息发送
 */
object SparkSoketSender {

  /** Upper-case ASCII letters 'A'..'Z' that [[createContext]] picks from. */
  private val Letters: Array[Char] = ('A' to 'Z').toArray

  /** Shared random source for [[index]] (java.util.Random is thread-safe), created once. */
  private val random = new Random

  /**
   * Entry point: listens on `<port>` and, for every client that connects, starts a thread
   * that sends one letter per line every `<time>` milliseconds until the client disconnects.
   *
   * @param args `args(0)` = port to listen on, `args(1)` = delay between messages in milliseconds
   */
  def main(args: Array[String]): Unit = {
    if (args.length != 2) { // validate arguments
      System.err.println("usage: <port> <time>") // port, interval in milliseconds
      System.exit(1)
    }

    // Parse once up front so a malformed argument fails immediately at startup
    // instead of inside every per-client thread.
    val port = args(0).toInt
    val intervalMs = args(1).toLong

    val listener = new ServerSocket(port)
    while (true) {
      val socket = listener.accept()
      new Thread() {
        override def run(): Unit = {
          println("find connected from : " + socket.getInetAddress())
          try {
            val out = new PrintWriter(socket.getOutputStream(), true)
            // PrintWriter never throws on write failures; checkError() flushes and reports
            // whether an earlier write failed (e.g. the client went away), which ends the loop.
            while (!out.checkError()) {
              Thread.sleep(intervalMs)
              val context = createContext(index)
              println(context)
              out.write(context + "\n")
              out.flush()
            }
          } finally {
            // Always release the client socket, including when sleep is interrupted.
            socket.close()
          }
        }
      }.start()
    }
  }

  /**
   * Builds the content of one message: the letter at position `index` of 'A'..'Z'.
   *
   * @param index zero-based position in the alphabet (0 -> "A", 25 -> "Z")
   * @return a one-character string
   * @throws ArrayIndexOutOfBoundsException if `index` is outside 0..25
   */
  def createContext(index: Int): String = Letters(index).toString

  /** Random letter position in 0..4 (inclusive), so messages are drawn from "A".."E". */
  def index: Int = random.nextInt(5)

}
2. Spark Streaming 接收 socket 消息并统计单词
 
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.PairDStreamFunctions
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
/**
 * Created by zzy on 8/28/15.
 */
object SparkStreaming {

  /**
   * Entry point: counts the words received over a TCP socket and prints the counts of each batch.
   *
   * @param args `args(0)` = host the socket sender runs on, `args(1)` = port it sends on,
   *             `args(2)` = batch interval in seconds
   */
  def main(args: Array[String]): Unit = {
    if (args.length != 3) {
      // host running the socket sender, port the messages are sent on, batch interval
      System.err.println("usage: <hostname> <port> <seconds>")
      System.exit(1)
    }
    val ssc = new StreamingContext(new SparkConf, Seconds(args(2).toInt))

    // Input source (many kinds are available). MEMORY_ONLY_SER keeps a single serialized
    // in-memory copy of each received block; the socketTextStream default,
    // MEMORY_AND_DISK_SER_2, would replicate each block to two nodes instead.
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_ONLY_SER)

    // Split every line into words; the result is another DStream.
    val words = lines.flatMap(_.split(" "))

    // Per-batch word count: each batch interval is counted independently.
    val wc = words.map((_, 1)).reduceByKey(_ + _)

    /* Window operation (sketch). The variant with an inverse reduce function (_ - _)
     * requires checkpointing to be enabled with ssc.checkpoint(dir):
     *   val wc = words.map((_, 1))
     *     .reduceByKeyAndWindow(_ + _, _ - _, windowDuration, slideDuration, numPartitions, filterFunc)
     */

    /* Stateful operation with updateStateByKey (sketch). The update function must be
     * defined before it is used, and checkpointing must be enabled:
     *   ssc.checkpoint(checkpointDir)
     *   val updateFunc = (currValues: Seq[Int], state: Option[Int]) => {
     *     val currentCount = currValues.sum        // count in the current batch
     *     val previousCount = state.getOrElse(0)   // count accumulated so far
     *     Some(currentCount + previousCount)       // new state, an Option[Int]
     *   }
     *   val sDstream = words.map((_, 1)).updateStateByKey(updateFunc)
     */

    wc.print()
    ssc.start()
    ssc.awaitTermination()
  }

}

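3. 提交任务

先启动 socket 发送端(监听端口 2015,每 1000 毫秒发送一条消息),再提交 Streaming 作业(连接 crxy164:2015,批次间隔 10 秒)。--class 必须是完整类名(包名 + object 名),并与代码中的 object 名一致;上面的代码省略了 package 声明,这里假定包名为 cn.crxy。

  spark-submit --class cn.crxy.SparkSoketSender original-testSpark-1.0-SNAPSHOT.jar 2015 1000

   spark-submit --class cn.crxy.SparkStreaming original-testSpark-1.0-SNAPSHOT.jar crxy164 2015 10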

原文地址:https://www.cnblogs.com/chaoren399/p/4768018.html