对象存储组件:腾讯对象存储

参考 腾讯官方文档

from qcloud_cos import CosConfig
from qcloud_cos import CosS3Client
from sts.sts import Sts
from untitled import settings
from qcloud_cos.cos_exception import CosServiceError

def create_client():
    secret_id = settings.tencent_secret_id      # 替换为用户的 secretId
    secret_key = settings.tencent_secret_key      # 替换为用户的 secretKey
    region = settings.region    # 替换为用户的 Region
    token = None                # 使用临时密钥需要传入 Token,默认为空,可不填
    scheme = 'https'            # 指定使用 http/https 协议来访问 COS,默认为 https,可不填
    config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token, Scheme=scheme)
    # 2. 获取客户端对象
    client = CosS3Client(config)
    return client

#3.创建桶
def create_bucket(bucket_name):

    create_client().create_bucket(
        Bucket=bucket_name,
        ACL="public-read"
    )
    cors_config = {
        'CORSRule': [
            {
                'AllowedOrigin': '*',
                'AllowedMethod': ['GET', 'PUT', 'HEAD', 'POST', 'DELETE'],
                'AllowedHeader': "*",
                'ExposeHeader': "*",
                'MaxAgeSeconds': 500,
            }
        ]
    }
    create_client().put_bucket_cors(
        Bucket=bucket_name,
        CORSConfiguration=cors_config
    )


#上传文件
def upload_file(bucket,region,file,key):

    create_client().upload_file_from_buffer(
        Bucket=bucket,
        Body=file,
        Key=key,

    )
    return "https://{}.cos.{}.myqcloud.com/{}".format(bucket,region,key)

#删除文件
def delete_file(bucket,key):

    create_client().delete_object(
        Bucket=bucket,
        Key=key
    )

#删除多个文件
def delete_files(bucket,key_list):
    # Delete = {
    #     'Object': [
    #         {
    #             'Key': 'exampleobject1',
    #         },
    #         {
    #             'Key': 'exampleobject2',
    #         },
    #     ],
    #     'Quiet': 'true' | 'false'
    # }
    objects = {
        "Quiet":"true",
        "Object":key_list,
    }

    create_client().delete_objects(
        Bucket=bucket,
        Delete=objects,
    )

#删除桶
def delete_bucket(bucket):
    client = create_client()
    try:
        #找到文件并删除
        while True:
            part_objects = client.list_objects(bucket)
            # print(part_objects)

            contents = part_objects.get("Contents")
            print(contents)
            if  not contents:
                break

            #批量删除
            client.delete_objects(
                Bucket=bucket,
                Delete={
                    "Object":[{"Key":item["Key"]} for item in contents],
                    "Quiet":"true",
                }
            )

            if part_objects["IsTruncated"] == "false":
                break

        #找到碎片并删除
        while True:
            part_uploads = client.list_multipart_uploads(bucket)
            print(part_uploads)

            contents = part_uploads.get("Upload")
            if not contents:
                break

            # 批量删除
            for item in contents:
                client.abort_multipart_upload(bucket,item["Key"],item['UploadId'])

            if part_uploads["IsTruncated"] == "false":
                break

        client.delete_bucket(bucket)
    except CosServiceError as e:
        pass


#获取临时凭证

def cos_credential(bucket):
    # 生成一个临时凭证,并给前端返回
    # 1. 安装一个生成临时凭证python模块   pip install -U qcloud-python-sts
    # 2. 写代码
    # from sts.sts import Sts


    config = {
        # 临时密钥有效时长,单位是秒(30分钟=1800秒)
        'duration_seconds': 1800,
        # 固定密钥 id
        'secret_id': settings.tencent_secret_id,
        # 固定密钥 key
        'secret_key': settings.tencent_secret_key,
        # 换成你的 bucket
        'bucket': bucket,
        # 换成 bucket 所在地区
        'region': settings.region,
        # 这里改成允许的路径前缀,可以根据自己网站的用户登录态判断允许上传的具体路径
        # 例子: a.jpg 或者 a/* 或者 * (使用通配符*存在重大安全风险, 请谨慎评估使用)
        'allow_prefix': '*',
        # 密钥的权限列表。简单上传和分片需要以下的权限,其他权限列表请看 https://cloud.tencent.com/document/product/436/31923
        'allow_actions': [
            # 'name/cos:PostObject',
            # 'name/cos:DeleteObject',
            # "name/cos:UploadPart",
            # "name/cos:UploadPartCopy",
            # "name/cos:CompleteMultipartUpload",
            # "name/cos:AbortMultipartUpload",
            "*",
        ],

    }

    sts = Sts(config)
    result_dict = sts.get_credential()
    return result_dict

#检验文件
def check_file(bucket,key):
    data = create_client().head_object(
        Bucket=bucket,
        Key=key
    )
    return data
cos.py
原文地址:https://www.cnblogs.com/hude/p/12848663.html