scheduler.start() logger.info(scheduler.add_task()) logger.debug('None') time.sleep(600)
时间: 2024-05-28 17:12:35 浏览: 172
这段代码看起来像是一个任务调度器的代码。首先,调用 `scheduler.start()` 来启动任务调度器。然后,使用日志记录器 `logger` 的 `info` 方法来记录添加任务的结果。接下来,使用 `debug` 方法记录了一个字符串 `'None'`。最后,使用 `time.sleep(600)` 让程序暂停 600 秒(即 10 分钟)。
需要注意的是,这段代码中的 `add_task()` 方法并没有被提供,因此无法确定它具体做了什么。同时,这段代码中的日志记录器也没有被初始化或配置,因此无法确定日志会被输出到哪里。
相关问题
所以报错这样是正常的对吗from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR import datetime import logging logging.basicConfig(level=logging.INFO, format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', datefmt='%Y-%m-%d %:H%M%S', filename='log1.txt', filemode='a') logging.FileHandler(encoding='utf-8', filename='log1.txt') def aps_test(x): print(datetime.datetime.now().strftime('%Y-%m-%d %:H%M%S'), x) def date_test(x): print(datetime.datetime.now().strftime('%Y-%m-%d %:H%M%S'), x) print(1/0) def my_listener(event): if event.exception: print('出错??????') else: print('正常///////////') scheduler = BlockingScheduler() scheduler.add_job(func=date_test, args=('一次性出错任务',), next_run_time=datetime.datetime.now()+datetime.timedelta(seconds=15), id='date_task') scheduler.add_job(func=aps_test, args=('循环任务',), trigger='interval', seconds=3, id='interval_task') scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) scheduler._logger=logging scheduler.start()
这个报错是因为在logging.basicConfig()方法中,有一个格式字符串的语法错误,即datefmt参数的格式字符串中有一个“%”符号后面没有跟任何字符,应该把它改成“%H”。正确的格式字符串应该是'%Y-%m-%d %H:%M:%S'。修改后的代码如下:
```python
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import datetime
import logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S', # 修改这里的格式字符串
filename='log1.txt',
filemode='a')
logging.FileHandler(encoding='utf-8', filename='log1.txt')
def aps_test(x):
print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
def date_test(x):
print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
print(1/0)
def my_listener(event):
if event.exception:
print('出错??????')
else:
print('正常///////////')
scheduler = BlockingScheduler()
scheduler.add_job(func=date_test, args=('一次性出错任务',),
next_run_time=datetime.datetime.now()+datetime.timedelta(seconds=15),
id='date_task')
scheduler.add_job(func=aps_test, args=('循环任务',), trigger='interval', seconds=3, id='interval_task')
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
scheduler._logger=logging
scheduler.start()
```
注意:如果你在使用这段代码时,日期时间格式不需要秒数,可以将datefmt参数改为'%Y-%m-%d %H:%M'。
import time import multiprocessing from proxypool.processors.server import app from proxypool.processors.getter import Getter from proxypool.processors.tester import Tester from proxypool.setting import CYCLE_GETTER, CYCLE_TESTER, API_HOST, API_THREADED, API_PORT, ENABLE_SERVER, \ ENABLE_GETTER, ENABLE_TESTER, IS_WINDOWS from loguru import logger if IS_WINDOWS: multiprocessing.freeze_support() tester_process, getter_process, server_process = None, None, None class Scheduler(): def run_tester(self, cycle=CYCLE_TESTER): if not ENABLE_TESTER: logger.info('tester not enabled, exit') return tester = Tester() loop = 0 while True: logger.debug(f'tester loop {loop} start...') tester.run() loop += 1 time.sleep(cycle) # CYCLE_GETTER=100 def run_getter(self, cycle=CYCLE_GETTER): if not ENABLE_GETTER: logger.info('getter not enabled, exit') return getter = Getter() loop = 0 while True: logger.debug(f'getter loop {loop} start...') getter.run() loop += 1 time.sleep(cycle) def run_server(self): if not ENABLE_SERVER: logger.info('server not enabled, exit') return app.run(host=API_HOST, port=API_PORT, threaded=API_THREADED) def run(self): global tester_process, getter_process, server_process try: logger.info('starting proxypool...') if ENABLE_TESTER: tester_process = multiprocessing.Process(target=self.run_tester) logger.info(f'starting tester, pid {tester_process.pid}...') tester_process.start() if ENABLE_GETTER: getter_process = multiprocessing.Process(target=self.run_getter) logger.info(f'starting getter, pid{getter_process.pid}...') getter_process.start() if ENABLE_SERVER: server_process = multiprocessing.Process(target=self.run_server) logger.info(f'starting server, pid{server_process.pid}...') server_process.start() tester_process.join() getter_process.join() server_process.join() except KeyboardInterrupt: logger.info('received keyboard interrupt signal') tester_process.terminate() getter_process.terminate() server_process.terminate() finally: # must call join method before calling is_alive tester_process.join() getter_process.join() server_process.join() logger.info(f'tester is {"alive" if tester_process.is_alive() else "dead"}') logger.info(f'getter is {"alive" if getter_process.is_alive() else "dead"}') logger.info(f'server is {"alive" if server_process.is_alive() else "dead"}') logger.info('proxy terminated') if name == 'main': scheduler = Scheduler() scheduler.run()给这段代码加注释
这段代码是一个代理池的调度程序,用于获取代理IP并测试其可用性。下面是对代码的详细注释:
```python
import time
import multiprocessing
from proxypool.processors.server import app # 导入 server 模块
from proxypool.processors.getter import Getter # 导入 getter 模块
from proxypool.processors.tester import Tester # 导入 tester 模块
from proxypool.setting import CYCLE_GETTER, CYCLE_TESTER, API_HOST, API_THREADED, API_PORT, ENABLE_SERVER, ENABLE_GETTER, ENABLE_TESTER, IS_WINDOWS # 导入配置文件
from loguru import logger # 导入日志库
if IS_WINDOWS:
multiprocessing.freeze_support() # Windows 平台需要调用这个函数
tester_process, getter_process, server_process = None, None, None # 定义进程对象
class Scheduler():
def run_tester(self, cycle=CYCLE_TESTER): # 定义 tester 进程的运行函数
if not ENABLE_TESTER: # 若不需要测试,则直接退出
logger.info('tester not enabled, exit')
return
tester = Tester()
loop = 0
while True:
logger.debug(f'tester loop {loop} start...')
tester.run() # 运行 tester
loop += 1
time.sleep(cycle) # 循环周期
def run_getter(self, cycle=CYCLE_GETTER): # 定义 getter 进程的运行函数
if not ENABLE_GETTER: # 若不需要获取,则直接退出
logger.info('getter not enabled, exit')
return
getter = Getter()
loop = 0
while True:
logger.debug(f'getter loop {loop} start...')
getter.run() # 运行 getter
loop += 1
time.sleep(cycle) # 循环周期
def run_server(self): # 定义 server 进程的运行函数
if not ENABLE_SERVER: # 若不需要启动 server,则直接退出
logger.info('server not enabled, exit')
return
app.run(host=API_HOST, port=API_PORT, threaded=API_THREADED) # 运行 server
def run(self): # 定义调度程序的运行函数
global tester_process, getter_process, server_process # 定义全局变量
try:
logger.info('starting proxypool...')
if ENABLE_TESTER: # 若需要测试,则启动 tester 进程
tester_process = multiprocessing.Process(target=self.run_tester)
logger.info(f'starting tester, pid {tester_process.pid}...')
tester_process.start()
if ENABLE_GETTER: # 若需要获取,则启动 getter 进程
getter_process = multiprocessing.Process(target=self.run_getter)
logger.info(f'starting getter, pid{getter_process.pid}...')
getter_process.start()
if ENABLE_SERVER: # 若需要启动 server,则启动 server 进程
server_process = multiprocessing.Process(target=self.run_server)
logger.info(f'starting server, pid{server_process.pid}...')
server_process.start()
tester_process.join() # 等待 tester 进程结束
getter_process.join() # 等待 getter 进程结束
server_process.join() # 等待 server 进程结束
except KeyboardInterrupt: # 捕获键盘中断信号
logger.info('received keyboard interrupt signal')
tester_process.terminate() # 终止 tester 进程
getter_process.terminate() # 终止 getter 进程
server_process.terminate() # 终止 server 进程
finally: # 无论如何都要关闭进程
tester_process.join() # 等待 tester 进程结束
getter_process.join() # 等待 getter 进程结束
server_process.join() # 等待 server 进程结束
logger.info(f'tester is {"alive" if tester_process.is_alive() else "dead"}')
logger.info(f'getter is {"alive" if getter_process.is_alive() else "dead"}')
logger.info(f'server is {"alive" if server_process.is_alive() else "dead"}')
logger.info('proxy terminated')
if __name__ == '__main__':
scheduler = Scheduler()
scheduler.run() # 运行调度程序
```
这段代码中启动了三个进程:tester 进程用于测试代理 IP 的可用性,getter 进程用于获取代理 IP,server 进程用于提供代理 IP 的 API 接口。其中,每个进程都有自己的运行函数,它们会循环运行并在一定时间间隔内执行一次相应的操作。如果不需要某个进程,则会直接退出。调度程序会等待所有进程运行结束,并在捕获到键盘中断信号时及时关闭进程。
阅读全文