【第四章】封装request类

代码地址:https://gitee.com/todayisgoodday/PythonRequest
  • 什么是Request
  1. 是基于urllib库,用Python编写,采用Apache2 Licensed开源协议的http库
  2. 比urllib更加方便,可以节约大量的工作,完全满足http的测试需求
  • 安装request
pip install request

  

  • Request用法详解
# -*- coding: utf-8 -*-
# @Time : 2021/12/9 14:24
# @Author : Limusen
# @File : request_demo_05


import requests

# 先进行简单的使用

import requests

response = requests.get('http://www.baidu.com/')  # 首先调用requests的get方法

print(type(response))  # 获取返回信息的类型
print(response.status_code)  # 获取响应状态码
print(response.text)  # 不需要解码就可以直接打印信息
print(response.cookies)  # 获取网页cookies


  

  • 其他实现方式
# -*- coding: utf-8 -*-
# @Time : 2021/12/9 14:24
# @Author : Limusen
# @File : request_demo_05


import requests

# 先进行简单的使用

import requests

response = requests.get('http://www.baidu.com/')  # 首先调用requests的get方法

print(type(response))  # 获取返回信息的类型
print(response.status_code)  # 获取响应状态码
print(response.text)  # 不需要解码就可以直接打印信息
print(response.cookies)  # 获取网页cookies

# 其他实现方式
requests.post('http://httpbin.org/post')
requests.put('http://httpbin.org/put')
requests.delete('http://httpbin.org/delete')
requests.head('http://httpbin.org/get')
requests.options('http://httpbin.org/get')

需求一: 将request请求设置成类的形式,全局皆可调用

  • 示例
  • request_demo_05.py
# -*- coding: utf-8 -*-
# @Time : 2021/12/9 14:24
# @Author : Limusen
# @File : request_demo_05


import requests

# 先进行简单的使用

import requests

response = requests.get('http://www.baidu.com/')  # 首先调用requests的get方法

print(type(response))  # 获取返回信息的类型
print(response.status_code)  # 获取响应状态码
print(response.text)  # 不需要解码就可以直接打印信息
print(response.cookies)  # 获取网页cookies

# 其他实现方式
requests.post('http://httpbin.org/post')
requests.put('http://httpbin.org/put')
requests.delete('http://httpbin.org/delete')
requests.head('http://httpbin.org/get')
requests.options('http://httpbin.org/get')

# 带参数的get请求
url = 'http://www.baidu.com/s?'
params = {
    "wd" : "测试一下"
}
response = requests.get(url,params=params)
# print(response.content.decode("utf-8"))

# 带参数的post请求
r = requests.post('https://accounts.douban.com/login', data={'form_email': 'abc@example.com', 'form_password': '123456'})
# print(r.content)


  • 封装request类
  1. 在common中新增request_utils.py
  2. 封装request方法
  3. get\post方法封装,响应的结果数据处理成json格式
  4. 调用成功:{'code':0,'response_body':...}
  5. 调用失败:{'code':1,'result':'错误描述'}
# -*- coding: utf-8 -*-
# @Time : 2021/12/9 14:37
# @Author : Limusen
# @File : request_utils

import requests
from common.config_utils import local_config


class RequestsUtils:

    def __init__(self):
        # 封装好的配置文件直接拿来用
        self.hosts = local_config.get_hosts

    def get(self, request_info):
        #  request_info 是我们封装好的数据,可以直接拿来用
        url = "http://%s%s" % (self.hosts, request_info['请求地址'])
        print(url)

    def post(self, request_info):
        pass


if __name__ == '__main__':
    requests_info = {'测试用例编号': 'api_case_03', '测试用例名称': '删除标签接口测试', '用例执行': '', '用例步骤': 'step_01',
                     '接口名称': '获取access_token接口', '请求方式': 'get', '请求头部信息': '', '请求地址': '/cgi-bin/token',
                     '请求参数(get)': '{"grant_type":"client_credential","appid":"wxb637f897f0bf1f0d","secret":"501123d2d367b109a5cb9a9011d0f084"}',
                     '请求参数(post)': '', '取值方式': 'jsonpath取值', '取值代码': '$.access_token', '取值变量': 'token',
                     '断言类型': 'json_key', '期望结果': 'access_token,expires_in'}
    RequestsUtils().get(requests_info)


继续封装 get跟post

# -*- coding: utf-8 -*-
# @Time : 2021/12/9 14:37
# @Author : Limusen
# @File : request_utils

import json
import requests
from common.config_utils import local_config


class RequestsUtils:

    def __init__(self):
        # 封装好的配置文件直接拿来用
        self.hosts = local_config.get_hosts
        # 全局session调用
        self.session = requests.session()

    def get(self, request_info):
        #  request_info 是我们封装好的数据,可以直接拿来用
        url = "https://%s%s" % (self.hosts, request_info["请求地址"])
        response = self.session.get(url=url,
                                    params=json.loads(
                                        requests_info['请求参数(get)'] if requests_info['请求参数(get)'] else None),
                                    headers=json.loads(request_info["请求头部信息"]) if request_info["请求头部信息"] else None
                                    )
        result = {
            "code": 0,
            "response_code": response.status_code,
            "response_reason": response.reason,
            "response_headers": response.headers,
            "response_body": response.text
        }
        return result

    def post(self, request_info):
        url = "https://%s%s" % (self.hosts, request_info["请求地址"])
        response = self.session.post(url=url,
                                    params=json.loads(
                                        requests_info["请求参数(get)"] if requests_info["请求参数(get)"] else None),
                                    headers=json.loads(request_info["请求头部信息"]) if request_info["请求头部信息"] else None,
                                    json=json.loads(request_info["请求参数(post)"])
                                    )
        result = {
            "code": 0,
            "response_code": response.status_code,
            "response_reason": response.reason,
            "response_headers": response.headers,
            "response_body": response.text
        }
        return result


if __name__ == '__main__':
    requests_info = {'测试用例编号': 'api_case_03', '测试用例名称': '删除标签接口测试', '用例执行': '', '用例步骤': 'step_01',
                     '接口名称': '获取access_token接口', '请求方式': 'get', '请求头部信息': '', '请求地址': '/cgi-bin/token',
                     '请求参数(get)': '{"grant_type":"client_credential","appid":"wxb630bf1f0d","secret":"501123da9011d0f084"}',
                     '请求参数(post)': '', '取值方式': 'jsonpath取值', '取值代码': '$.access_token', '取值变量': 'token',
                     '断言类型': 'json_key', '期望结果': 'access_token,expires_in'}
    requests_info_post = {'测试用例编号': 'api_case_03', '测试用例名称': '删除标签接口测试', '用例执行': '', '用例步骤': 'step_02',
                          '接口名称': '创建标签接口', '请求方式': 'post',
                          '请求头部信息': '', '请求地址': '/cgi-bin/tags/create',
                          '请求参数(get)': '{"access_token":"51_YRaWpXWWOE5Q5JrQxD4KxnyHzrLA8UuC4HmSLlSbkpgFSYxXKbVNGPvmQV7QDfkuYujC6HqvOlhn13UEjAISgySpmkE_JRFnw02b8Tfk8Uu3C5gyLM_bvsXqX7VBnN8QZBpXBWrNeJBFR30MGYKjADAZRD"}',
                          '请求参数(post)': '{   "tag" : {     "name" : "9958qq" } } ', '取值方式': '正则取值',
                          '取值代码': '"id":(.+?),',
                          '取值变量': 'tag_id', '断言类型': 'none', '期望结果': ''}
    res = RequestsUtils()
    # res.get(requests_info)
    res.post(requests_info_post)

需求二: 一个接口自动辨别并调用get或者post请求

  • ok 那我们现在已经封装好了get跟post请求 现在有一个需求是,用一个方法 可以根据一个字段任意请求get或者post
  • 继续改造,新增一个统一调度的接口
  • request_utils.py
# -*- coding: utf-8 -*-
# @Time : 2021/12/9 14:37
# @Author : Limusen
# @File : request_utils

import json
import requests
from common.config_utils import local_config


class RequestsUtils:

    def __init__(self):
        # 封装好的配置文件直接拿来用
        self.hosts = local_config.get_hosts
        # 全局session调用
        self.session = requests.session()

    def get(self, request_info):
        #  request_info 是我们封装好的数据,可以直接拿来用
        url = "https://%s%s" % (self.hosts, request_info["请求地址"])
        response = self.session.get(url=url,
                                    params=json.loads(
                                        requests_info['请求参数(get)'] if requests_info['请求参数(get)'] else None),
                                    headers=json.loads(request_info["请求头部信息"]) if request_info["请求头部信息"] else None
                                    )
        result = {
            "code": 0,
            "response_code": response.status_code,
            "response_reason": response.reason,
            "response_headers": response.headers,
            "response_body": response.text
        }
        return result

    def post(self, request_info):
        url = "https://%s%s" % (self.hosts, request_info["请求地址"])
        response = self.session.post(url=url,
                                     params=json.loads(
                                         requests_info["请求参数(get)"] if requests_info["请求参数(get)"] else None),
                                     headers=json.loads(request_info["请求头部信息"]) if request_info["请求头部信息"] else None,
                                     json=json.loads(request_info["请求参数(post)"])
                                     )
        result = {
            "code": 0,
            "response_code": response.status_code,
            "response_reason": response.reason,
            "response_headers": response.headers,
            "response_body": response.text
        }
        return result

    def request(self, request_info):
        request_type = request_info['请求方式']
        if request_type == "get":
            result = self.get(request_info)
        elif request_type == "post":
            result = self.post(request_info)
        else:
            result = {"code": 1, "error_message": "当前请求方式暂不支持!"}
        return result


if __name__ == '__main__':
    requests_info = {'测试用例编号': 'api_case_03', '测试用例名称': '删除标签接口测试', '用例执行': '', '用例步骤': 'step_01',
                     '接口名称': '获取access_token接口', '请求方式': 'get', '请求头部信息': '', '请求地址': '/cgi-bin/token',
                     '请求参数(get)': '{"grant_type":"client_credential","appid":"wxb6370bf1f0d","secret":"501123d2b9a9011d0f084"}',
                     '请求参数(post)': '', '取值方式': 'jsonpath取值', '取值代码': '$.access_token', '取值变量': 'token',
                     '断言类型': 'json_key', '期望结果': 'access_token,expires_in'}
    requests_info_post = {'测试用例编号': 'api_case_03', '测试用例名称': '删除标签接口测试', '用例执行': '', '用例步骤': 'step_02',
                          '接口名称': '创建标签接口', '请求方式': 'post',
                          '请求头部信息': '', '请求地址': '/cgi-bin/tags/create',
                          '请求参数(get)': '{"access_token":"51_136b9lRBH4SdLbSYI9C_1Sf1OogELivPJPNZ5z1mTzekmp3Yg4XQn8mx-sb3WxxV99NRWAX5CQhVIF6-uY12H_nRDjmEJ7H7oEbz9-qNHWV1g04V2t-29pslCsiuaSxIrkUChv4a2rPwdhnEEMHeADAMUP"}',
                          '请求参数(post)': '{   "tag" : {     "name" : "snsnssn" } } ', '取值方式': '正则取值',
                          '取值代码': '"id":(.+?),',
                          '取值变量': 'tag_id', '断言类型': 'none', '期望结果': ''}
    res = RequestsUtils()
    # res.get(requests_info)
    # res.post(requests_info_post)
    print(res.request(requests_info_post))

需求三: 按照顺序执行接口用例

    def request_steps(self, request_steps):
        """
        按照列表测试用例顺序执行测试用例
        :param request_steps:
        :return:
        """
        for request in request_steps:
            result = self.request(request)
            if result['code'] != 0:
                break
        return result

三个需求均完成

查看完整代码

# -*- coding: utf-8 -*-
# @Time : 2021/12/9 14:37
# @Author : Limusen
# @File : request_utils

import json
import requests
from common.config_utils import local_config


class RequestsUtils:

    def __init__(self):
        # 封装好的配置文件直接拿来用
        self.hosts = local_config.get_hosts
        # 全局session调用
        self.session = requests.session()

    def __get(self, request_info):
        """
        get请求封装
        :param request_info:
        :return:
        """
        #  request_info 是我们封装好的数据,可以直接拿来用
        url = "https://%s%s" % (self.hosts, request_info["请求地址"])
        response = self.session.get(url=url,
                                    params=json.loads(
                                        requests_info['请求参数(get)'] if requests_info['请求参数(get)'] else None),
                                    # 头部信息有时候可能为空,这里运用到三元运算符如果为空则设置为None
                                    headers=json.loads(request_info["请求头部信息"]) if request_info["请求头部信息"] else None
                                    )
        result = {
            "code": 0,
            "response_code": response.status_code,
            "response_reason": response.reason,
            "response_headers": response.headers,
            "response_body": response.text
        }
        return result

    def __post(self, request_info):
        """
        post请求封装
        :param request_info:
        :return:
        """
        url = "https://%s%s" % (self.hosts, request_info["请求地址"])
        response = self.session.post(url=url,
                                     params=json.loads(
                                         requests_info["请求参数(get)"] if requests_info["请求参数(get)"] else None),
                                     headers=json.loads(request_info["请求头部信息"]) if request_info["请求头部信息"] else None,
                                     json=json.loads(request_info["请求参数(post)"])
                                     )
        result = {
            "code": 0,
            "response_code": response.status_code,
            "response_reason": response.reason,
            "response_headers": response.headers,
            "response_body": response.text
        }
        return result

    def request(self, request_info):
        """
        封装方法自动执行post或者get方法
        :param request_info:
        :return:
        """
        request_type = request_info['请求方式']
        if request_type == "get":
            # 私有化方法,其他类均不可调用
            result = self.__get(request_info)
        elif request_type == "post":
            result = self.__post(request_info)
        else:
            result = {"code": 1, "error_message": "当前请求方式暂不支持!"}
        return result

    def request_steps(self, request_steps):
        """
        按照列表测试用例顺序执行测试用例
        :param request_steps:
        :return:
        """
        for request in request_steps:
            result = self.request(request)
            if result['code'] != 0:
                break
        return result


if __name__ == '__main__':
    requests_info = {'测试用例编号': 'api_case_03', '测试用例名称': '删除标签接口测试', '用例执行': '', '用例步骤': 'step_01',
                     '接口名称': '获取access_token接口', '请求方式': 'get', '请求头部信息': '', '请求地址': '/cgi-bin/token',
                     '请求参数(get)': '{"grant_type":"client_credential","appid":"wxb637f897f0bf1f0d","secret":"501123d2d367b109a5cb9a9011d0f084"}',
                     '请求参数(post)': '', '取值方式': 'jsonpath取值', '取值代码': '$.access_token', '取值变量': 'token',
                     '断言类型': 'json_key', '期望结果': 'access_token,expires_in'}
    requests_info_post = {'测试用例编号': 'api_case_03', '测试用例名称': '删除标签接口测试', '用例执行': '', '用例步骤': 'step_02',
                          '接口名称': '创建标签接口', '请求方式': 'post',
                          '请求头部信息': '', '请求地址': '/cgi-bin/tags/create',
                          '请求参数(get)': '{"access_token":"51_136b9lRBH4SdLbSYI9C_1Sf1OogELivPJPNZ5z1mTzekmp3Yg4XQn8mx-sb3WxxV99NRWAX5CQhVIF6-uY12H_nRDjmEJ7H7oEbz9-qNHWV1g04V2t-29pslCsiuaSxIrkUChv4a2rPwdhnEEMHeADAMUP"}',
                          '请求参数(post)': '{   "tag" : {     "name" : "snsnssn" } } ', '取值方式': '正则取值',
                          '取值代码': '"id":(.+?),',
                          '取值变量': 'tag_id', '断言类型': 'none', '期望结果': ''}

    res = RequestsUtils()
    # res.get(requests_info)
    # res.post(requests_info_post)
    print(res.request(requests_info_post))

项目框架图:

本章节讲述的是封装request模块,下一章节将讲到接口关联,参数化替换

欢迎转载,请注明出处: https://www.cnblogs.com/yushengaqingzhijiao/p/15667548.html
原文地址:https://www.cnblogs.com/yushengaqingzhijiao/p/15667548.html