想完善一下自身的请求封装文件,学习别人的日志封装,但是无法加进来,会报错


上面两张图是日志的封装内容
log文件内容
import logging.handlers
## 单例模式的思想:通过逻辑控制,只生成一个对象
import os
#日志封装文件,还没用起来
class GetLogger:
'''
当已经创建了logger对象的时候,那么之后就不在创建了,也就是只创建一次对象
'''
# 把logger对象的初始值设置为None
logger = None
# 创建logger,并且返回这个logger
@classmethod
def get_logger(cls):
if cls.logger is None:
########创建日志器,控制他的创建次数
cls.logger = logging.getLogger('communityApiAutoTest') # 这里的值是自定义的
# 设置总的级别,debug/info/warning/error
cls.logger.setLevel(logging.DEBUG) # 设置debug级别,意味着高于debug的都会被收集
# 2获取格式器
# 2.1 要给格式器设置要输出的样式
fmt = "%(asctime)s %(levelname)s [%(name)s] [%(filename)s (%(funcName)s:%(lineno)d] - %(message)s"
# 2.2创建格式器,并且给他设置样式
fm = logging.Formatter(fmt)
# 获取项目路径
project_path = os.path.dirname(os.path.abspath(__file__)).replace("common", "")
# 3.创建处理器 按照时间进行切割文件
tf = logging.handlers.TimedRotatingFileHandler(filename=project_path + f'/logs/requests.log', # 原日志文件
when='H', # 间隔多长时间把日志存放到新的文件中
interval=1,
backupCount=3, # 除了原日志文件,还有3个备份
encoding='utf-8'
)
logging.basicConfig(level=logging.DEBUG, format=fmt) # 这是在控制台上打印日志信息
# 在处理器中添加格式器
tf.setFormatter(fm)
# 在日志器中添加处理器
cls.logger.addHandler(tf)
# return cls.logger
return cls.logger
if __name__ == '__main__':
# 单例模式
logger = GetLogger.get_logger()
print(id(logger))
logger1 = GetLogger.get_logger()
print(id(logger1))
logger.debug('调试') # 相当print小括号中的信息
logger.info('信息')
logger.warning('警告')
name = '测试'
logger.error('这个变量是{}'.format(name))
logger.critical('致命的')

下面是参考的请求封装内容,因为和我自身的有出入,我是yaml进行请求,他是没封装之前的
参考的请求封装文件
import requests
from allure_commons.utils import md5
from common.logger import GetLogger
class RequestsClient:
# 使用类属性来定一个session,他将作为所有接口发起的全局对象
session = requests.session()
# 初始化数据,如果参数没传的情况下将参数置为None
def __init__(self):
self.session = RequestsClient.session
self.method = None
self.url = None
self.data = None
self.headers = None
self.files = None
self.json = None
self.params = None
self.resp = None
self.logger = GetLogger.get_logger()
# 封装requests请求
def sendRequest(self, **kwargs):
# 如果调用方,没有传任何的参数,那么就使用该对象的默认属性参数
if 'url' not in kwargs.keys():
kwargs['url'] = self.url
if 'method' not in kwargs.keys():
kwargs['method'] = self.method
if 'headers' not in kwargs.keys():
kwargs['headers'] = self.headers
if 'data' not in kwargs.keys():
kwargs['data'] = self.data
if 'json' not in kwargs.keys():
kwargs['json'] = self.json
if 'files' not in kwargs.keys():
kwargs['files'] = self.files
if 'params' not in kwargs.keys():
kwargs['params'] = self.params
# 等价于
# self.resp = self.session.request(**kwargs)
# self.resp = self.session.request(url=kwargs['url'],method=kwargs['method'],headers=kwargs['headers'],data=kwargs['data'],json=kwargs['json'],params=kwargs['params'],files=kwargs['files'])
# 将接口发起前的信息记录到日志中
for key, value in kwargs.items():
self.logger.info(f'接口的{key}是:{value}')
try:
self.resp = self.session.request(**kwargs)
self.logger.info(f'接口响应状态码是:{self.resp.status_code}')
self.logger.info(f'接口相应信息是:{self.resp.text}')
except BaseException as e:
self.logger.exception('接口请求报错!')
raise BaseException(f'接口报错信息:{e}')
return self.resp
if __name__ == '__main__':
json = {
"userName": "admin",
"password": md5("123456")
}
request = RequestsClient()
resp = request.sendRequest(method='post', url='http://localhost:8888/community/flogin', json=json)
print(resp.json())
下面的是自身的请求封装,看看如何添加好呢?
自身的请求封装文件
import jsonpath
import requests
import json
from common.logger import GetLogger
from common.yaml_util import YamlUtil
from builtins import str
import re
class RequestUtil:
#定义初始方法继承
def __init__(self, two_node):
# 接口地址的初始方法
self.base_url = YamlUtil().read_config('base', two_node)
# 替换值的方法
# #(替换url,params,data,json,headers)
# #(string,int,float,list,dict)
def replace_value(self, data):
if data:
# 保存数据类型
data_type = type(data)
# 判断数据类型转换成str
if isinstance(data, dict) or isinstance(data, list):
str_data = json.dumps(data)
else:
str_data = str(data)
for cs in range(1, str_data.count('${') + 1):
# 替换
if "${" in str_data and "}" in str_data:
start_index = str_data.index("${")
end_index = str_data.index("}", start_index)
old_value = str_data[start_index:end_index + 1]
new_value = YamlUtil().read_extract_yaml(old_value[2:-1])
str_data = str_data.replace(old_value, new_value)
# 还原数据类型
if isinstance(data, dict) or isinstance(data, list):
data = json.loads(str_data)
# 打印请求数据
print(data)
else:
data = data_type(str_data)
return data
# 规范yaml测试用例
def standard_yaml(self, caseinfo):
caseinfo_keys = caseinfo.keys()
# 判断一级关键字是否包含:name,request,validate
if "name" in caseinfo_keys and "request" in caseinfo_keys and "validate" in caseinfo_keys:
# 判断request下面是否包含:method、url
request_keys = caseinfo["request"].keys()
if "method" in request_keys and "url" in request_keys:
print("yaml基本架构检查通过")
method = caseinfo['request'].pop("method") # pop() 函数用于移除列表中的一个元素,并且返回该元素的值。
url = caseinfo['request'].pop("url")
res = self.send_request(method, url, **caseinfo['request']) # caseinfo需要解包加**
return_text = res.text
return_code = res.status_code
return_json = ""
try:
return_json = res.json()
except Exception as e:
print("extract返回的结果不是JSON格式")
# 提取值并写入extract.yaml文件
if "extract" in caseinfo.keys():
for key, value in caseinfo["extract"].items():
if "(.*?)" in value or "(.+?)" in value: # 正则表达式
zz_value = re.search(value, return_text)
if zz_value:
extract_value = {key: zz_value.group(1)}
YamlUtil().write_extract_yaml(extract_value)
else: # jsonpath
try:
resturn_json = res.json()
js_value = jsonpath.jsonpath(resturn_json, value)
if js_value:
extract_value = {key: js_value[0]}
YamlUtil().write_extract_yaml(extract_value)
except Exception as e:
print("extract返回的结果不是JSON格式,不能使用jsonpath提取")
# 断言:
self.assert_result(caseinfo['validate'], return_json, return_code)
return res
else:
print("在request下必须包含method,url")
else:
print("一级关键字必须包含name,request,validate")
sess = requests.session()
# 统一请求封装
def send_request(self, method, url, **kwargs):
method = str(method).lower() # 转换小写
# 基础路径的拼接和替换
url = self.base_url + self.replace_value(url)
#打印请求地址
print(url)
# 参数替换
for key, value in kwargs.items():
if key in ['params', 'data', 'json', 'headers']:
kwargs[key] = self.replace_value(value)
elif key == "files":
for file_key, file_path in value.items():
value[file_key] = open(file_path, 'rb')
res = RequestUtil.sess.request(method, url, **kwargs)
print(res.text)
return res
# 断言
def assert_result(self, yq_result, sj_result, return_code):
all_flag = 0
for yq in yq_result:
for key, value in yq.items():
print(key, value)
if key == "equals":
flag = self.equals_assert(value, return_code, sj_result)
all_flag = all_flag + flag
elif key == 'contains':
flag = self.contains_assert(value, sj_result)
all_flag = all_flag + flag
else:
print("框架暂不支持此段断言方式")
assert all_flag == 0
# 相等断言
def equals_assert(self, value, return_code, sj_result):
flag = 0
for assert_key, assert_value in value.items():
print(assert_key, assert_value)
if assert_key == "status_code": # 状态断言
assert_value == return_code
if assert_value != return_code:
flag = flag + 1
print("断言失败,返回的状态码不等于%s" % assert_value)
else:
lists = jsonpath.jsonpath(sj_result, '$..%s' % assert_key)
if lists:
if assert_value not in lists:
flag = flag + 1
print("断言失败:" + assert_key + "不等于" + str(assert_value))
else:
flag = flag + 1
print("断言失败:返回的结果不存在:" + assert_key)
return flag
# 包含断言
def contains_assert(self, value, sj_result):
flag = 0
if value not in str(sj_result):
flag = flag + 1
print("断言失败:返回的结果中不包含:" + value)
return flag
上图是本身的封装请求文件,想添加日志文件进行填充

