spark本地读取写入s3文件

1.关于S3,S3N和S3A的区别与联系(wiki:https://wiki.apache.org/hadoop/AmazonS3)

S3 Native FileSystem (URI scheme: s3n) A native filesystem for reading and writing regular files on S3. The advantage of this filesystem is that you can access files on S3 that were written with other tools. Conversely, other tools can access files written using Hadoop. The disadvantage is the 5GB limit on file size imposed by S3.


S3A (URI scheme: s3a) A successor to the S3 Native, s3n fs, the S3a: system uses Amazon's libraries to interact with S3. This allows S3a to support larger files (no more 5GB limit), higher performance operations and more. The filesystem is intended to be a replacement for/successor to S3 Native: all objects accessible from s3n:// URLs should also be accessible from s3a simply by replacing the URL schema.


S3 Block FileSystem (URI scheme: s3) A block-based filesystem backed by S3. Files are stored as blocks, just like they are in HDFS. This permits efficient implementation of renames. This filesystem requires you to dedicate a bucket for the filesystem - you should not use an existing bucket containing files, or write other files to the same bucket. The files stored by this filesystem can be larger than 5GB, but they are not interoperable with other S3 tools.

2.如何选择S3访问协议

由上面介绍可知,首先是三种协议的访问大小有区别;其次S3是block-based,s3n/s3a是object-based,最后S3A是apache推荐的访问方式,且S3访问方式将会慢慢被替代,AWS不赞成使用S3访问,且S3A更加稳定安全高效,需要注意的是hadoop2.6版本对于S3A支持有bug,所以推荐使用hadoop2.7.x使用s3a协议访问

3.关于jar包的选择

在Hadoop当中,访问S3文件,需要导入aws-sdk包,这个包里有个s3的子服务供Java语言访问S3,其中会调用hadoop-aws包解析协议,这个包在hadoop2.6.x版本之前是由hadoop-core维护的,因此如果使用hadoop2.4.x,里面会有一个这样的类:org.apache.hadoop.fs.s3native.NativeS3FileSystem,但是在hadoop2.6.x版本就没有了,因为Apache将访问S3的包从hadoop-core包解耦到hadoop-aws包,由AWS维护

aws有v2,v3,v4几种签名算法,而jets3t库只有0.9.4版本才支持V4,而hadoop低版本里引入的是老的jar包,因此当你使用s3n方式时最好额外引入jets3t包,而如果使用s3a就不需要了,因为s3a采用的是com.amazonaws.http.AmazonHttpClient协议,而不是jets3t

本次测试引入的jar,主要包含以下几个:


<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.2.0</spark.version>
        <hadoop.version>2.7.3</hadoop.version>
        <spark.pom.scope>compile</spark.pom.scope>
    </properties>
 
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <scope>${spark.pom.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>${spark.pom.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>${spark.pom.scope}</scope>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.6.3</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.6.3</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.6.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-aws</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>net.java.dev.jets3t</groupId>
            <artifactId>jets3t</artifactId>
            <version>0.9.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>4.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.4</version>
        </dependency>
    </dependencies>
4.踩坑
读取代码很简单,如下。需要注意这里面用了s3a协议,也是推荐的访问方式

 val rdd = spark.sparkContext.textFile("s3a://xxx/part-00000")
 println(rdd.count())
即便我本机已经配置好了aws cli,但是执行的时候依然会遇到如下问题

Exception in thread "main" com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
只好在代码里配置下

    spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "")
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "")
配置好后重新执行,依然会报错:
com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: xx, AWS Error Code: null, AWS Error Message: Forbidden, S3 Extended Request ID: xxx
出现这个问题的原因找了半天,后来才晓得aws-sdk调用rest服务,而调用rest服务需要指定endpoint的(http://docs.aws.amazon.com/zh_cn/AmazonS3/latest/dev/VirtualHosting.html),但是aws提供的默认的endpoint是在com.amazonaws.services.s3.internal.Constants这个类的HOSTNAME值是s3.amazonaws.com,而中国区的hostname应该是s3.cn-north-1.amazonaws.com.cn。因此有两种方式可以处理,一是新建个同样包名的Constants类替换其值(不建议,代码不友好),另外一种是直接在代码的conf里设置
spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-north-1.amazonaws.com.cn")
到这基本可以在本地读取s3了,但是如果放在emr环境还是有些问题,首先emr环境可以不设置aws的key和secret。其次EMR集群中的Hadoop是通过EMRFS的方式访问S3的,会把s3和s3n都转成s3,这等同于在本地Hadoop中使用s3n,而EMR环境是根本不支持s3a的(报403错误),EMR官方建议在代码中使用s3来访问文件
完整实例代码:

object SparkS3Test {
  def main(args: Array[String]) {
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.eventLog.enabled", "false")
      .config("spark.driver.memory", "2g")
      .config("spark.executor.memory", "2g")
      .appName("SparkDemoFromS3")
      .getOrCreate()
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "")
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "")
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-north-1.amazonaws.com.cn")
    val rdd = spark.sparkContext.textFile("s3a://xxx/part-00000")
    println(rdd.count())
  }
}

  

原文地址:https://www.cnblogs.com/cloudrivers/p/13089957.html