Running scheduled tasks with ease in Python. This implementation API is based on Java ScheduledExecutorService but it has been pythonized. There are other methods of running scheduled tasks in python but this seems like a natural choice for asyncio.
The source code for the scheduler can be found here https://github.com/gregbugaj/marie-ai/blob/main/marie/concur/ScheduledExecutorService.py
The schedule_with_fixed_delay is the primary method of the scheduler with the following signature.
def schedule_with_fixed_delay( self, func: Callable, initial_delay: int, delay: int, unit: TimeUnit, *args, **kwargs, ) -> Any: """ Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given period. Args: func: the task to execute initial_delay: the time to delay first execution delay: the period between successive executions unit: the time unit of the initial_delay and period parameters Return: a ScheduledTask representing pending completion of the task """
Creating a new scheduler and creating a task in just a few lines of code.
# Task to execute async def async_task(name: str): print(f"{name} : {current_thread().name} {time.time()} Start") await asyncio.sleep(1) print(f"{name} : {current_thread().name} {time.time()} Complete") # Create scheduler and schedule a task scheduler = ScheduledExecutorService.new_scheduled_asyncio_pool() t1 = scheduler.schedule_at_fixed_rate(async_task, 1, TimeUnit.MILLISECONDS, name="T1")
Sample Usage
import asyncio import threading import time from marie.concur import ScheduledExecutorService from threading import current_thread from marie.concur.ScheduledExecutorService import TimeUnit async def async_task(name: str): print(f"{name} : {current_thread().name} {time.time()} Start") await asyncio.sleep(1) print(f"{name} : {current_thread().name} {time.time()} Complete") async def cancel_callback(task): print(f"canceling task : {task}") await task.stop() async def main(): scheduler = ScheduledExecutorService.new_scheduled_asyncio_pool() t1 = scheduler.schedule_at_fixed_rate( async_task, 1, TimeUnit.MILLISECONDS, name="T1" ) t2 = scheduler.schedule_at_fixed_rate( async_task, 2, TimeUnit.MILLISECONDS, name="T2" ) asyncio.get_event_loop().call_later(3, asyncio.create_task, cancel_callback(t1)) async def main_single(): scheduler = ScheduledExecutorService.new_scheduled_asyncio_pool() t1 = scheduler.schedule_at_fixed_rate( async_task, 1, TimeUnit.MILLISECONDS, name="T1" ) # call_later() only supports callbacks (regular functions); you can’t pass in a coroutine. asyncio.get_event_loop().call_later(3, asyncio.create_task, cancel_callback(t1)) # await t1.task async def main_delay(): scheduler = ScheduledExecutorService.new_scheduled_asyncio_pool() t1 = scheduler.schedule_with_fixed_delay( async_task, 2, 1, TimeUnit.MILLISECONDS, name="T1" ) await t1.start() print(t1.task) asyncio.get_event_loop().call_later(3, cancel_callback, t1) await t1.task if __name__ == "__main__": print("Main") loop = asyncio.get_event_loop() try: # asyncio.ensure_future(main_single()) asyncio.ensure_future(main()) loop.run_forever() except KeyboardInterrupt: pass finally: print("Closing Loop") loop.close()
I am utilizing this in the Marie-AI project for my watchdog service.