schedule.every(9).to(10).hour.do(job)
时间: 2024-04-30 22:21:45 浏览: 268
这是一个使用schedule模块的定时任务,表示每天的9点到10点之间,执行一次名为"job"的任务。具体的实现可以参考以下示例代码:
```
import schedule
import time
def job():
print("I'm working...")
# 每天 9:00~10:00 执行一次 job()
schedule.every().day.at("09:00").do(job)
while True:
schedule.run_pending()
time.sleep(1)
```
需要注意的是,schedule模块需要不断地检查当前时间是否到达设定的执行时间,因此需要在一个循环中不断调用`schedule.run_pending()`方法。同时,为了避免无意义的循环,可以在每次循环中加入`time.sleep(1)`,让程序休眠一秒钟,等待下一次检查。
相关问题
def run(): schedule.every().hour.at(":10").do(cc_job) while True: schedule.run_pending() time.sleep(3540) if __name__ == '__main__': p = mp.Process(target=run) p.start()
这段代码使用了Python的`schedule`模块来设置一个每小时的定时任务,并以进程的方式在后台运行。具体步骤如下:
1. 首先导入需要使用的模块:
```
import schedule
import time
import multiprocessing as mp
```
2. 定义一个函数`cc_job`,用于执行定时任务:
```
def cc_job():
print("Run cc_job at every hour :10")
```
3. 定义一个函数`run`,用于设置定时任务并启动进程:
```
def run():
schedule.every().hour.at(":10").do(cc_job) # 每小时的10分执行一次
while True:
schedule.run_pending() # 运行所有的定时任务
time.sleep(3540) # 等待3540秒,即59分钟
```
这里使用`3540`秒是因为每个小时的10分执行一次,所以需要等待59分钟才能再次执行。
4. 在`__name__ == '__main__'`条件下,使用`mp.Process`创建一个进程,运行`run`函数:
```
if __name__ == '__main__':
p = mp.Process(target=run)
p.start()
```
这样就可以在后台运行一个每小时的定时任务了。需要注意的是,如果你要在服务器上运行这个程序,最好使用`nohup`命令或者`screen`命令来启动程序,以确保程序在后台一直运行。
此代码执行后就直接结束了from analysis import CC from datetime import datetime, timedelta import schedule from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger import multiprocessing as mp import time def cc_job(): # 获取当天日期 date_today = datetime.today().strftime('%Y%m%d') # 获取当前时间 now_time = datetime.now().time().strftime('%H:%M:%S') # 获取1小时前整点开始时间 one_hour_ago_start = (datetime.now() - timedelta(hours=1)).time().strftime('%H:00:00') # one_hour_ago_start = (datetime.now() - timedelta(hours=1)).time().strftime('%H:%M:%S') # 获取1小时前整点结束时间 one_hour_ago_end = (datetime.now() - timedelta(hours=1)).time().strftime('%H:59:59') # one_hour_ago_end = (datetime.now() - timedelta(hours=1)).time().strftime('%H:%M:%S') # print(one_hour_ago_start) # print(one_hour_ago_end) cc = CC(one_hour_ago_start, one_hour_ago_end, date=date_today) cc.wesocket_connect_new() def run(): schedule.every().hour.at(":39").do(cc_job) schedule.run_pending() # time.sleep(60) if __name__ == '__main__': # # p = mp.Process(target=run) # p.start() scheduler = BackgroundScheduler() trigger = CronTrigger(hour='*', minute='05', second='10') scheduler.add_job(cc_job, trigger=trigger) scheduler.start()
这段代码定义了一个cc_job函数,该函数会在每小时的39分执行。接下来,代码创建了一个后台任务调度器,使用CronTrigger在每小时的第5分10秒执行cc_job函数。最后,调用scheduler.start()方法启动调度器。
由于调度器是在后台执行的,因此程序并不会直接结束。但是,由于调度器没有设置退出条件,如果你想退出程序,需要手动终止程序运行。
此外,你需要确保你的CC类已经被正确导入并且可用。如果导入失败或者CC类存在问题,程序会因为无法调用cc_job函数而报错。
阅读全文