python res = requests.get(url)后,需要sleep,防止获取不到res,是否可以做到sleep自己监控到如果res了自动唤醒
时间: 2024-03-25 09:37:23 浏览: 177
可以使用Python中的异步编程库来实现这个功能,例如asyncio。在获取URL的请求后,可以使用asyncio.sleep()来等待一段时间,然后再次检查是否有响应。如果有响应,就可以跳出等待循环并继续执行后续代码。如果没有响应,则继续等待一段时间。这样可以避免在长时间等待时阻塞整个程序。
以下是一个使用asyncio的示例代码:
```
import asyncio
import requests
async def get_response(url):
while True:
res = requests.get(url)
if res:
break
await asyncio.sleep(1)
# continue with further processing of the response
asyncio.run(get_response('http://example.com'))
```
在这个例子中,我们定义了一个异步函数get_response(),它在循环中获取URL的响应。如果没有响应,它会使用asyncio.sleep()等待一秒钟,然后再次尝试获取响应。如果有响应,则跳出循环并继续执行后续代码。最后,我们使用asyncio.run()运行这个异步函数。
注意,使用异步编程需要理解协程和事件循环的概念。如果您不熟悉这些概念,请先学习异步编程的基础知识。
相关问题
帮我把一下代码设置一个合理请求头,并加入一个延时import requests import os from bs4 import BeautifulSoup class NovelDownloader: def __init__(self, root_url): self.root_url = root_url self.book_list = [] self.chapter_list = [] def get_url(self, url): while True: try: res = requests.get(url) if res.status_code == 200: print("页面获取成功!") return res.text else: print("页面返回异常!", res.status_code) except: print("页面获取错误!") def get_book_list(self): res = self.get_url(self.root_url) html = BeautifulSoup(res, "html.parser") a_list = html.find_all("a", {"class": "name"}) for a in a_list: self.book_list.append(a["href"]) self.book_list = [self.root_url + i for i in self.book_list] self.book_list.remove('http://www.biquge5200.cc/') def get_chapter_list(self, url): res = self.get_url(url) html = BeautifulSoup(res, "html.parser") a_list = html.find_all("a", {"class": "chapter"}) for a in a_list: self.chapter_list.append((a["href"], a.text.replace("\n", ""))) def get_content(self, chapter): url = self.root_url + chapter[0] print(url) book_name = chapter[0].split("/")[1] print(book_name) if not os.path.exists(book_name): os.mkdir(book_name) res = self.get_url(url) html = BeautifulSoup(res, "html.parser") content = html.find("div", {"id": "content"}).text print(content) path = os.path.join(book_name, chapter[1]) with open(path, "w", encoding="utf8") as f: f.write(content) def main(self): self.get_book_list() for book in self.book_list: self.get_chapter_list(book) for chapter in self.chapter_list: self.get_content(chapter) if __name__ == '__main__': root_url = "http://www.biquge5200.cc/" nd = NovelDownloader(root_url) nd.main()
好的,我可以帮你添加一个合理的请求头和延时。在 `get_url` 方法中,你可以添加一个请求头字典,来模拟浏览器发送请求。同时,在每次请求后添加一个时间延时,避免频繁请求被网站屏蔽。
修改后的代码如下所示:
```python
import requests
import os
import time
from bs4 import BeautifulSoup
class NovelDownloader:
def __init__(self, root_url):
self.root_url = root_url
self.book_list = []
self.chapter_list = []
self.headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
} # 添加请求头
def get_url(self, url):
while True:
try:
res = requests.get(url, headers=self.headers)
if res.status_code == 200:
print("页面获取成功!")
return res.text
else:
print("页面返回异常!", res.status_code)
except:
print("页面获取错误!")
time.sleep(1) # 添加延时
def get_book_list(self):
res = self.get_url(self.root_url)
html = BeautifulSoup(res, "html.parser")
a_list = html.find_all("a", {"class": "name"})
for a in a_list:
self.book_list.append(a["href"])
self.book_list = [self.root_url + i for i in self.book_list]
self.book_list.remove('http://www.biquge5200.cc/')
def get_chapter_list(self, url):
res = self.get_url(url)
html = BeautifulSoup(res, "html.parser")
a_list = html.find_all("a", {"class": "chapter"})
for a in a_list:
self.chapter_list.append((a["href"], a.text.replace("\n", "")))
def get_content(self, chapter):
url = self.root_url + chapter[0]
print(url)
book_name = chapter[0].split("/")[1]
print(book_name)
if not os.path.exists(book_name):
os.mkdir(book_name)
res = self.get_url(url)
html = BeautifulSoup(res, "html.parser")
content = html.find("div", {"id": "content"}).text
print(content)
path = os.path.join(book_name, chapter[1])
with open(path, "w", encoding="utf8") as f:
f.write(content)
def main(self):
self.get_book_list()
for book in self.book_list:
self.get_chapter_list(book)
for chapter in self.chapter_list:
self.get_content(chapter)
if __name__ == '__main__':
root_url = "http://www.biquge5200.cc/"
nd = NovelDownloader(root_url)
nd.main()
```
这样就可以更好地模拟浏览器发送请求,避免被网站屏蔽。另外,可以根据需要调整延时时间,以达到更好的效果。
import os.path import random import time from multiprocessing import Pool import requests from lxml import etree headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36' } def get_img_src(page): ''' 抓取页面的图片的src :return: ''' for i in range(1, page+1): url = f'https://www.pkdoutu.com/photo/list/?page={i}' print(f'抓取{url}页数据') res = requests.get(url, headers=headers) res.encoding = res.apparent_encoding tree = etree.HTML(res.text) # 抓取图片 img_list = tree.xpath('//img[@referrerpolicy="no-referrer"]/@data-original') yield img_list def download_img(url): ''' 下载图片 :return: ''' time.sleep(random.uniform(0, 1)) img_res = requests.get(url, headers=headers) img_name = url.split('_')[-1] # 通过图片url进行拆分,拿到图片名称和后缀 path = 'img' # 判断路径是否存在,不存在则创建 if not os.path.exists(path): os.mkdir(path) # 写入到文件中 with open(os.path.join(path, img_name), 'wb') as f: f.write(img_res.content) if __name__ == '__main__': pool = Pool() # 通过生成器get_img_src返回图片列表 for url_list in get_img_src(2): for url in url_list: # 循环加入进程池 pool.apply_async(download_img, args=(url, )) pool.close() pool.join() print('over')
这是一个 Python 程序,主要用于从指定网站上抓取图片并下载到本地。程序使用了 multiprocessing 库中的进程池来实现多进程下载,加快了下载速度。其中,get_img_src 函数用于抓取指定页数的图片链接,download_img 函数用于下载指定链接的图片并保存到本地。主函数中使用了进程池来异步下载所有图片。
程序的具体实现步骤如下:
1. 导入所需的库,包括 os.path、random、time、multiprocessing 和 requests。
2. 定义抓取图片链接的函数 get_img_src,通过循环抓取指定页数的页面,并使用 lxml 库解析页面内容,抓取图片链接并返回。
3. 定义下载图片的函数 download_img,通过 requests 库下载指定链接的图片,并将图片保存到本地。
4. 在主函数中使用进程池来异步下载所有图片,具体实现如下:
a. 创建进程池 pool。
b. 通过生成器 get_img_src 返回图片链接列表,循环遍历每个链接。
c. 将 download_img 函数加入进程池,并传入图片链接作为参数。
d. 关闭进程池,并等待所有进程完成。
e. 打印下载完成提示信息。
该程序适用于需要大量下载图片的场景,可以通过调整进程池的大小和抓取的页数来适应不同的需求。
阅读全文