Spark高级数据分析——纽约出租车轨迹的空间和时间数据分析

Spark高级数据分析——纽约出租车轨迹的空间和时间数据分析

原文地址:https://www.jianshu.com/p/eb6f3e0c09b5
作者:IIGEOywq

一、地理空间分析:

object RunGeoTime extends Serializable {

  val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ENGLISH)

  def main(args: Array[String]): Unit = {

    /*--------------1.初始化SparkContext-------------------*/
    val sc = new SparkContext(new SparkConf().setAppName("SpaceGeo"))

    /*--------------2.读取HDFS数据-------------------*/
    val taxiRaw = sc.textFile("hdfs://master:9000/taxidata")

    /*--------------3.出租车数据预处理------------------*/
    //3.1 利用自定义的safe函数处理原始数据
    val safeParse = safe(parse)
    val taxiParsed = taxiRaw.map(safeParse)
    //taxiParsed数据持久化
    taxiParsed.cache()

    //查看非法数据
   /* val taxiBad = taxiParsed.collect({
      case t if t.isRight => t.right.get
    })*/

    //collect返回到驱动器,为了单机开发和测试使用,不建议集群使用
    //taxiBad.collect().foreach(println)


    /*val taxiGood = taxiParsed.collect({
      case t if t.isLeft => t.left.get
    })
    taxiGood.cache()*/

    //3.2 剔除非法数据结果,获得正确格式的数据
    val taxiGood=taxiParsed.filter(_.isLeft).map(_.left.get)
    taxiGood.cache()

    //自定义一次打车的乘坐时间函数
    def hours(trip: Trip): Long = {
      val d = new Duration(trip.pickupTime, trip.dropoffTime)
      d.getStandardHours
    }
    //3.3 打印统计乘客上下车时间的记录,打印结果如执行分析结果图中的1
    taxiGood.values.map(hours).countByValue().toList.sorted.foreach(println)
    taxiParsed.unpersist()

    //根据上面的输出结果,统计一次乘车时间大于0小于3小时的记录
    val taxiClean = taxiGood.filter {
      case (lic, trip) => {
        val hrs = hours(trip)
        0 <= hrs && hrs < 3
      }
    }

    /*--------------4.出租车数据空间分析------------------*/
    //4.1 获取纽约行政区划数据
    val geojson = scala.io.Source.fromURL(getClass.getResource("/nyc-boroughs.geojson")).mkString
    //转换为地理要素
    val features = geojson.parseJson.convertTo[FeatureCollection]

    val areaSortedFeatures = features.sortBy(f => {
      val borough = f("boroughCode").convertTo[Int]
      (borough, -f.geometry.area2D())
    })

    val bFeatures = sc.broadcast(areaSortedFeatures)
    //4.2 判断乘客下车点落在那个行政区
    def borough(trip: Trip): Option[String] = {
      val feature: Option[Feature] = bFeatures.value.find(f => {
        f.geometry.contains(trip.dropoffLoc)
      })
      feature.map(f => {
        f("borough").convertTo[String]
      })
    }
    //4.3 第一次统计打印各行政区下车点的记录,打印结果如执行分析结果图中的2
    taxiClean.values.map(borough).countByValue().foreach(println)

    
    //4.4 剔除起点和终点数据缺失的数据
    def hasZero(trip: Trip): Boolean = {
      val zero = new Point(0.0, 0.0)
      (zero.equals(trip.pickupLoc) || zero.equals(trip.dropoffLoc))
    }

    val taxiDone = taxiClean.filter {
      case (lic, trip) => !hasZero(trip)
    }.cache()

    //4.5 踢出零点数据后统计打印各行政区下车点的记录,打印结果如执行分析结果图中的3
    taxiDone.values.map(borough).countByValue().foreach(println)
    taxiGood.unpersist()

    //输出地理空间分析结果到HDFS
    //taxiDone.saveAsTextFile("hdfs://master:9000/GeoResult")

  }

  //字符串转double
  def point(longitude: String, latitude: String): Point = {
    new Point(longitude.toDouble, latitude.toDouble)
  }

  //获取taxiraw RDD记录中的出租车司机驾照和Trip对象
  def parse(line: String): (String, Trip) = {
    val fields = line.split(',')
    val license = fields(1)
    // Not thread-safe:
    val formatterCopy = formatter.clone().asInstanceOf[SimpleDateFormat]
    val pickupTime = new DateTime(formatterCopy.parse(fields(5)))
    val dropoffTime = new DateTime(formatterCopy.parse(fields(6)))
    val pickupLoc = point(fields(10), fields(11))
    val dropoffLoc = point(fields(12), fields(13))

    val trip = Trip(pickupTime, dropoffTime, pickupLoc, dropoffLoc)
    (license, trip)
  }

  //非法记录数据处理函数
  def safe[S, T](f: S => T): S => Either[T, (S, Exception)] = {
    new Function[S, Either[T, (S, Exception)]] with Serializable {
      def apply(s: S): Either[T, (S, Exception)] = {
        try {
          Left(f(s))
        } catch {
          case e: Exception => Right((s, e))
        }
      }
    }
  }

}

二、pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <groupId>com.cloudera.datascience.geotime</groupId>
  <artifactId>ch08-geotime</artifactId>
  <packaging>jar</packaging>
  <name>Temporal and Geospatial Analysis</name>
  <version>2.0.0</version>

  <dependencies>
   <!--注意 scala版本对应spark集群中scala的版本,provided属性要加上 -->
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.11.8</version>
      <scope>provided</scope>
    </dependency>
    <!--注意 hadoop版本对应spark集群中hadoop的版本,provided属性要加上 -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.7.3</version>
      <scope>provided</scope>
    </dependency>
    <!--注意 spark版本对应spark集群中spark的版本,2.11是对应的scala版本 -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.0.1</version>
      <scope>provided</scope>
    </dependency>
    <!--nscala-time时间处理库,2.11是对应的scala版本 -->
    <dependency>
      <groupId>com.github.nscala-time</groupId>
      <artifactId>nscala-time_2.11</artifactId>
      <version>1.8.0</version>
    </dependency>
    <!--esri空间关系库,2.11是对应的scala版本 -->
    <dependency>
      <groupId>com.esri.geometry</groupId>
      <artifactId>esri-geometry-api</artifactId>
      <version>1.2.1</version>
    </dependency>
    <dependency>
      <groupId>io.spray</groupId>
      <artifactId>spray-json_2.11</artifactId>
      <version>1.3.2</version>
    </dependency>
    <dependency>
      <groupId>joda-time</groupId>
      <artifactId>joda-time</artifactId>
      <version>2.9.4</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
     <!--scala-maven插件必须加上,否则打包后无主程序 -->
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <configuration>
          <scalaVersion>2.11.8</scalaVersion>
          <scalaCompatVersion>2.11.8</scalaCompatVersion>
          <args>
            <arg>-unchecked</arg>
            <arg>-deprecation</arg>
            <arg>-feature</arg>
          </args>
          <javacArgs>
            <javacArg>-source</javacArg>
            <javacArg>1.8.0</javacArg>
            <javacArg>-target</javacArg>
            <javacArg>1.8.0</javacArg>
          </javacArgs>
        </configuration>
        <executions>
          <execution>
            <phase>compile</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
       <!--maven-assembly插件可以打包应用的依赖包 -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.6</version>
        <configuration>
          <archive>
            <manifest>
              <mainClass>com.cloudera.datascience.geotime.RunGeoTime</mainClass>
            </manifest>
          </archive>
            <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
          <recompressZippedFiles>false</recompressZippedFiles>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id> <!-- 用于maven继承项目的聚合 -->
            <phase>package</phase> <!-- 绑定到package阶段 -->
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

</project>

原文地址:https://www.cnblogs.com/aixing/p/13327379.html