此代码执行后就直接结束了from analysis import CC from datetime import datetime, timedelta import schedule from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger import multiprocessing as mp import time def cc_job(): # 获取当天日期 date_today = datetime.today().strftime('%Y%m%d') # 获取当前时间 now_time = datetime.now().time().strftime('%H:%M:%S') # 获取1小时前整点开始时间 one_hour_ago_start = (datetime.now() - timedelta(hours=1)).time().strftime('%H:00:00') # one_hour_ago_start = (datetime.now() - timedelta(hours=1)).time().strftime('%H:%M:%S') # 获取1小时前整点结束时间 one_hour_ago_end = (datetime.now() - timedelta(hours=1)).time().strftime('%H:59:59') # one_hour_ago_end = (datetime.now() - timedelta(hours=1)).time().strftime('%H:%M:%S') # print(one_hour_ago_start) # print(one_hour_ago_end) cc = CC(one_hour_ago_start, one_hour_ago_end, date=date_today) cc.wesocket_connect_new() def run(): schedule.every().hour.at(":39").do(cc_job) schedule.run_pending() # time.sleep(60) if __name__ == '__main__': # # p = mp.Process(target=run) # p.start() scheduler = BackgroundScheduler() trigger = CronTrigger(hour='*', minute='05', second='10') scheduler.add_job(cc_job, trigger=trigger) scheduler.start()
时间: 2024-04-28 07:21:39 浏览: 24
这段代码定义了一个cc_job函数,该函数会在每小时的39分执行。接下来,代码创建了一个后台任务调度器,使用CronTrigger在每小时的第5分10秒执行cc_job函数。最后,调用scheduler.start()方法启动调度器。
由于调度器是在后台执行的,因此程序并不会直接结束。但是,由于调度器没有设置退出条件,如果你想退出程序,需要手动终止程序运行。
此外,你需要确保你的CC类已经被正确导入并且可用。如果导入失败或者CC类存在问题,程序会因为无法调用cc_job函数而报错。
相关问题
#定时任务 : (任意APP下的views.py下或者urls.py下) from apscheduler.schedulers.background import BackgroundScheduler from django_apscheduler.jobstores import DjangoJobStore, register_job from apps.intelligent_draw.models import Machines from datetime import datetime, timedelta from utils.const import AllConsts from django.core.mail import send_mail #将已经过时的工作都删除 __lt小于 # from django_apscheduler.models import DjangoJob # DjangoJob.objects.filter(next_run_time__lte=datetime.now()).delete() # 将数据库中已经过时的工作删除 # 开启定时配置 # 实例化调度器 scheduler = BackgroundScheduler() # 调度器使用默认的DjangoJobStore() scheduler.add_jobstore(DjangoJobStore(), 'default') # trigger: 任务执行的方式,共有三种:'date':一次性任务、'interval':循环任务、'cron':定时任务。 # @register_job(scheduler, 'cron', day_of_week='mon-fri', hour='9', minute='30', second='10',id='task_test') #注册一个定时任务 (replace_existing:重新启动时替换现有作业;防止id冲突报错) # 新增一个定时任务,每隔5分钟检测一次,如果发现机器的last_updated比现在的时间晚了半个小时,则更新机器状态为异常,并且发送一封邮件进行告警。 @register_job(scheduler, 'cron', minute='*/5', id='job', replace_existing=True) # 这里写你要执行的任务 def job_task(): print("定时任务开始:") half_hour_ago = datetime.now() - timedelta(minutes=30) machines = Machines.objects.filter(last_updated__lte=half_hour_ago) machine_list = [] for machine in machines: machine_list.append(machine.id) machines.update(status=AllConsts.MACHINE_STATUS_ILLEGAL) send_mail('邮件标题', '邮件内容', 'yourmail@qq.com', ['yourmail@126.com', 'yormail@qq.com']) # 定时任务开始 scheduler.start()这段代码有没有什么问题?
这段代码看起来没有明显的语法错误,但是需要注意一些细节问题:
1. 在使用 `BackgroundScheduler` 之前,需要先初始化 Django,即在代码开头加上以下代码:
```python
import django
django.setup()
```
2. 如果需要在 Django 的 `views.py` 文件中启动定时任务,需要在 `wsgi.py` 文件中添加以下代码:
```python
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore
scheduler = BackgroundScheduler()
scheduler.add_jobstore(DjangoJobStore(), 'default')
scheduler.start()
```
然后在 `views.py` 中添加需要执行的定时任务。
3. 邮件发送需要配置相关的 SMTP 服务器信息,可以在 Django 的 `settings.py` 文件中添加以下配置:
```python
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_HOST = 'smtp.qq.com'
EMAIL_PORT = 465
EMAIL_USE_SSL = True
EMAIL_HOST_USER = 'yourmail@qq.com'
EMAIL_HOST_PASSWORD = 'yourpassword'
```
其中 `EMAIL_HOST_USER` 和 `EMAIL_HOST_PASSWORD` 分别为 SMTP 服务器的账号和密码。
希望对你有所帮助。
所以报错这样是正常的对吗from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR import datetime import logging logging.basicConfig(level=logging.INFO, format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', datefmt='%Y-%m-%d %:H%M%S', filename='log1.txt', filemode='a') logging.FileHandler(encoding='utf-8', filename='log1.txt') def aps_test(x): print(datetime.datetime.now().strftime('%Y-%m-%d %:H%M%S'), x) def date_test(x): print(datetime.datetime.now().strftime('%Y-%m-%d %:H%M%S'), x) print(1/0) def my_listener(event): if event.exception: print('出错??????') else: print('正常///////////') scheduler = BlockingScheduler() scheduler.add_job(func=date_test, args=('一次性出错任务',), next_run_time=datetime.datetime.now()+datetime.timedelta(seconds=15), id='date_task') scheduler.add_job(func=aps_test, args=('循环任务',), trigger='interval', seconds=3, id='interval_task') scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) scheduler._logger=logging scheduler.start()
这个报错是因为在logging.basicConfig()方法中,有一个格式字符串的语法错误,即datefmt参数的格式字符串中有一个“%”符号后面没有跟任何字符,应该把它改成“%H”。正确的格式字符串应该是'%Y-%m-%d %H:%M:%S'。修改后的代码如下:
```python
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import datetime
import logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S', # 修改这里的格式字符串
filename='log1.txt',
filemode='a')
logging.FileHandler(encoding='utf-8', filename='log1.txt')
def aps_test(x):
print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
def date_test(x):
print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
print(1/0)
def my_listener(event):
if event.exception:
print('出错??????')
else:
print('正常///////////')
scheduler = BlockingScheduler()
scheduler.add_job(func=date_test, args=('一次性出错任务',),
next_run_time=datetime.datetime.now()+datetime.timedelta(seconds=15),
id='date_task')
scheduler.add_job(func=aps_test, args=('循环任务',), trigger='interval', seconds=3, id='interval_task')
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
scheduler._logger=logging
scheduler.start()
```
注意:如果你在使用这段代码时,日期时间格式不需要秒数,可以将datefmt参数改为'%Y-%m-%d %H:%M'。