test_zw 2024-02-18 21:55 采纳率: 0%
浏览 12
已结题

在python+pytest+yaml,如何将log封装的方法添加到自身的请求封装中使用?

想完善一下自身的请求封装文件,学习别人的日志封装,但是无法加进来,会报错

img

img


上面两张图是日志的封装内容

log文件内容

import logging.handlers
## 单例模式的思想:通过逻辑控制,只生成一个对象
import os

#日志封装文件,还没用起来

class GetLogger:
    '''
    当已经创建了logger对象的时候,那么之后就不在创建了,也就是只创建一次对象
    '''
    # 把logger对象的初始值设置为None
    logger = None

    # 创建logger,并且返回这个logger
    @classmethod
    def get_logger(cls):
        if cls.logger is None:
            ########创建日志器,控制他的创建次数
            cls.logger = logging.getLogger('communityApiAutoTest')  # 这里的值是自定义的
            # 设置总的级别,debug/info/warning/error
            cls.logger.setLevel(logging.DEBUG)  # 设置debug级别,意味着高于debug的都会被收集
            # 2获取格式器
            # 2.1 要给格式器设置要输出的样式
            fmt = "%(asctime)s %(levelname)s [%(name)s] [%(filename)s (%(funcName)s:%(lineno)d] - %(message)s"
            # 2.2创建格式器,并且给他设置样式
            fm = logging.Formatter(fmt)
            # 获取项目路径
            project_path = os.path.dirname(os.path.abspath(__file__)).replace("common", "")
            # 3.创建处理器 按照时间进行切割文件
            tf = logging.handlers.TimedRotatingFileHandler(filename=project_path + f'/logs/requests.log',  # 原日志文件
                                                           when='H',  # 间隔多长时间把日志存放到新的文件中
                                                           interval=1,
                                                           backupCount=3,  # 除了原日志文件,还有3个备份
                                                           encoding='utf-8'
                                                           )
            logging.basicConfig(level=logging.DEBUG, format=fmt)  # 这是在控制台上打印日志信息

            # 在处理器中添加格式器
            tf.setFormatter(fm)
            # 在日志器中添加处理器
            cls.logger.addHandler(tf)

            # return cls.logger
        return cls.logger


if __name__ == '__main__':
    # 单例模式
    logger = GetLogger.get_logger()
    print(id(logger))
    logger1 = GetLogger.get_logger()
    print(id(logger1))
    logger.debug('调试')  # 相当print小括号中的信息
    logger.info('信息')
    logger.warning('警告')
    name = '测试'
    logger.error('这个变量是{}'.format(name))
    logger.critical('致命的')

img

下面是参考的请求封装内容,因为和我自身的有出入,我是yaml进行请求,他是没封装之前的
参考的请求封装文件


import requests
from allure_commons.utils import md5

from common.logger import GetLogger


class RequestsClient:
    # 使用类属性来定一个session,他将作为所有接口发起的全局对象
    session = requests.session()

    # 初始化数据,如果参数没传的情况下将参数置为None
    def __init__(self):
        self.session = RequestsClient.session
        self.method = None
        self.url = None
        self.data = None
        self.headers = None
        self.files = None
        self.json = None
        self.params = None
        self.resp = None
        self.logger = GetLogger.get_logger()

    # 封装requests请求
    def sendRequest(self, **kwargs):
        # 如果调用方,没有传任何的参数,那么就使用该对象的默认属性参数
        if 'url' not in kwargs.keys():
            kwargs['url'] = self.url
        if 'method' not in kwargs.keys():
            kwargs['method'] = self.method
        if 'headers' not in kwargs.keys():
            kwargs['headers'] = self.headers
        if 'data' not in kwargs.keys():
            kwargs['data'] = self.data
        if 'json' not in kwargs.keys():
            kwargs['json'] = self.json
        if 'files' not in kwargs.keys():
            kwargs['files'] = self.files
        if 'params' not in kwargs.keys():
            kwargs['params'] = self.params

        # 等价于
        # self.resp = self.session.request(**kwargs)
        # self.resp = self.session.request(url=kwargs['url'],method=kwargs['method'],headers=kwargs['headers'],data=kwargs['data'],json=kwargs['json'],params=kwargs['params'],files=kwargs['files'])

        # 将接口发起前的信息记录到日志中
        for key, value in kwargs.items():
            self.logger.info(f'接口的{key}是:{value}')
        try:
            self.resp = self.session.request(**kwargs)
            self.logger.info(f'接口响应状态码是:{self.resp.status_code}')
            self.logger.info(f'接口相应信息是:{self.resp.text}')
        except BaseException as e:
            self.logger.exception('接口请求报错!')
            raise BaseException(f'接口报错信息:{e}')
        return self.resp


if __name__ == '__main__':
    json = {
        "userName": "admin",
        "password": md5("123456")
    }
    request = RequestsClient()
    resp = request.sendRequest(method='post', url='http://localhost:8888/community/flogin', json=json)
    print(resp.json())

下面的是自身的请求封装,看看如何添加好呢?
自身的请求封装文件

import jsonpath
import requests
import json

from common.logger import GetLogger
from common.yaml_util import YamlUtil
from builtins import str
import re


class RequestUtil:


    #定义初始方法继承
    def __init__(self, two_node):
        # 接口地址的初始方法
        self.base_url = YamlUtil().read_config('base', two_node)


    # 替换值的方法
    # #(替换url,params,data,json,headers)
    # #(string,int,float,list,dict)
    def replace_value(self, data):
        if data:
            # 保存数据类型
            data_type = type(data)
            # 判断数据类型转换成str
            if isinstance(data, dict) or isinstance(data, list):
                str_data = json.dumps(data)
            else:
                str_data = str(data)
            for cs in range(1, str_data.count('${') + 1):
                # 替换
                if "${" in str_data and "}" in str_data:
                    start_index = str_data.index("${")
                    end_index = str_data.index("}", start_index)
                    old_value = str_data[start_index:end_index + 1]
                    new_value = YamlUtil().read_extract_yaml(old_value[2:-1])
                    str_data = str_data.replace(old_value, new_value)
            # 还原数据类型
            if isinstance(data, dict) or isinstance(data, list):
                data = json.loads(str_data)
                # 打印请求数据
                print(data)
            else:
                data = data_type(str_data)
        return data

    # 规范yaml测试用例
    def standard_yaml(self, caseinfo):
        caseinfo_keys = caseinfo.keys()
        # 判断一级关键字是否包含:name,request,validate
        if "name" in caseinfo_keys and "request" in caseinfo_keys and "validate" in caseinfo_keys:
            # 判断request下面是否包含:method、url
            request_keys = caseinfo["request"].keys()
            if "method" in request_keys and "url" in request_keys:
                print("yaml基本架构检查通过")
                method = caseinfo['request'].pop("method")  # pop() 函数用于移除列表中的一个元素,并且返回该元素的值。
                url = caseinfo['request'].pop("url")
                res = self.send_request(method, url, **caseinfo['request'])  # caseinfo需要解包加**
                return_text = res.text
                return_code = res.status_code
                return_json = ""
                try:
                    return_json = res.json()
                except Exception as e:
                    print("extract返回的结果不是JSON格式")
                # 提取值并写入extract.yaml文件
                if "extract" in caseinfo.keys():
                    for key, value in caseinfo["extract"].items():
                        if "(.*?)" in value or "(.+?)" in value:  # 正则表达式
                            zz_value = re.search(value, return_text)
                            if zz_value:
                                extract_value = {key: zz_value.group(1)}
                                YamlUtil().write_extract_yaml(extract_value)
                        else:  # jsonpath
                            try:
                                resturn_json = res.json()
                                js_value = jsonpath.jsonpath(resturn_json, value)
                                if js_value:
                                    extract_value = {key: js_value[0]}
                                    YamlUtil().write_extract_yaml(extract_value)
                            except Exception as e:
                                print("extract返回的结果不是JSON格式,不能使用jsonpath提取")
                # 断言:
                self.assert_result(caseinfo['validate'], return_json, return_code)
                return res
            else:
                print("在request下必须包含method,url")
        else:
            print("一级关键字必须包含name,request,validate")

    sess = requests.session()

    # 统一请求封装

    def send_request(self, method, url, **kwargs):
        method = str(method).lower()  # 转换小写
        # 基础路径的拼接和替换
        url = self.base_url + self.replace_value(url)
        #打印请求地址
        print(url)

        # 参数替换
        for key, value in kwargs.items():
            if key in ['params', 'data', 'json', 'headers']:
                kwargs[key] = self.replace_value(value)
            elif key == "files":
                for file_key, file_path in value.items():
                    value[file_key] = open(file_path, 'rb')

        res = RequestUtil.sess.request(method, url, **kwargs)
        print(res.text)
        return res





    # 断言
    def assert_result(self, yq_result, sj_result, return_code):
        all_flag = 0
        for yq in yq_result:
            for key, value in yq.items():
                print(key, value)
                if key == "equals":
                    flag = self.equals_assert(value, return_code, sj_result)
                    all_flag = all_flag + flag
                elif key == 'contains':
                    flag = self.contains_assert(value, sj_result)
                    all_flag = all_flag + flag
                else:
                    print("框架暂不支持此段断言方式")
        assert all_flag == 0

    # 相等断言
    def equals_assert(self, value, return_code, sj_result):
        flag = 0
        for assert_key, assert_value in value.items():
            print(assert_key, assert_value)
            if assert_key == "status_code":  # 状态断言
                assert_value == return_code
                if assert_value != return_code:
                    flag = flag + 1
                    print("断言失败,返回的状态码不等于%s" % assert_value)
            else:
                lists = jsonpath.jsonpath(sj_result, '$..%s' % assert_key)
                if lists:
                    if assert_value not in lists:
                        flag = flag + 1
                        print("断言失败:" + assert_key + "不等于" + str(assert_value))
                else:
                    flag = flag + 1
                    print("断言失败:返回的结果不存在:" + assert_key)
        return flag

    # 包含断言
    def contains_assert(self, value, sj_result):
        flag = 0
        if value not in str(sj_result):
            flag = flag + 1
            print("断言失败:返回的结果中不包含:" + value)
        return flag


上图是本身的封装请求文件,想添加日志文件进行填充

img

  • 写回答

1条回答 默认 最新

  • GISer Liu 2024-02-18 21:57
    关注

    该回答引用自GPT-3.5,由博主GIS_Liu编写:

    针对你的问题,如果你想将别人封装的日志功能整合到你自己的请求封装中,可能会出现报错的情况。这通常是因为你的请求封装和日志封装之间存在一些冲突或者依赖关系没有正确处理所致。

    首先,你需要确认你所使用的日志封装与你的请求封装之间的兼容性。确保它们都能够在同一个环境中正确运行,并且没有冲突的地方。

    其次,检查你封装的请求部分和日志部分是否有重复的命名或者方法。如果有重复的地方,可能会导致命名冲突或者方法调用错误。

    最后,确保你正确引入了日志封装的相关代码,并且按照其文档或者说明正确地使用了它提供的功能。

    如果以上步骤都没有解决你的问题,建议你提供具体的报错信息以及相关的代码片段,这样我才能更准确地帮助你解决这个问题。

    img

    如果该回答解决了您的问题,请采纳!如果没有,请详述问题所在!

    评论

报告相同问题?

问题事件

  • 已结题 (查看结题原因) 2月20日
  • 修改了问题 2月18日
  • 修改了问题 2月18日
  • 修改了问题 2月18日
  • 展开全部