Operating HDFS with Python

  • Install the hdfs package

    pip install hdfs
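
A quick check that the package imports and the NameNode answers (a minimal sketch; the WebHDFS address below is the one used in the example that follows, so adjust it to your own cluster):

    #!/usr/bin/env python
    # Sketch: confirm the hdfs package is installed and WebHDFS is reachable.
    from hdfs.client import Client

    client = Client("http://10.0.0.134:50070/")
    print(client.list("/"))  # entries directly under the HDFS root directory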
    
  • Code example

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    """
    # Author Xu Junkai
    # coding=utf-8
    # @Time    : 2021/1/17 13:46
    # @Site    :
    # @File    : demo1.py
    # @Software: PyCharm
    """
    from hdfs.client import Client
    
    class HdfsWork(object):
        def __init__(self, urls, root, timeout, session=None):
            """
            Thin wrapper around hdfs.client.Client
            :param urls: WebHDFS URL(s); several NameNode URLs can be joined with ";"
            :param root: root path prepended to all HDFS paths
            :param timeout: connection timeout forwarded to the underlying session
            :param session: optional requests.Session to reuse
            """
            self.urls = urls
            self.root = root
            self.timeout = timeout
            self.session = session
            self.client = Client(url=self.urls, root=self.root, timeout=self.timeout, session=self.session)

        def ls(self, hdfs_path):
            """
            List the files and directories under an HDFS directory
            :param hdfs_path: HDFS directory
            :return:
            """
            return self.client.list(hdfs_path, status=False)

        def mkdir(self, hdfs_path):
            """
            Create a directory
            :param hdfs_path:
            :return:
            """
            # an octal permission (e.g. 755) can be passed via the permission argument
            self.client.makedirs(hdfs_path)

        def rm(self, hdfs_path):
            """
            Delete an HDFS file or directory
            :param hdfs_path:
            :return:
            """
            # recursive (boolean): whether to delete directories recursively
            self.client.delete(hdfs_path)

        def upload_hdfs(self, local_path, hdfs_path):
            """
            Upload a local file to HDFS
            :param local_path: local path
            :param hdfs_path: HDFS path
            :return:
            """
            # cleanup (boolean): remove already-uploaded files if an error occurs during the upload
            self.client.upload(hdfs_path, local_path, cleanup=True)

        def download(self, hdfs_path, local_path):
            """
            Download a file from HDFS
            :param hdfs_path: HDFS path
            :param local_path: local path
            :return:
            """
            # overwrite (boolean): overwrite any existing file or directory
            state = self.client.download(hdfs_path, local_path, overwrite=True)
            print(state)

        def status(self, hdfs_path):
            """
            Get information about an HDFS file or directory
            :param hdfs_path: HDFS path
            :return:
            """
            # print(self.client.content(hdfs_path))
            # strict=False: return None if the path does not exist, otherwise its status
            return self.client.status(hdfs_path, strict=False)

        def is_file(self, hdfs_path):
            """
            Check whether the path is a file
            :param hdfs_path: HDFS path
            :return:
            """
            file_status = self.status(hdfs_path)
            if not file_status:
                return None
            return file_status["type"] == "FILE"

        def is_directory(self, hdfs_path):
            """
            Check whether the path is a directory
            :param hdfs_path: HDFS path
            :return:
            """
            file_status = self.status(hdfs_path)
            if not file_status:
                return None
            return file_status["type"] == "DIRECTORY"

        def mv_or_rename(self, hdfs_src_path, hdfs_dst_path):
            """
            Move or rename a file
            :param hdfs_src_path: source HDFS path
            :param hdfs_dst_path: destination HDFS path
            :return:
            """
            self.client.rename(hdfs_src_path, hdfs_dst_path)

        def overwrite_hdfs(self, hdfs_path, data):
            """
            Overwrite an HDFS file with the given data
            :param hdfs_path: HDFS path
            :param data: data to write
            :return:
            """
            self.client.write(hdfs_path, data, overwrite=True, append=False, encoding="utf-8")

        def append_hdfs(self, hdfs_path, data):
            """
            Append data to an HDFS file
            :param hdfs_path: HDFS path
            :param data: data to append
            :return:
            """
            self.client.write(hdfs_path, data, overwrite=False, append=True, encoding="utf-8")


    if __name__ == '__main__':
        client = HdfsWork("http://10.0.0.134:50070/;http://10.0.0.131:50070/;http://10.0.0.132:50070/", "/", 10000)
        # List the files and directories under a directory
        # file_path = client.ls("/")
        # print(file_path)
        # Create a directory
        # client.mkdir("/hdfs_test/demo_1/")
        # Delete demo_1 under /hdfs_test
        # client.rm("/hdfs_test/demo_1")
        # Upload a file
        # client.upload_hdfs("./test_report_01.pdf", "/hdfs_test/demo_1/")
        # Download a file
        # client.download("/hdfs_test/demo_1/test_report_01.pdf", "./hdfs_download/")
        # Get information about a file or directory
        # state = client.status("/hdfs_test/demo_1/test_report_01.pdf")
        # print(state)
        # Check whether the path is a file
        # state = client.is_file("/hdfs_test/demo_1/test_report_01.pdf")
        # print(state)
        # Move or rename a file
        # client.mv_or_rename("/test_report_01.pdf", "/hdfs_test/demo_1/test_report_01.pdf")
        # Overwrite an HDFS file with data
        # client.overwrite_hdfs("/a.txt", "this is my write text\n")
        # Append data to an HDFS file
        # client.append_hdfs("/a.txt", "this is my write text\n")
    
    
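The wrapper above covers listing, creating, uploading, downloading and writing, but not reading a file back. As a minimal complementary sketch (assuming the same WebHDFS address and a UTF-8 text file such as the /a.txt written above), the library's Client.read can be used as a context manager:

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    # Sketch: read a text file back from HDFS with the hdfs library.
    from hdfs.client import Client

    client = Client(url="http://10.0.0.134:50070/", root="/", timeout=10000)
    # read() yields a file-like reader; encoding="utf-8" makes it return str instead of bytes
    with client.read("/a.txt", encoding="utf-8") as reader:
        content = reader.read()
    print(content)
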
  • Fix for the permission error when creating a directory

hdfs.util.HdfsError: Permission denied: user=dr.who, access=WRITE, inode="/test":root:supergroup:drwxr-xr-x

One fix is to add the following to hdfs-site.xml and restart the NameNode. Note that this disables HDFS permission checking entirely (in Hadoop 2.x and later the property is named dfs.permissions.enabled):

    <property>
      <name>dfs.permissions</name>
      <value>false</value>
    </property>
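
Alternatively, instead of turning permission checking off, the client can act as a named HDFS user. The error occurs because plain WebHDFS requests run as the anonymous user dr.who; the hdfs library's InsecureClient adds a user.name parameter to every request. A minimal sketch, assuming the target directory is owned by the HDFS user root (as in the error message above) and the same WebHDFS address:

    #!/usr/bin/env python
    # Sketch: operate as a named HDFS user instead of the anonymous "dr.who".
    from hdfs import InsecureClient

    # InsecureClient appends user.name=<user> to each WebHDFS request, so the
    # NameNode runs the operation as that user rather than as dr.who.
    client = InsecureClient("http://10.0.0.134:50070", user="root")
    client.makedirs("/hdfs_test/demo_1/")
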
Original article: https://www.cnblogs.com/xujunkai/p/14290061.html