首页
关于
友链
Search
1
ESXI 一些功能调整
442 阅读
2
SoftEther 客户端篇
423 阅读
3
天翼云网关3.0/4.0超管密码
414 阅读
4
SoftEther 服务端篇
308 阅读
5
远程桌面rustdesk使用说明
254 阅读
默认分类
代码相关
vue
html
python
系统
数据库
docker
安卓
软件分享
杂七杂八的工具
游戏分享
游戏相关
我的家庭影院
Ai
登录
Search
标签搜索
python
esxi
git
SoftEther
linux
apscheduler
在线
rclone
Ubuntu
list
列表
idm
激活码
Charles
pdf
免安装
鲁大师
图片查看器
蜂蜜浏览器
honeyview
哥特式
累计撰写
98
篇文章
累计收到
16
条评论
首页
栏目
默认分类
代码相关
vue
html
python
系统
数据库
docker
安卓
软件分享
杂七杂八的工具
游戏分享
游戏相关
我的家庭影院
Ai
页面
关于
友链
搜索到
2
篇与
的结果
2022-07-26
apscheduler cron定时任务触发接口
from apscheduler.schedulers.blocking import BlockingScheduler class Timing: def __init__(self, start_date, end_date, hour=None): self.start_date = start_date self.end_date = end_date self.hour = hour def cron(self, job, *value_list): """cron格式 在特定时间周期性地触发""" # year (int 或 str) – 年,4位数字 # month (int 或 str) – 月 (范围1-12) # day (int 或 str) – 日 (范围1-31) # week (int 或 str) – 周 (范围1-53) # day_of_week (int 或 str) – 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun) # hour (int 或 str) – 时 (范围0-23) # minute (int 或 str) – 分 (范围0-59) # second (int 或 str) – 秒 (范围0-59) # start_date (datetime 或 str) – 最早开始日期(包含) # end_date (datetime 或 str) – 分 最晚结束时间(包含) # timezone (datetime.tzinfo 或str) – 指定时区 scheduler = BlockingScheduler() scheduler.add_job(job, 'cron', start_date=self.start_date, end_date=self.end_date, hour=self.hour, args=[*value_list]) scheduler.start() def interval(self, job, *value_list): """interval格式 周期触发任务""" # weeks (int) - 间隔几周 # days (int) - 间隔几天 # hours (int) - 间隔几小时 # minutes (int) - 间隔几分钟 # seconds (int) - 间隔多少秒 # start_date (datetime 或 str) - 开始日期 # end_date (datetime 或 str) - 结束日期 # timezone (datetime.tzinfo 或str) - 时区 scheduler = BlockingScheduler() # 在 2019-08-29 22:15:00至2019-08-29 22:17:00期间,每隔1分30秒 运行一次 job 方法 scheduler.add_job(job, 'interval', minutes=1, seconds=30, start_date=self.start_date, end_date=self.end_date, args=[*value_list]) scheduler.start() @staticmethod def date(job, *value_list): """date格式 特定时间点触发""" # run_date (datetime 或 str) - 作业的运行日期或时间 # timezone (datetime.tzinfo 或 str) - 指定时区 scheduler = BlockingScheduler() # 在 2019-8-30 01:00:01 运行一次 job 方法 scheduler.add_job(job, 'date', run_date='2019-8-30 01:00:00', args=[*value_list]) scheduler.start()
2022年07月26日
88 阅读
0 评论
0 点赞
2022-05-26
APScheduler最基本的用法: “定时几秒后启动job”
APScheduler最基本的用法: “定时几秒后启动job”两种调度器: BackgroundScheduler和BlockingScheduler的区别,job执行时间大于定时调度时间特殊情况的问题及解决方法每个job都会以thread的方式被调度。1、基本的定时调度APScheduler是python的一个定时任务调度框架,能实现类似linux下crontab类型的任务,使用起来比较方便。它提供基于固定时间间隔、日期以及crontab配置类似的任务调度,并可以持久化任务,或将任务以daemon方式运行。下面是一个最基本的使用示例:from apscheduler.schedulers.blocking import BlockingScheduler def job(): print('job 3s') if __name__=='__main__': sched = BlockingScheduler(timezone='MST') sched.add_job(job, 'interval', id='3_second_job', seconds=3) sched.start()它能实现每隔3s就调度job()运行一次,所以程序每隔3s就输出'job 3s'。通过修改add_job()的参数seconds,就可以改变任务调度的间隔时间。2、BlockingScheduler与BackgroundScheduler区别APScheduler中有很多种不同类型的调度器,BlockingScheduler与BackgroundScheduler是其中最常用的两种调度器。那他们之间有什么区别呢? 简单来说,区别主要在于BlockingScheduler会阻塞主线程的运行,而BackgroundScheduler不会阻塞。所以,我们在不同的情况下,选择不同的调度器:BlockingScheduler: 调用start函数后会阻塞当前线程。当调度器是你应用中唯一要运行的东西时(如上例)使用。BackgroundScheduler: 调用start后主线程不会阻塞。当你不运行任何其他框架时使用,并希望调度器在你应用的后台执行。下面用两个例子来更直观的说明两者的区别。BlockingScheduler例子from apscheduler.schedulers.blocking import BlockingScheduler import time def job(): print('job 3s') if __name__=='__main__': sched = BlockingScheduler(timezone='MST') sched.add_job(job, 'interval', id='3_second_job', seconds=3) sched.start() while(True): # 不会被执行到 print('main 1s') time.sleep(1) 运行这个程序,我们得到如下的输出:job 3s job 3s job 3s job 3s 可见,BlockingScheduler调用start函数后会阻塞当前线程, 导致主程序中while循环不会被执行到。 BackgroundScheduler例子from apscheduler.schedulers.background import BackgroundScheduler import time def job(): print('job 3s') if __name__=='__main__': sched = BackgroundScheduler(timezone='MST') sched.add_job(job, 'interval', id='3_second_job', seconds=3) sched.start() while(True): print('main 1s') time.sleep(1) 可见,BackgroundScheduler调用start函数后并不会阻塞当前线程,所以可以继续执行主程序中while循环的逻辑。main 1s main 1s main 1s job 3s main 1s main 1s main 1s job 3s 通过这个输出,我们也可以发现,调用start函数后,job()并不会立即开始执行。而是等待3s后,才会被调度执行。如何让job在start()后就开始运行如何才能让调度器调用start函数后,job()就立即开始执行呢?其实APScheduler并没有提供很好的方法来解决这个问题,但有一种最简单的方式,就是在调度器start之前,就运行一次job(),如下from apscheduler.schedulers.background import BackgroundScheduler import time def job(): print('job 3s') if __name__=='__main__': job() # 执行一次就好了哟 sched = BackgroundScheduler(timezone='MST') sched.add_job(job, 'interval', id='3_second_job', seconds=3) sched.start() while(True): print('main 1s') time.sleep(1) 这样就能得到如下的输出job 3s main 1s main 1s main 1s job 3s main 1s main 1s main 1s这样虽然没有绝对做到“让job在start()后就开始运行”,但也能做到“不等待调度,而是刚开始就运行job”。如果job执行时间过长会怎么样如果执行job()的时间需要5s,但调度器配置为每隔3s就调用一下job(),会发生什么情况呢?我们写了如下例子:from apscheduler.schedulers.background import BackgroundScheduler import time def job(): print('job 3s') time.sleep(5) if __name__=='__main__': sched = BackgroundScheduler(timezone='MST') sched.add_job(job, 'interval', id='3_second_job', seconds=3) sched.start() while(True): print('main 1s') time.sleep(1)运行这个程序,我们得到如下的输出:main 1s main 1s main 1s job 3s main 1s main 1s main 1s Execution of job "job (trigger: interval[0:00:03], next run at: 2018-05-07 02:44:29 MST)" skipped: maximum number of running instances reached (1) main 1s main 1s main 1s job 3s main 1s可见,3s时间到达后,并不会“重新启动一个job线程”,而是会跳过该次调度,等到下一个周期(再等待3s),又重新调度job()。为了能让多个job()同时运行,我们也可以配置调度器的参数max_instances,如下例,我们允许2个job()同时运行:from apscheduler.schedulers.background import BackgroundScheduler import time def job(): print('job 3s') time.sleep(5) if __name__=='__main__': job_defaults = { 'max_instances': 2 } sched = BackgroundScheduler(timezone='MST', job_defaults=job_defaults) sched.add_job(job, 'interval', id='3_second_job', seconds=3) sched.start() while(True): print('main 1s') time.sleep(1)运行程序,我们得到如下的输出:main 1s main 1s main 1s job 3s main 1s main 1s main 1s job 3s main 1s main 1s main 1s job 3s每个job是怎么被调度的通过上面的例子,我们发现,调度器是定时调度job()函数,来实现调度的。那job()函数会被以进程的方式调度运行,还是以线程来运行呢?为了弄清这个问题,我们写了如下程序:from apscheduler.schedulers.background import BackgroundScheduler import time,os,threading def job(): print('job thread_id-{0}, process_id-{1}'.format(threading.get_ident(), os.getpid())) time.sleep(50) if __name__=='__main__': job_defaults = { 'max_instances': 20 } sched = BackgroundScheduler(timezone='MST', job_defaults=job_defaults) sched.add_job(job, 'interval', id='3_second_job', seconds=3) sched.start() while(True): print('main 1s') time.sleep(1)运行程序,我们得到如下的输出:main 1s main 1s main 1s job thread_id-10644, process_id-8872 main 1s main 1s main 1s job thread_id-3024, process_id-8872 main 1s main 1s main 1s job thread_id-6728, process_id-8872 main 1s main 1s main 1s job thread_id-11716, process_id-8872可见,每个job()的进程ID都相同,但线程ID不同。所以,job()最终是以线程的方式被调度执行。来源:https://www.jb51.net/article/218332.htm
2022年05月26日
82 阅读
0 评论
0 点赞