def save_txt(str_list: list, name):
    """Write the strings of *str_list* to the text file *name*, one per line.

    The file is opened in write mode with UTF-8 encoding, so an existing
    file of that name is overwritten.  A newline is appended to every item,
    including the last one.

    Args:
        str_list: Strings to write, in order.
        name: Path of the file to create or overwrite.

    Raises:
        TypeError: if an item of *str_list* is not a string.
        OSError: if the file cannot be opened for writing.
    """
    with open(name, 'w', encoding='utf-8') as f:
        f.writelines(i + '\n' for i in str_list)
时间: 2024-05-21 20:17:49 浏览: 8
这是一个保存列表到文本文件的函数,函数名为`save_txt`,接收两个参数:`str_list`和`name`。其中,`str_list`是要保存的字符串列表,`name`是要保存到的文件名。
函数内部使用`with open`语句打开文件并指定写入模式和编码方式。然后遍历`str_list`中的每个字符串,将其写入文件并在每个字符串末尾添加一个换行符。最后,自动关闭文件。
相关问题
import requests
from bs4 import BeautifulSoup
import csv


def get_top250_movies():
    """Scrape the Douban Top 250 list and return one record per movie.

    The list is fetched 25 entries at a time through the ``start`` query
    parameter (0, 25, ..., 225).

    Returns:
        A list of ``[title, director, actors, year, rating, comment_num]``
        lists of strings, in page order.

    Raises:
        requests.RequestException: if a page cannot be fetched within the
            timeout or the server answers with an HTTP error status.
    """
    url = 'https://movie.douban.com/top250'
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
    }
    movie_info_list = []
    for i in range(0, 250, 25):
        params = {'start': str(i)}
        # Without a timeout a stalled connection would block forever.
        res = requests.get(url, headers=headers, params=params, timeout=10)
        # An error page has no 'info' blocks; fail loudly instead of
        # silently returning fewer rows.
        res.raise_for_status()
        soup = BeautifulSoup(res.text, 'html.parser')
        movie_list = soup.find_all('div', class_='info')
        for movie in movie_list:
            title = movie.find('span', class_='title').text
            info = movie.find('div', class_='bd').p.text.strip().split('\n')
            # NOTE(review): the fields below are cut out with fixed offsets,
            # and both actors and year come from the second line; this
            # depends on the exact page layout -- verify against a live page.
            director = info[0][4:]
            actors = info[1][3:]
            year = info[1][-5:-1]
            rating = movie.find('span', class_='rating_num').text
            comment_num = movie.find('div', class_='star').find_all('span')[3].text[:-3]
            movie_info_list.append([title, director, actors, year, rating, comment_num])
    return movie_info_list


def save_to_csv(movie_info_list, path='movie_info.csv'):
    """Write the movie records to a CSV file, one movie per row.

    A header row is written first; each record then becomes one row whose
    six fields occupy six separate columns.  The file is encoded as
    ``utf-8-sig`` and any existing file at *path* is overwritten.

    Args:
        movie_info_list: Iterable of six-item records.
        path: Destination file; defaults to ``movie_info.csv`` in the
            current directory.
    """
    with open(path, 'w', newline='', encoding='utf-8-sig') as f:
        writer = csv.writer(f)
        writer.writerow(['电影名称', '导演', '演员', '上映年份', '评分', '评论数'])
        writer.writerows(movie_info_list)


if __name__ == '__main__':
    movie_info_list = get_top250_movies()
    save_to_csv(movie_info_list)
    print('电影信息保存成功!')

# Request from the post: once this code has run successfully, each of the
# six saved fields should be in its own column.
可以在save_to_csv函数中将每个信息分别写入csv文件中的一列,修改后代码如下:
```python
import requests
from bs4 import BeautifulSoup
import csv
def get_top250_movies():
    """Scrape the Douban Top 250 list and return one record per movie.

    The list is fetched 25 entries at a time through the ``start`` query
    parameter (0, 25, ..., 225).

    Returns:
        A list of ``[title, director, actors, year, rating, comment_num]``
        lists of strings, in page order.

    Raises:
        requests.RequestException: if a page cannot be fetched within the
            timeout or the server answers with an HTTP error status.
    """
    url = 'https://movie.douban.com/top250'
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
    }
    movie_info_list = []
    for i in range(0, 250, 25):
        params = {'start': str(i)}
        # Without a timeout a stalled connection would block forever.
        res = requests.get(url, headers=headers, params=params, timeout=10)
        # An error page has no 'info' blocks; fail loudly instead of
        # silently returning fewer rows.
        res.raise_for_status()
        soup = BeautifulSoup(res.text, 'html.parser')
        movie_list = soup.find_all('div', class_='info')
        for movie in movie_list:
            title = movie.find('span', class_='title').text
            info = movie.find('div', class_='bd').p.text.strip().split('\n')
            # NOTE(review): the fields below are cut out with fixed offsets,
            # and both actors and year come from the second line; this
            # depends on the exact page layout -- verify against a live page.
            director = info[0][4:]
            actors = info[1][3:]
            year = info[1][-5:-1]
            rating = movie.find('span', class_='rating_num').text
            comment_num = movie.find('div', class_='star').find_all('span')[3].text[:-3]
            movie_info_list.append([title, director, actors, year, rating, comment_num])
    return movie_info_list
def save_to_csv(movie_info_list, path='movie_info.csv'):
    """Write the movie records to a CSV file, one movie per row.

    A header row is written first; each record then becomes one row whose
    six fields occupy six separate columns.  ``csv.writer`` already puts
    every item of a row into its own column, so no second pass over the
    file is needed.  The file is encoded as ``utf-8-sig`` and any existing
    file at *path* is overwritten.

    Args:
        movie_info_list: Iterable of six-item records.
        path: Destination file; defaults to ``movie_info.csv`` in the
            current directory.
    """
    with open(path, 'w', newline='', encoding='utf-8-sig') as f:
        writer = csv.writer(f)
        writer.writerow(['电影名称', '导演', '演员', '上映年份', '评分', '评论数'])
        writer.writerows(movie_info_list)
if __name__ == '__main__':
    # Scrape first, then persist; the success message is printed only after
    # the CSV file has been written.
    save_to_csv(get_top250_movies())
    print('电影信息保存成功!')
```
运行修改后的代码,会在同级目录下生成一个名为'movie_info.csv'的文件,其中每个信息分别单独成一列。
import http.client
from html.parser import HTMLParser
import argparse
from concurrent.futures import ThreadPoolExecutor
import multiprocessing.pool
import os

prefix = "save/"

# Shared crawl state.  The objects are created by main() in the parent
# process and installed in every worker by init_worker().  Creating a
# Manager or Pool at import time fails under the "spawn" start method used
# on Windows, because each child process re-imports this module.
readed_path = None  # links that have already been queued
new_path = None     # links discovered during the current round
lock = None         # guards the check-then-append on the two lists


def init_worker(readed, found, guard):
    """Pool initializer: store the shared lists and lock in this process."""
    global readed_path, new_path, lock
    readed_path = readed
    new_path = found
    lock = guard


class MyHttpParser(HTMLParser):
    """Record the href of every <a> whose enclosing open tag is a <div>."""

    def __init__(self):
        HTMLParser.__init__(self)
        self.tag = []    # names of the start tags seen and not yet popped
        self.href = ""   # href of the most recent <a>
        self.txt = ""    # text of the most recent <a>

    def handle_starttag(self, tag, attrs):
        self.tag.append(tag)
        if tag == "a":
            for att in attrs:
                if att[0] == 'href':
                    self.href = att[1]

    def handle_endtag(self, tag):
        # NOTE(review): tags without an end tag (e.g. <br>) are never popped,
        # so the "parent is a div" test can miss links on real pages.
        if tag == "a" and len(self.tag) > 2 and self.tag[-2] == "div":
            print("in div, link txt is %s ." % self.txt)
            print("in div, link url is %s ." % self.href)
            with lock:
                if self.href and self.href not in readed_path:
                    readed_path.append(self.href)
                    new_path.append(self.href)
        # A stray end tag must not crash the parser on an empty stack.
        if self.tag:
            self.tag.pop()

    def handle_data(self, data):
        if len(self.tag) >= 1 and self.tag[-1] == "a":
            self.txt = data


def LoadHtml(path, file_path):
    """Fetch ``http://<path><file_path>`` and return the body as text.

    On a 301 response the ``Location`` target is appended to ``new_path``
    and an empty string is returned.  An empty string is also returned when
    the request fails, so callers can always write and parse the result.
    """
    if len(file_path) == 0:
        file_path = "/"
    conn = None
    try:
        conn = http.client.HTTPConnection(path, timeout=10)
        conn.request("GET", file_path)
        response = conn.getresponse()
        print(response.status, response.reason, response.version)
        data = response.read().decode("utf-8", errors="replace")
        if response.status == 301:
            location = response.getheader("Location")
            if location:
                with lock:
                    new_path.append(location)
            data = ""
        return data
    except (OSError, http.client.HTTPException) as e:
        print(e.args)
        return ""
    finally:
        if conn is not None:
            conn.close()


def ParseArgs():
    """Parse the command line: ``-p/--path`` (required) and ``-d/--deep``."""
    # Initialise the parser.
    parser = argparse.ArgumentParser()
    # Define the arguments.
    parser.add_argument("-p", "--path", required=True, help="域名")
    parser.add_argument("-d", "--deep", type=int, default=1, help="递归深度")
    # Parse.
    args = parser.parse_args()
    return args


def formatPath(path):
    """Strip a leading ``https://``, ``http://`` or ``//`` from *path*."""
    path = path.removeprefix("https://")
    path = path.removeprefix("http://")
    path = path.removeprefix("//")
    return path


def doWork(path):
    """Download one page, save it under ``prefix`` and collect its links."""
    path = formatPath(path)
    m = path.find("/")
    if m == -1:
        m = len(path)
    host, file_path = path[:m], path[m:]
    if not host:
        # A relative link has no host to connect to.
        return
    data = LoadHtml(host, file_path)
    os.makedirs(prefix, exist_ok=True)
    # ':' (host:port) is not allowed in Windows file names.
    with open(prefix + host.replace(":", "_") + ".html", "w+", encoding="utf-8") as f:
        f.write(data)
    # A fresh parser per page keeps tag state from leaking between pages.
    MyHttpParser().feed(data)


def work(deep, maxdeep):
    if deep > maxdeep:
        return


def main():
    """Crawl breadth-first from ``--path`` for ``--deep`` rounds."""
    args = ParseArgs()
    start = formatPath(args.path)
    with multiprocessing.Manager() as manager:
        readed = manager.list([start])
        found = manager.list()
        cur_path = [start]
        with multiprocessing.Pool(4, initializer=init_worker,
                                  initargs=(readed, found, manager.Lock())) as e:
            for i in range(args.deep):
                e.map(doWork, cur_path)
                # Links found in this round are the work list of the next.
                cur_path = found[:]
                found[:] = []
                print(i)


if __name__ == '__main__':
    # Required on Windows: child processes re-import this module, so
    # process-creating code must only run in the real main module.
    main()

# Request from the post: optimise this code so that it can run on Windows.
在 Windows 下,`multiprocessing` 只能以 spawn 方式启动子进程:子进程会重新导入主模块,因此创建 `Manager`、进程池等会启动新进程的代码必须放在 `if __name__ == '__main__':` 之内,并且应作为独立脚本运行,而不是在交互式(interactive)窗口中运行。另外,如果要使用 `multiprocessing.Manager()` 创建的对象(如 `list()`、`Queue()`)在进程间共享数据,应在主进程中创建这些对象并把它们传给子进程,而不要依赖在模块顶层创建的全局变量。
以下是对代码的优化:
1. 将 `multiprocessing.Manager().list()` 替换为 `multiprocessing.Manager().Queue()`,因为 Queue 更适合用于进程间通信。
2. 将 `multiprocessing.Pool()` 替换为 `concurrent.futures.ProcessPoolExecutor()`,因为后者可以更好地处理进程池中的异常。
3. 将 `work()` 函数改为使用 `concurrent.futures.ProcessPoolExecutor()` 来执行任务。
4. `MyHttpParser` 类的属性(`tag`、`href`、`txt`)仍然是在 `__init__` 中初始化的实例属性,每个解析器对象各自持有一份;这里只是改用 `super().__init__()` 来调用父类的构造函数。
5. 在 `doWork()` 函数中,改为使用 `requests` 库来发送 HTTP 请求,因为它比 `http.client` 更易用。
以下是优化后的代码:
```python
import argparse
import multiprocessing
import os
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
from html.parser import HTMLParser

import requests
prefix = "save/"

# Queue that worker processes push discovered links into.  It is created by
# work() in the parent process and installed in every worker by
# init_worker().  Creating a Manager at import time fails under the "spawn"
# start method used on Windows, because each child re-imports this module.
new_path = None


def init_worker(link_queue):
    """Executor initializer: store the shared link queue in this process."""
    global new_path
    new_path = link_queue


class MyHttpParser(HTMLParser):
    """Collect the href of every <a> whose enclosing open tag is a <div>."""

    def __init__(self):
        super().__init__()
        self.tag = []    # names of the start tags seen and not yet popped
        self.href = ""   # href of the most recent <a>
        self.txt = ""    # text of the most recent <a>
        self.links = []  # hrefs collected so far, in document order

    def handle_starttag(self, tag, attrs):
        self.tag.append(tag)
        if tag == "a":
            for att in attrs:
                if att[0] == 'href':
                    self.href = att[1]

    def handle_endtag(self, tag):
        # NOTE(review): tags without an end tag (e.g. <br>) are never popped,
        # so the "parent is a div" test can miss links on real pages.
        if tag == "a" and len(self.tag) > 2 and self.tag[-2] == "div":
            print("in div, link txt is %s ." % self.txt)
            print("in div, link url is %s ." % self.href)
            if self.href:
                self.links.append(self.href)
        # A stray end tag must not crash the parser on an empty stack.
        if self.tag:
            self.tag.pop()

    def handle_data(self, data):
        if len(self.tag) >= 1 and self.tag[-1] == "a":
            self.txt = data


def LoadHtml(path, file_path):
    """Download ``http://<path><file_path>`` and return the body as text.

    ``requests`` follows redirects itself, so the returned text is that of
    the final page.  An empty string is returned when the request fails, so
    callers can always write and parse the result.
    """
    if len(file_path) == 0:
        file_path = "/"
    url = f"http://{path}{file_path}"
    try:
        # Without a timeout a stalled connection would block the worker.
        response = requests.get(url, timeout=10)
    except requests.RequestException as e:
        print(e.args)
        return ""
    print(response.status_code, response.reason, response.raw.version)
    return response.content.decode("utf-8", errors="replace")


def ParseArgs():
    """Parse the command line: ``-p/--path`` (required) and ``-d/--deep``."""
    parser = argparse.ArgumentParser()
    parser.add_argument("-p", "--path", required=True, help="domain name")
    parser.add_argument("-d", "--deep", type=int, help="recursion depth")
    return parser.parse_args()


def formatPath(path):
    """Strip a leading ``https://``, ``http://`` or ``//`` from *path*."""
    path = path.removeprefix("https://")
    path = path.removeprefix("http://")
    path = path.removeprefix("//")
    return path


def doWork(path):
    """Download one page, save it under ``prefix`` and queue its links."""
    path = formatPath(path)
    m = path.find("/")
    if m == -1:
        m = len(path)
    host, file_path = path[:m], path[m:]
    if not host:
        # A relative link has no host to connect to.
        return
    data = LoadHtml(host, file_path)
    os.makedirs(prefix, exist_ok=True)
    # ':' (host:port) is not allowed in Windows file names.
    with open(prefix + host.replace(":", "_") + ".html", "w+", encoding="utf-8") as f:
        f.write(data)
    # A fresh parser per page keeps tag state from leaking between pages.
    parser = MyHttpParser()
    parser.feed(data)
    for href in parser.links:
        new_path.put(href)


def work(maxdeep):
    """Crawl breadth-first from ``--path``.

    Args:
        maxdeep: Number of rounds to run when ``--deep`` is not given on
            the command line.
    """
    args = ParseArgs()
    depth = maxdeep if args.deep is None else args.deep
    start = formatPath(args.path)
    readed_path = {start}   # every link that has been scheduled so far
    cur_path = [start]      # links to fetch in the current round
    with multiprocessing.Manager() as manager:
        link_queue = manager.Queue()
        with ProcessPoolExecutor(max_workers=4, initializer=init_worker,
                                 initargs=(link_queue,)) as executor:
            for i in range(depth):
                futures = [executor.submit(doWork, p) for p in cur_path]
                for future in as_completed(futures):
                    try:
                        future.result()
                    except Exception as e:
                        # One failing page must not abort the whole crawl.
                        print(e)
                # All workers of this round are done; drain what they
                # found and keep only links not scheduled before.
                cur_path = []
                while not link_queue.empty():
                    href = formatPath(link_queue.get())
                    if href not in readed_path:
                        readed_path.add(href)
                        cur_path.append(href)
                print(i)


if __name__ == '__main__':
    # Required on Windows: child processes re-import this module, so
    # process-creating code must only run in the real main module.
    work(5)
```