import time import multiprocessing from proxypool.processors.server import app from proxypool.processors.getter import Getter from proxypool.processors.tester import Tester from proxypool.setting import CYCLE_GETTER, CYCLE_TESTER, API_HOST, API_THREADED, API_PORT, ENABLE_SERVER, \ ENABLE_GETTER, ENABLE_TESTER, IS_WINDOWS from loguru import logger if IS_WINDOWS: multiprocessing.freeze_support() tester_process, getter_process, server_process = None, None, None class Scheduler(): def run_tester(self, cycle=CYCLE_TESTER): if not ENABLE_TESTER: logger.info('tester not enabled, exit') return tester = Tester() loop = 0 while True: logger.debug(f'tester loop {loop} start...') tester.run() loop += 1 time.sleep(cycle) # CYCLE_GETTER=100 def run_getter(self, cycle=CYCLE_GETTER): if not ENABLE_GETTER: logger.info('getter not enabled, exit') return getter = Getter() loop = 0 while True: logger.debug(f'getter loop {loop} start...') getter.run() loop += 1 time.sleep(cycle) def run_server(self): if not ENABLE_SERVER: logger.info('server not enabled, exit') return app.run(host=API_HOST, port=API_PORT, threaded=API_THREADED) def run(self): global tester_process, getter_process, server_process try: logger.info('starting proxypool...') if ENABLE_TESTER: tester_process = multiprocessing.Process(target=self.run_tester) logger.info(f'starting tester, pid {tester_process.pid}...') tester_process.start() if ENABLE_GETTER: getter_process = multiprocessing.Process(target=self.run_getter) logger.info(f'starting getter, pid{getter_process.pid}...') getter_process.start() if ENABLE_SERVER: server_process = multiprocessing.Process(target=self.run_server) logger.info(f'starting server, pid{server_process.pid}...') server_process.start() tester_process.join() getter_process.join() server_process.join() except KeyboardInterrupt: logger.info('received keyboard interrupt signal') tester_process.terminate() getter_process.terminate() server_process.terminate() finally: # must call join method before calling is_alive tester_process.join() getter_process.join() server_process.join() logger.info(f'tester is {"alive" if tester_process.is_alive() else "dead"}') logger.info(f'getter is {"alive" if getter_process.is_alive() else "dead"}') logger.info(f'server is {"alive" if server_process.is_alive() else "dead"}') logger.info('proxy terminated') if name == 'main': scheduler = Scheduler() scheduler.run()给这段代码加注释
时间: 2024-04-05 13:35:04 浏览: 124
Python多进程multiprocessing.Pool类详解
5星 · 资源好评率100%
这段代码是一个代理池的调度程序,用于获取代理IP并测试其可用性。下面是对代码的详细注释:
```python
import time
import multiprocessing
from proxypool.processors.server import app # 导入 server 模块
from proxypool.processors.getter import Getter # 导入 getter 模块
from proxypool.processors.tester import Tester # 导入 tester 模块
from proxypool.setting import CYCLE_GETTER, CYCLE_TESTER, API_HOST, API_THREADED, API_PORT, ENABLE_SERVER, ENABLE_GETTER, ENABLE_TESTER, IS_WINDOWS # 导入配置文件
from loguru import logger # 导入日志库
if IS_WINDOWS:
multiprocessing.freeze_support() # Windows 平台需要调用这个函数
tester_process, getter_process, server_process = None, None, None # 定义进程对象
class Scheduler():
def run_tester(self, cycle=CYCLE_TESTER): # 定义 tester 进程的运行函数
if not ENABLE_TESTER: # 若不需要测试,则直接退出
logger.info('tester not enabled, exit')
return
tester = Tester()
loop = 0
while True:
logger.debug(f'tester loop {loop} start...')
tester.run() # 运行 tester
loop += 1
time.sleep(cycle) # 循环周期
def run_getter(self, cycle=CYCLE_GETTER): # 定义 getter 进程的运行函数
if not ENABLE_GETTER: # 若不需要获取,则直接退出
logger.info('getter not enabled, exit')
return
getter = Getter()
loop = 0
while True:
logger.debug(f'getter loop {loop} start...')
getter.run() # 运行 getter
loop += 1
time.sleep(cycle) # 循环周期
def run_server(self): # 定义 server 进程的运行函数
if not ENABLE_SERVER: # 若不需要启动 server,则直接退出
logger.info('server not enabled, exit')
return
app.run(host=API_HOST, port=API_PORT, threaded=API_THREADED) # 运行 server
def run(self): # 定义调度程序的运行函数
global tester_process, getter_process, server_process # 定义全局变量
try:
logger.info('starting proxypool...')
if ENABLE_TESTER: # 若需要测试,则启动 tester 进程
tester_process = multiprocessing.Process(target=self.run_tester)
logger.info(f'starting tester, pid {tester_process.pid}...')
tester_process.start()
if ENABLE_GETTER: # 若需要获取,则启动 getter 进程
getter_process = multiprocessing.Process(target=self.run_getter)
logger.info(f'starting getter, pid{getter_process.pid}...')
getter_process.start()
if ENABLE_SERVER: # 若需要启动 server,则启动 server 进程
server_process = multiprocessing.Process(target=self.run_server)
logger.info(f'starting server, pid{server_process.pid}...')
server_process.start()
tester_process.join() # 等待 tester 进程结束
getter_process.join() # 等待 getter 进程结束
server_process.join() # 等待 server 进程结束
except KeyboardInterrupt: # 捕获键盘中断信号
logger.info('received keyboard interrupt signal')
tester_process.terminate() # 终止 tester 进程
getter_process.terminate() # 终止 getter 进程
server_process.terminate() # 终止 server 进程
finally: # 无论如何都要关闭进程
tester_process.join() # 等待 tester 进程结束
getter_process.join() # 等待 getter 进程结束
server_process.join() # 等待 server 进程结束
logger.info(f'tester is {"alive" if tester_process.is_alive() else "dead"}')
logger.info(f'getter is {"alive" if getter_process.is_alive() else "dead"}')
logger.info(f'server is {"alive" if server_process.is_alive() else "dead"}')
logger.info('proxy terminated')
if __name__ == '__main__':
scheduler = Scheduler()
scheduler.run() # 运行调度程序
```
这段代码中启动了三个进程:tester 进程用于测试代理 IP 的可用性,getter 进程用于获取代理 IP,server 进程用于提供代理 IP 的 API 接口。其中,每个进程都有自己的运行函数,它们会循环运行并在一定时间间隔内执行一次相应的操作。如果不需要某个进程,则会直接退出。调度程序会等待所有进程运行结束,并在捕获到键盘中断信号时及时关闭进程。
阅读全文