# Script from the original question, which was posted with the note that
# running it produced no output. As posted, main() was defined but never
# called; the entry-point guard at the bottom of this script addresses that.
import json
import os.path
import pprint
import re
import textwrap
import threading
import time
from queue import Empty, Queue
from threading import Thread
from urllib.parse import urljoin

import requests

# Unbounded on purpose: get_links() enqueues every segment before any worker
# thread exists, so a bounded queue would block forever on a long playlist.
q_list = Queue()

# Browser-like User-Agent sent with every request made by this script.
headers = {
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/ '
                  'Safari/537.36'
}


def get_links(url):
    """Queue every segment URL of the video page at *url*.

    Returns:
        A ``(filename, ts_length)`` tuple: the video title made safe for use
        as a file name, and the number of segments put on ``q_list``.

    Raises:
        IndexError: if the page source does not contain the expected
            ``window.videoInfo`` assignment.
    """
    r = requests.get(url, headers=headers)
    # Grab the JS object literal between the two markers; the final slice
    # drops its last character (presumably the statement's trailing ';').
    info = re.findall(r'window.pageInfo = window.videoInfo =(.*?)window.videoResource', r.text, re.DOTALL)[0].strip()[0:-1]
    page_info = json.loads(info)
    # Titles may contain characters that are illegal in file names.
    filename = re.sub(r'[\\/:*?"<>|]', '_', page_info['title'])
    # "ksPlayJson" is itself a JSON-encoded string, hence the second decode.
    m3u8_url = json.loads(page_info["currentVideoInfo"]["ksPlayJson"])['adaptationSet'][0]['representation'][1]['url']
    m3u8_list = requests.get(m3u8_url, headers=headers).text
    # Strip the '#...' tag lines; what remains are the segment URIs.
    ts_files = re.sub('#.*', '', m3u8_list).split()
    for num, ts in enumerate(ts_files):
        # Relative segment URIs are resolved against the playlist URL;
        # absolute ones pass through unchanged.
        q_list.put([urljoin(m3u8_url, ts), num])
    return filename, len(ts_files)


def download(filename):
    """Worker loop: download queued segments into ``video/`` until none remain."""
    os.makedirs('video', exist_ok=True)
    while True:
        try:
            # Non-blocking get: with several workers, checking empty() and
            # then calling a blocking get() can hang on the last item.
            ts_url, num = q_list.get_nowait()
        except Empty:
            return
        video_content = requests.get(ts_url, headers=headers).content
        with open(f'video/{filename}_{num}.ts', 'wb') as f:
            f.write(video_content)
        print(f'{threading.current_thread().name}已下载...第{num}个片段')


def combine(filename, ts_length):
    """Append segments 0..ts_length-1 to ``video/<filename>.mp4`` and delete them."""
    with open(f'video/{filename}.mp4', 'ab') as fp:
        for i in range(ts_length):
            ts_path = f'video/{filename}_{i}.ts'
            if not os.path.exists(ts_path):
                print(f'第{i}个片段缺失,已跳过')
                continue
            with open(ts_path, 'rb') as f:
                fp.write(f.read())
            print(f'已合并...第{i}个片段')
            # Remove only after the segment file has been closed.
            os.remove(ts_path)
            print(f'已删除...第{i}个片段')


def main():
    """Fetch the segment list, download it with three threads, then merge."""
    start_time = time.time()
    url = ''  # NOTE(review): the video page URL is blank in the source; fill it in.
    filename, ts_length = get_links(url)
    tasks = []
    for i in range(3):
        th = Thread(target=download, args=(filename,), name=f'线程{i}')
        th.start()
        tasks.append(th)
    for t in tasks:
        t.join()
    combine(filename, ts_length)
    end_time = time.time()
    print(f'总共耗时{end_time - start_time}')


if __name__ == '__main__':
    main()
时间: 2023-06-27 07:06:39 浏览: 170
这段代码是一个用于下载 AcFun 视频的 Python 脚本。它使用 requests 库来获取视频页面的源代码,并从中提取出视频的标题和 m3u8 列表地址。然后,它使用多线程来下载 m3u8 列表中的所有视频片段,并将它们合并成一个完整的视频文件。最后,它删除所有下载的视频片段。原脚本运行后没有任何输出,是因为它只定义了 main() 函数而从未调用;需要在文件末尾加上 `if __name__ == '__main__': main()` 才会真正执行。
import json import os.path import requests from time import sleep from lxml import etree import pprint
import json是Python中的一个内置模块,它提供了一种方便的方式来将Python对象转换为JSON格式,以及将JSON格式的数据转换为Python对象。在Python中使用import json导入该模块后,我们可以使用json.dumps()将Python对象转换为JSON格式,或者使用json.loads()将JSON格式的数据转换为Python对象。
# Script submitted with a request to optimize it.
import json
import os.path
import pprint
import re
import textwrap
import threading
import time
from queue import Empty, Queue
from threading import Thread
from urllib.parse import urljoin

import requests

# Unbounded on purpose: get_links() enqueues every segment before any worker
# thread exists, so a bounded queue would block forever on a long playlist.
q_list = Queue()

# Browser-like User-Agent sent with every request made by this script.
headers = {
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/ '
                  'Safari/537.36'
}


def get_links(url):
    """Queue every segment URL of the video page at *url*.

    Returns:
        A ``(filename, ts_length)`` tuple: the video title made safe for use
        as a file name, and the number of segments put on ``q_list``.

    Raises:
        IndexError: if the page source does not contain the expected
            ``window.videoInfo`` assignment.
    """
    r = requests.get(url, headers=headers)
    # Grab the JS object literal between the two markers; the final slice
    # drops its last character (presumably the statement's trailing ';').
    info = re.findall(r'window.pageInfo = window.videoInfo =(.*?)window.videoResource', r.text, re.DOTALL)[0].strip()[0:-1]
    page_info = json.loads(info)
    # Titles may contain characters that are illegal in file names.
    filename = re.sub(r'[\\/:*?"<>|]', '_', page_info['title'])
    # "ksPlayJson" is itself a JSON-encoded string, hence the second decode.
    m3u8_url = json.loads(page_info["currentVideoInfo"]["ksPlayJson"])['adaptationSet'][0]['representation'][1]['url']
    m3u8_list = requests.get(m3u8_url, headers=headers).text
    # Strip the '#...' tag lines; what remains are the segment URIs.
    ts_files = re.sub('#.*', '', m3u8_list).split()
    for num, ts in enumerate(ts_files):
        # Relative segment URIs are resolved against the playlist URL;
        # absolute ones pass through unchanged.
        q_list.put([urljoin(m3u8_url, ts), num])
    return filename, len(ts_files)


def download(filename):
    """Worker loop: download queued segments into ``video/`` until none remain."""
    os.makedirs('video', exist_ok=True)
    while True:
        try:
            # Non-blocking get: with several workers, checking empty() and
            # then calling a blocking get() can hang on the last item.
            ts_url, num = q_list.get_nowait()
        except Empty:
            return
        video_content = requests.get(ts_url, headers=headers).content
        with open(f'video/{filename}_{num}.ts', 'wb') as f:
            f.write(video_content)
        print(f'{threading.current_thread().name}已下载...第{num}个片段')


def combine(filename, ts_length):
    """Append segments 0..ts_length-1 to ``video/<filename>.mp4`` and delete them."""
    with open(f'video/{filename}.mp4', 'ab') as fp:
        for i in range(ts_length):
            ts_path = f'video/{filename}_{i}.ts'
            if not os.path.exists(ts_path):
                print(f'第{i}个片段缺失,已跳过')
                continue
            with open(ts_path, 'rb') as f:
                fp.write(f.read())
            print(f'已合并...第{i}个片段')
            # Remove only after the segment file has been closed.
            os.remove(ts_path)
            print(f'已删除...第{i}个片段')


def main():
    """Fetch the segment list, download it with three threads, then merge."""
    start_time = time.time()
    url = ''  # NOTE(review): the video page URL is blank in the source; fill it in.
    filename, ts_length = get_links(url)
    tasks = []
    for i in range(3):
        th = Thread(target=download, args=(filename,), name=f'线程{i}')
        th.start()
        tasks.append(th)
    for t in tasks:
        t.join()
    combine(filename, ts_length)
    end_time = time.time()
    print(f'总共耗时{end_time - start_time}')


if __name__ == '__main__':
    main()
1. 将所有的 import 放在文件开头,按照标准库、第三方库和本地库的顺序进行排列,方便阅读和维护。
2. 可以将获取视频片段地址和下载视频片段的代码放在同一个函数中,这样可以减少函数调用和确保两个步骤的原子性。
3. 可以使用 with 语句来打开文件,这样即使读写过程中抛出异常,文件对象也会被正确关闭,避免出现文件句柄泄露的情况。
4. 可以使用 f-strings 来格式化输出,这样可以使代码更加简洁、易读和易维护。
5. 可以对代码中的变量名进行调整,使其更符合 Python 的命名规范,并且更加易于理解和使用。
import os.path
import pprint
import textwrap
import threading
import time
import requests
import re
import json
from queue import Queue
from threading import Thread
# Browser-like User-Agent passed to every requests.get() call in this script.
headers = {
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/ Safari/537.36'
}
def download_video_segments(url, ts_queue):
    """Resolve the video page at *url* and enqueue every segment URL.

    Despite its name, this function downloads only the page and the playlist;
    the segments themselves are fetched later by whoever drains *ts_queue*.

    Args:
        url: Address of the video page.
        ts_queue: Queue that receives one ``[segment_url, index]`` entry per
            segment. ``put`` blocks when a bounded queue is full, so the
            queue must be unbounded (or already being consumed).

    Returns:
        A ``(filename, ts_length)`` tuple: the video title made safe for use
        as a file name, and the number of segments enqueued.

    Raises:
        IndexError: if the page source does not contain the expected
            ``window.videoInfo`` assignment.
    """
    from urllib.parse import urljoin

    response = requests.get(url, headers=headers)
    # Grab the JS object literal between the two markers; the final slice
    # drops its last character (presumably the statement's trailing ';').
    info = re.findall(r'window.pageInfo = window.videoInfo =(.*?)window.videoResource', response.text, re.DOTALL)[0].strip()[0:-1]
    # Decode once and reuse, instead of re-parsing the same text per field.
    page_info = json.loads(info)
    # Titles may contain characters that are illegal in file names.
    filename = re.sub(r'[\\/:*?"<>|]', '_', page_info['title'])
    # "ksPlayJson" is itself a JSON-encoded string, hence the second decode.
    m3u8_url = json.loads(page_info["currentVideoInfo"]["ksPlayJson"])['adaptationSet'][0]['representation'][1]['url']
    m3u8_list = requests.get(m3u8_url, headers=headers).text
    # Strip the '#...' tag lines; what remains are the segment URIs.
    ts_files = re.sub('#.*', '', m3u8_list).split()
    for num, ts in enumerate(ts_files):
        # Relative segment URIs are resolved against the playlist URL;
        # absolute ones pass through unchanged.
        ts_queue.put([urljoin(m3u8_url, ts), num])
    return filename, len(ts_files)
def download_video_segment(filename, ts_queue):
    """Worker loop: download queued segments until *ts_queue* is drained.

    Args:
        filename: Base name for the output files; segment ``num`` is written
            to ``video/<filename>_<num>.ts``.
        ts_queue: Queue of ``[segment_url, index]`` entries.

    Returns:
        None, once the queue has no more entries.
    """
    from queue import Empty

    # The segment files live in ./video; create it rather than assume it.
    os.makedirs('video', exist_ok=True)
    while True:
        try:
            # Non-blocking get: with several workers, checking empty() and
            # then calling a blocking get() can hang on the last item.
            ts_url, num = ts_queue.get_nowait()
        except Empty:
            return
        video_content = requests.get(ts_url, headers=headers).content
        with open(f'video/{filename}_{num}.ts', 'wb') as f:
            f.write(video_content)
        print(f'{threading.current_thread().name}已下载...第{num}个片段')
def combine_video_segments(filename, ts_length):
    """Merge the downloaded segments into ``video/<filename>.mp4``.

    Segments ``0 .. ts_length - 1`` are appended in index order and each
    segment file is deleted once it has been merged. The output is opened in
    append mode, so an existing file of the same name is extended, not
    replaced.

    Args:
        filename: Base name shared by the segment files and the output file.
        ts_length: Number of segment indices to look for.
    """
    with open(f'video/{filename}.mp4', 'ab') as fp:
        for i in range(ts_length):
            ts_path = f'video/{filename}_{i}.ts'
            if not os.path.exists(ts_path):
                # A gap means that segment was never downloaded; say so
                # instead of silently producing a video with a hole in it.
                print(f'第{i}个片段缺失,已跳过')
                continue
            with open(ts_path, 'rb') as f:
                fp.write(f.read())
            print(f'已合并...第{i}个片段')
            # Remove only after the segment file has been closed.
            os.remove(ts_path)
            print(f'已删除...第{i}个片段')
def main():
    """Fetch the segment list, download it with three threads, then merge.

    Prints the total elapsed time when finished.
    """
    start_time = time.time()
    url = ''  # NOTE(review): the video page URL is blank in the source; fill it in.
    # Unbounded on purpose: the queue is filled completely before any worker
    # starts, so a bounded queue would block forever on a long playlist.
    ts_queue = Queue()
    filename, ts_length = download_video_segments(url, ts_queue)
    tasks = []
    for i in range(3):
        th = Thread(target=download_video_segment, args=(filename, ts_queue), name=f'线程{i}')
        th.start()
        tasks.append(th)
    # Wait for every worker before merging, so no segment is still in flight.
    for t in tasks:
        t.join()
    combine_video_segments(filename, ts_length)
    end_time = time.time()
    print(f'总共耗时{end_time - start_time}')


if __name__ == '__main__':
    main()