django apscheduler在特定时间执行一次任务(run at a specify time only once)
如何使程序在特定时间只执行一次,我查了一下。
Celery 可以做到:通过 countdown 参数指定延迟执行的秒数。
task = mytask.apply_async(args=[10, 20], countdown=60)
最后找到了apscheduler。
import datetime

from django.utils import timezone
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
# from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

# Jobs are persisted in a local SQLite file through SQLAlchemy.
jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite'),
}

# Thread pool is the default executor; a process pool is also registered
# under the 'processpool' alias.
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5),
}

job_defaults = {
    'coalesce': False,
    'max_instances': 3,
}

# The scheduler works in Django's current timezone.
scheduler = BackgroundScheduler(
    jobstores=jobstores,
    executors=executors,
    job_defaults=job_defaults,
    timezone=timezone.get_current_timezone(),
)
# Alternative: a BlockingScheduler accepts the same keyword arguments
# (with or without `jobstores`) when the scheduler should own the thread.
# scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=timezone.get_current_timezone())
# scheduler = BlockingScheduler(executors=executors, job_defaults=job_defaults, timezone=timezone.get_current_timezone())


def myjob():
    """Placeholder for the work to run once at the scheduled time."""
    pass


try:
    scheduler.start()
    # Run myjob once, 5 seconds from now.
    # The deadline is built as an aware datetime in the scheduler's own
    # timezone: a naive datetime.now() is OS-local wall time, which the
    # scheduler would reinterpret in its configured timezone and could fire
    # at the wrong moment. Microseconds are dropped from the run time.
    deadline = (
        datetime.datetime.now(tz=timezone.get_current_timezone()).replace(microsecond=0)
        + datetime.timedelta(seconds=5)
    )
    scheduler.add_job(myjob, 'date', run_date=deadline)
except (KeyboardInterrupt, SystemExit):
    scheduler.shutdown()

更多精彩