python如何使用yaml存储和提取做自动化
时间: 2024-03-09 14:47:35 浏览: 51
使用 Python 中的 PyYAML 库可以很方便地实现 YAML 文件的读取和写入。
首先,需要安装 PyYAML 库,可以使用 pip 命令进行安装:
```
pip install pyyaml
```
然后,通过以下代码可以将 Python 对象写入到 YAML 文件中:
```python
import yaml
data = {
'name': 'John',
'age': 30,
'city': 'New York'
}
with open('data.yaml', 'w') as file:
documents = yaml.dump(data, file)
```
以上代码将 Python 字典对象 `data` 写入到了 `data.yaml` 文件中。
接下来,可以通过以下代码将 YAML 文件中的数据读取到 Python 对象中:
```python
import yaml
with open('data.yaml') as file:
documents = yaml.full_load(file)
for item, doc in documents.items():
print(item, ":", doc)
```
以上代码将打印出从 `data.yaml` 文件中读取到的 Python 字典对象。
使用 PyYAML 库,可以很方便地将 Python 对象和 YAML 文件进行相互转换,从而实现自动化。
相关问题
使用python的pytest和yaml存储和提取参数执行自动化
好的,这是一个关于使用Python的pytest和YAML存储和提取参数执行自动化的问题。对于这个问题,我可以给出以下回答:
首先,pytest是一个Python的单元测试框架,它可以让我们编写更加简洁、可读性更高的测试代码。而YAML是一种轻量级的数据序列化格式,它可以让我们将数据以一种易读易写的方式存储在文件中。
在使用pytest进行自动化测试时,我们可以使用YAML文件来存储测试数据和参数,然后在测试代码中读取这些数据和参数。这样做的好处是可以将测试数据和参数与测试代码分离,从而让测试代码更加简洁、易读易维护。
具体实现可以参考以下步骤:
1. 安装pytest和pyyaml库
```
pip install pytest
pip install pyyaml
```
2. 创建YAML文件来存储测试数据和参数
例如,我们可以创建一个名为test_data.yaml的文件,用于存储测试数据和参数,格式如下:
```
- name: test_case_1
input: 1, 2
expect: 3
- name: test_case_2
input: 10, 20
expect: 30
```
其中,name表示测试用例的名称,input表示测试用例的输入参数,expect表示测试用例的预期输出结果。
3. 在测试代码中读取YAML文件并使用其中的数据和参数
例如,我们可以创建一个名为test_calc.py的测试文件,用于测试一个加法函数,代码如下:
```
import yaml
def add(x, y):
return x + y
def test_calc():
with open('test_data.yaml') as f:
test_data = yaml.safe_load(f)
for data in test_data:
name = data['name']
input_data = data['input']
expect = data['expect']
assert add(*input_data) == expect, f"{name} failed"
```
在这个测试代码中,我们首先使用yaml.safe_load()函数读取test_data.yaml文件中的数据,然后使用for循环遍历其中的测试数据和参数,最后使用assert语句来判断测试结果是否符合预期。
4. 运行测试代码
在终端中进入到测试文件所在的目录,运行以下命令即可执行测试:
```
pytest test_calc.py
```
总的来说,使用Python的pytest和YAML存储和提取参数执行自动化是一种非常方便、可读性高的测试方法,可以大大提高测试代码的可维护性和可读性。
生成基于basemodel,pytest,yaml,jsonpath,mysql,缓存机制的python接口自动化,分模块生成
好的,我将为您分模块生成基于 basemodel、pytest、yaml、jsonpath、mysql、缓存机制的 Python 接口自动化代码。
### 1. 基于 basemodel 的接口请求封装
首先,我们使用基于 basemodel 的接口请求封装,方便我们发送各种类型的接口请求。
```python
from typing import Dict, Union
from pydantic import BaseModel
import requests
class BaseResponse(BaseModel):
status: int
message: str
data: Union[Dict, str]
class BaseRequest(BaseModel):
url: str
method: str = "GET"
headers: Dict[str, str] = {}
params: Dict[str, Union[str, int]] = {}
data: Dict[str, Union[str, int]] = {}
def send(self) -> BaseResponse:
resp = requests.request(
method=self.method,
url=self.url,
headers=self.headers,
params=self.params,
json=self.data
)
return BaseResponse(
status=resp.status_code,
message=resp.reason,
data=resp.json()
)
```
### 2. pytest 测试用例编写
接下来,我们编写 pytest 测试用例,利用 BaseRequest 封装的接口请求,完成接口测试。
```python
import pytest
from base_request import BaseRequest, BaseResponse
@pytest.fixture(scope="module")
def api():
return BaseRequest(url="https://jsonplaceholder.typicode.com/posts")
@pytest.mark.parametrize("params, expected", [
({}, "200 OK"),
({"userId": 1}, "200 OK"),
({"id": 1}, "200 OK"),
({"id": 101}, "404 Not Found"),
])
def test_get_posts(api, params, expected):
api.params = params
resp: BaseResponse = api.send()
assert resp.status == 200
assert resp.message == expected
```
### 3. 使用 YAML 存储测试数据
为了方便管理测试数据,我们使用 YAML 文件存储测试数据。
```yaml
- name: "get all posts"
params: {}
expected: "200 OK"
- name: "get posts by userId"
params:
userId: 1
expected: "200 OK"
- name: "get posts by id"
params:
id: 1
expected: "200 OK"
- name: "get posts by invalid id"
params:
id: 101
expected: "404 Not Found"
```
### 4. 使用 jsonpath 提取响应数据
接下来,我们使用 jsonpath 提取响应数据,并进行断言。
```python
import pytest
from base_request import BaseRequest, BaseResponse
from jsonpath import jsonpath
import yaml
@pytest.fixture(scope="module")
def api():
return BaseRequest(url="https://jsonplaceholder.typicode.com/posts")
@pytest.fixture(scope="module")
def test_data():
with open("test_data.yaml", encoding="utf-8") as f:
data = yaml.safe_load(f)
return data
@pytest.mark.parametrize("test_case", test_data())
def test_get_posts(api, test_case):
api.params = test_case["params"]
resp: BaseResponse = api.send()
assert resp.status == 200
assert resp.message == test_case["expected"]
data = jsonpath(resp.data, "$..title")
assert len(data) > 0
```
### 5. 使用 MySQL 存储测试结果
最后,我们使用 MySQL 存储测试结果,方便后续数据分析。
```python
import pytest
from base_request import BaseRequest, BaseResponse
from jsonpath import jsonpath
import yaml
import pymysql
@pytest.fixture(scope="module")
def api():
return BaseRequest(url="https://jsonplaceholder.typicode.com/posts")
@pytest.fixture(scope="module")
def test_data():
with open("test_data.yaml", encoding="utf-8") as f:
data = yaml.safe_load(f)
return data
@pytest.fixture(scope="module")
def mysql():
conn = pymysql.connect(
host="localhost",
user="root",
password="password",
database="test"
)
yield conn
conn.close()
@pytest.mark.parametrize("test_case", test_data())
def test_get_posts(api, test_case, mysql):
api.params = test_case["params"]
resp: BaseResponse = api.send()
assert resp.status == 200
assert resp.message == test_case["expected"]
data = jsonpath(resp.data, "$..title")
assert len(data) > 0
cursor = mysql.cursor()
sql = """
INSERT INTO test_result (api, params, status, message)
VALUES (%s, %s, %s, %s)
"""
cursor.execute(sql, (api.url, str(api.params), resp.status, resp.message))
mysql.commit()
cursor.close()
```
### 6. 使用缓存机制提高测试效率
如果我们的接口请求中有一些不经常变动的数据,我们可以使用缓存机制提高测试效率。
```python
import pytest
from base_request import BaseRequest, BaseResponse
from jsonpath import jsonpath
import yaml
import pymysql
import pickle
@pytest.fixture(scope="module")
def api():
return BaseRequest(url="https://jsonplaceholder.typicode.com/posts")
@pytest.fixture(scope="module")
def test_data():
with open("test_data.yaml", encoding="utf-8") as f:
data = yaml.safe_load(f)
return data
@pytest.fixture(scope="module")
def mysql():
conn = pymysql.connect(
host="localhost",
user="root",
password="password",
database="test"
)
yield conn
conn.close()
@pytest.fixture(scope="module")
def cache():
try:
with open("cache.pickle", "rb") as f:
data = pickle.load(f)
except:
data = {}
yield data
with open("cache.pickle", "wb") as f:
pickle.dump(data, f)
@pytest.mark.parametrize("test_case", test_data())
def test_get_posts(api, test_case, mysql, cache):
if api.url in cache and str(api.params) in cache[api.url]:
resp_data = cache[api.url][str(api.params)]
else:
resp: BaseResponse = api.send()
resp_data = jsonpath(resp.data, "$..title")
cache.setdefault(api.url, {})[str(api.params)] = resp_data
assert len(resp_data) > 0
cursor = mysql.cursor()
sql = """
INSERT INTO test_result (api, params, status, message)
VALUES (%s, %s, %s, %s)
"""
cursor.execute(sql, (api.url, str(api.params), 200, "OK"))
mysql.commit()
cursor.close()
```
这样,我们就完成了基于 basemodel、pytest、yaml、jsonpath、mysql、缓存机制的 Python 接口自动化代码。
阅读全文