python: the schedule module for scheduled tasks

Source: net. Posted by a web user; if there is a copyright issue, contact the site administrator for removal. 2019-02-25

1. Installation

pip install schedule

2. Documentation

https://schedule.readthedocs.io/en/stable/faq.html#how-to-execute-jobs-in-parallel

3. Usage demo from the official site

import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).minutes.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

4. My own schedule usage demo

import sys
import time
import schedule
import os
import logging

# Make sure the log directory exists before attaching the file handler.
if not os.path.exists('/var/log/video_download/'):
    os.makedirs('/var/log/video_download')

log = logging.getLogger()
log.setLevel(logging.DEBUG)
fmt = logging.Formatter("%(asctime)s %(pathname)s %(filename)s %(funcName)s %(lineno)s %(levelname)s - %(message)s",
                        "%Y-%m-%d %H:%M:%S")
# The file name carries the date on which the process was started.
stream_handler = logging.FileHandler(
    '/var/log/video_download/debug-%s.log' % (time.strftime('%Y-%m-%d', time.localtime(time.time()))))
stream_handler.setLevel(logging.DEBUG)
stream_handler.setFormatter(fmt)
log.addHandler(stream_handler)

def handler():
    print("this is handler")

def main():
    if len(sys.argv) != 2:
        print('python video_data.py hour')
        sys.exit()
    param = sys.argv[1]
    if param == 'hour':
        log.debug("enter main")
        schedule.every().day.at("00:00").do(handler)
        schedule.every().hour.do(handler)
        while True:
            schedule.run_pending()
            time.sleep(1)
    else:
        print("python video_data.py hour")
        sys.exit()

if __name__ == "__main__":
    main()
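
The demo above builds the log file name once at start-up, so a process that runs for several days keeps writing to the file named after its start date. If a new file per day is what is wanted, one possible approach is the standard library's `TimedRotatingFileHandler`. This is a minimal sketch: the function name `build_logger`, its `log_dir` parameter, the logger name, and the retention of seven files are all illustrative assumptions, not part of the original script.

```python
import logging
import os
from logging.handlers import TimedRotatingFileHandler


def build_logger(log_dir, name="video_download"):
    """Return a logger that writes to <log_dir>/debug.log and rotates at midnight.

    build_logger, log_dir and the logger name are illustrative assumptions.
    """
    os.makedirs(log_dir, exist_ok=True)  # creates the directory only if missing

    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)

    handler = TimedRotatingFileHandler(
        os.path.join(log_dir, "debug.log"),
        when="midnight",   # roll over to a new file once per day
        backupCount=7,     # keep the seven most recent old files (arbitrary)
        encoding="utf-8",
    )
    handler.setFormatter(logging.Formatter(
        "%(asctime)s %(filename)s %(funcName)s %(lineno)s %(levelname)s - %(message)s",
        "%Y-%m-%d %H:%M:%S",
    ))
    logger.addHandler(handler)
    return logger
```

Call `build_logger` once at start-up; calling it repeatedly with the same name would attach duplicate handlers to the same logger.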

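5. Going further:

  Running jobs in parallel

  (1) By default, schedule executes all jobs one after another. The reasoning behind this is that it is hard to find a model for parallel execution that makes everyone happy. To run jobs in parallel, start each job in its own thread: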

import threading
import time
import schedule

def job():
    print("I'm running on thread %s" % threading.current_thread())

def run_threaded(job_func):
    # Start a new thread for every run so a slow job does not block the others.
    job_thread = threading.Thread(target=job_func)
    job_thread.start()

schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)

while 1:
    schedule.run_pending()
    time.sleep(1)

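  (2) If you need to control the number of threads, put the jobs on a shared queue and let a fixed set of worker threads consume them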

import queue  # named Queue in Python 2
import time
import threading
import schedule

def job():
    print("I'm working")

def worker_main():
    # Pull job functions off the queue and run them one at a time.
    while 1:
        job_func = jobqueue.get()
        job_func()
        jobqueue.task_done()

jobqueue = queue.Queue()

# The scheduler only enqueues the job; the worker thread executes it.
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)

worker_thread = threading.Thread(target=worker_main)
worker_thread.start()

while 1:
    schedule.run_pending()
    time.sleep(1)
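
The example above uses a single worker thread. To cap concurrency at some other number, the same queue can feed several workers. The following is a minimal sketch using only the standard library: direct `jobqueue.put(job)` calls stand in for what the scheduler would do, and `run_with_workers`, `NUM_WORKERS`, and the `None` shutdown sentinel are illustrative names and choices, not part of schedule.

```python
import queue
import threading
import traceback

NUM_WORKERS = 2  # hypothetical pool size


def run_with_workers(jobs, num_workers=NUM_WORKERS):
    """Run every callable in `jobs` on a fixed pool of threads.

    Returns the set of worker thread names that executed at least one job.
    """
    jobqueue = queue.Queue()
    used = set()
    lock = threading.Lock()

    def worker_main():
        while True:
            job_func = jobqueue.get()
            if job_func is None:          # sentinel: shut this worker down
                jobqueue.task_done()
                break
            try:
                job_func()
            except Exception:
                traceback.print_exc()     # report the error, keep the worker alive
            finally:
                with lock:
                    used.add(threading.current_thread().name)
                jobqueue.task_done()

    workers = [threading.Thread(target=worker_main, name="worker-%d" % i)
               for i in range(num_workers)]
    for w in workers:
        w.start()

    for job in jobs:      # stands in for the scheduler calling jobqueue.put(job)
        jobqueue.put(job)
    for _ in workers:     # one shutdown sentinel per worker
        jobqueue.put(None)

    jobqueue.join()
    for w in workers:
        w.join()
    return used
```

However many jobs are queued, no more than `num_workers` of them run at the same time. In a long-running scheduler the sentinels would be sent only at shutdown.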

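  (3) Handling exceptions. schedule does not catch exceptions raised inside a job, so wrap the job in a decorator that does: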

import functools
import traceback
import schedule

def catch_exceptions(cancel_on_failure=False):
    def catch_exceptions_decorator(job_func):
        @functools.wraps(job_func)
        def wrapper(*args, **kwargs):
            try:
                return job_func(*args, **kwargs)
            except Exception:
                # Print the traceback instead of letting it stop the scheduler loop.
                print(traceback.format_exc())
                if cancel_on_failure:
                    # Returning CancelJob removes the job from the scheduler.
                    return schedule.CancelJob
        return wrapper
    return catch_exceptions_decorator

@catch_exceptions(cancel_on_failure=True)
def bad_task():
    return 1 / 0

schedule.every(5).minutes.do(bad_task)

  (4) Running a job only once

import schedule

def job_that_executes_once():
    # Do some work ...
    # Returning CancelJob tells the scheduler not to run this job again.
    return schedule.CancelJob

schedule.every().day.at('22:30').do(job_that_executes_once)
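
The idea behind both the run-once job and `cancel_on_failure` is that the scheduler looks at each job's return value and drops any job that returns `schedule.CancelJob`. The toy model below illustrates that idea only. It is not the library's code: `CancelJob` and `run_all_once` are stand-ins defined here for illustration.

```python
class CancelJob:
    """Stand-in for schedule.CancelJob, used purely as a marker."""


def run_all_once(jobs):
    """Call each job once and return the jobs that should stay scheduled.

    A job is dropped when it returns the CancelJob class or an instance of it.
    """
    survivors = []
    for job in jobs:
        result = job()
        if result is CancelJob or isinstance(result, CancelJob):
            continue  # the job asked to be cancelled
        survivors.append(job)
    return survivors


def job_that_executes_once():
    return CancelJob


def job_that_repeats():
    return None
```

Passing both functions to `run_all_once` leaves only `job_that_repeats` in the returned list.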

  (5) Cancelling several jobs at once

import schedule

def greet(name):
    print('Hello {}'.format(name))

schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')

# Removes every job tagged 'daily-tasks' (here: Andrea and Derek).
schedule.clear('daily-tasks')

  (6) Adding logging to jobs

import functools
import time
import schedule

# This decorator can be applied to job functions to log each run.
def with_logging(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        print('LOG: Running job "%s"' % func.__name__)
        result = func(*args, **kwargs)
        print('LOG: Job "%s" completed' % func.__name__)
        return result
    return wrapper

@with_logging
def job():
    print('Hello, World.')

schedule.every(3).seconds.do(job)

while 1:
    schedule.run_pending()
    time.sleep(1)
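
The decorator above reports through `print`. A variant that goes through the `logging` module, as the demo in section 4 does, and also records how long each run took could look like the sketch below. The names `with_timing` and the logger name `jobs` are illustrative assumptions.

```python
import functools
import logging
import time

logger = logging.getLogger("jobs")  # hypothetical logger name


def with_timing(func):
    """Log the start, end and elapsed time of each call via the logging module."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        logger.info('Running job "%s"', func.__name__)
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            # Runs whether the job returned normally or raised.
            elapsed = time.perf_counter() - start
            logger.info('Job "%s" finished in %.3f s', func.__name__, elapsed)
    return wrapper


@with_timing
def job():
    return "Hello, World."
```

Because the elapsed time is logged in a `finally` clause, a job that raises still gets its duration recorded before the exception propagates.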

  (7) Running a job at random intervals

import schedule

def my_job():
    # This job will execute every 5 to 10 seconds.
    print('Foo')

schedule.every(5).to(10).seconds.do(my_job)
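
Conceptually, `every(5).to(10).seconds` means that a fresh delay between 5 and 10 seconds is drawn each time the job is rescheduled. The sketch below models that behaviour with `random.randint`. It is an illustration under that assumption rather than the library's code, and `next_intervals` is a hypothetical helper.

```python
import random


def next_intervals(low, high, count, seed=None):
    """Return `count` delays, each drawn uniformly from the integers low..high."""
    rng = random.Random(seed)  # a seed makes the sequence reproducible
    return [rng.randint(low, high) for _ in range(count)]


if __name__ == "__main__":
    # Five example delays, in seconds, for a job scheduled every 5 to 10 seconds.
    print(next_intervals(5, 10, 5))
```

Both bounds are inclusive, so a delay of exactly 5 or exactly 10 seconds is possible.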

  (8) Passing arguments to the job function

import schedule

def greet(name):
    print('Hello', name)

# Extra arguments to do() are passed on to the job function on every run.
schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')
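
The arguments given to `do()` are bound to the function when the job is registered and supplied again on each run. The same binding can be illustrated outside the scheduler with `functools.partial`. This is a sketch of the idea: the `greeting` parameter and the names `greet_alice` and `greet_bob` are added here for illustration.

```python
import functools


def greet(name, greeting="Hello"):
    return "%s %s" % (greeting, name)


# Bind the arguments now; the resulting callables take no arguments,
# which is the form a scheduler needs in order to call them later.
greet_alice = functools.partial(greet, name="Alice")
greet_bob = functools.partial(greet, "Bob", greeting="Hi")

if __name__ == "__main__":
    print(greet_alice())
    print(greet_bob())
```

Positional and keyword arguments can be mixed, as `greet_bob` shows.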
