Running scheduled tasks with ease in Python. The API of this implementation is based on Java's ScheduledExecutorService, but it has been made Pythonic. There are other ways of running scheduled tasks in Python, but this one seems like a natural fit for asyncio.
The source code for the scheduler can be found here: https://github.com/gregbugaj/marie-ai/blob/main/marie/concur/ScheduledExecutorService.py
schedule_with_fixed_delay is the primary method of the scheduler. It has the following signature:
def schedule_with_fixed_delay(
    self,
    func: Callable,
    initial_delay: int,
    delay: int,
    unit: TimeUnit,
    *args,
    **kwargs,
) -> Any:
    """
    Creates and executes a periodic action that becomes enabled first after the given initial delay,
    and subsequently with the given delay.

    Args:
        func: the task to execute
        initial_delay: the time to delay first execution
        delay: the time between successive executions
        unit: the time unit of the initial_delay and delay parameters
        *args: extra positional arguments (presumably forwarded to func; confirm against the implementation)
        **kwargs: extra keyword arguments (presumably forwarded to func; confirm against the implementation)

    Returns:
        a ScheduledTask representing pending completion of the task
    """
Creating a new scheduler and scheduling a task takes just a few lines of code:
# Task to execute
async def async_task(name: str) -> None:
    """Print a start line, await a one-second asyncio sleep, then print a completion line.

    Args:
        name: label printed in front of each line to identify the task.
    """
    print(f"{name} : {current_thread().name} {time.time()} Start")
    await asyncio.sleep(1)
    print(f"{name} : {current_thread().name} {time.time()} Complete")
# Create the scheduler, then schedule async_task on it under the name "T1"
scheduler = ScheduledExecutorService.new_scheduled_asyncio_pool()
t1 = scheduler.schedule_at_fixed_rate(
    async_task,
    1,
    TimeUnit.MILLISECONDS,
    name="T1",
)
Sample Usage
import asyncio
import threading
import time
from marie.concur import ScheduledExecutorService
from threading import current_thread
from marie.concur.ScheduledExecutorService import TimeUnit
async def async_task(name: str) -> None:
    """Print a start line, await a one-second asyncio sleep, then print a completion line.

    Args:
        name: label printed in front of each line to identify the task.
    """
    print(f"{name} : {current_thread().name} {time.time()} Start")
    await asyncio.sleep(1)
    print(f"{name} : {current_thread().name} {time.time()} Complete")
async def cancel_callback(task) -> None:
    """Announce the cancellation and stop the given scheduled task.

    Args:
        task: object exposing an awaitable ``stop()`` method, such as the
            value returned by the scheduler's ``schedule_*`` calls.
    """
    print(f"canceling task : {task}")
    await task.stop()
async def main():
    """Schedule two tasks (T1 and T2) and arrange for T1 to be stopped after 3 seconds."""
    scheduler = ScheduledExecutorService.new_scheduled_asyncio_pool()
    t1 = scheduler.schedule_at_fixed_rate(
        async_task, 1, TimeUnit.MILLISECONDS, name="T1"
    )
    # T2 is scheduled as well but never stopped by this example.
    t2 = scheduler.schedule_at_fixed_rate(
        async_task, 2, TimeUnit.MILLISECONDS, name="T2"
    )
    # call_later() only accepts plain callbacks, so hand it asyncio.create_task
    # together with the coroutine object it should wrap.
    asyncio.get_running_loop().call_later(
        3, asyncio.create_task, cancel_callback(t1)
    )
async def main_single():
    """Schedule a single task (T1) and arrange for it to be stopped after 3 seconds."""
    scheduler = ScheduledExecutorService.new_scheduled_asyncio_pool()
    t1 = scheduler.schedule_at_fixed_rate(
        async_task, 1, TimeUnit.MILLISECONDS, name="T1"
    )
    # call_later() only supports callbacks (regular functions); you can't pass in a coroutine.
    # asyncio.create_task is the callback here and the coroutine object is its argument.
    asyncio.get_running_loop().call_later(
        3, asyncio.create_task, cancel_callback(t1)
    )
    # await t1.task
async def main_delay():
    """Schedule T1 with a fixed delay, start it, and arrange for it to be stopped after 3 seconds."""
    scheduler = ScheduledExecutorService.new_scheduled_asyncio_pool()
    t1 = scheduler.schedule_with_fixed_delay(
        async_task, 2, 1, TimeUnit.MILLISECONDS, name="T1"
    )
    await t1.start()
    print(t1.task)
    # cancel_callback is a coroutine function: passing it straight to call_later()
    # would only create a coroutine object that is never awaited, so the task
    # would never be stopped. Let the callback wrap the coroutine in a task instead.
    asyncio.get_running_loop().call_later(
        3, asyncio.create_task, cancel_callback(t1)
    )
    await t1.task
if __name__ == "__main__":
    print("Main")
    # asyncio.get_event_loop() without a running loop is deprecated, so create
    # the loop explicitly and register it as the current one.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # loop.create_task(main_single())
        loop.create_task(main())
        # main() only schedules work and returns; keep the loop alive so the
        # scheduled tasks can run until the user interrupts.
        loop.run_forever()
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to end this example.
        pass
    finally:
        print("Closing Loop")
        loop.close()
I am utilizing this in the Marie-AI project for my watchdog service.
