import asyncio
async def print_B(): #Simple async def
print("B")
async def main_def():
print("A")
await asyncio.gather(print_B())
print("C")
asyncio.run(main_def())
# The function you wait for must include async
# The function you use await must include async
# The function you use await must run by asyncio.run(THE_FUNC())
import asyncio
async def eternity():
# Sleep for 60 minutes
await asyncio.sleep(3600)
print('yay!')
async def main():
try:
# timeout if function takes longer than 1 second
await asyncio.wait_for(eternity(), timeout=1.0)
except asyncio.TimeoutError:
print('timeout!')
asyncio.run(main())
# output timeout!
import asyncio
import time
async def say_after(delay, msg):
"""
:param delay: how many seconds to delay
:param msg: message to print to console
:return: msg
"""
await asyncio.sleep(delay)
print(msg)
return msg
async def main():
print(f"started at {time.strftime('%X')}")
# execute async functions concurrently and return the results as a list
results = await asyncio.gather(say_after(delay=3, msg='hello'), say_after(delay=1, msg='world'))
print(f"finished at {time.strftime('%X')}")
print(f"Results of async gather {results}")
# run async main function
asyncio.run(main())
# output
# started at 16:57:46
# world
# hello
# finished at 16:57:49
# Results of async gather ['hello', 'world']
import asyncio
import time
async def say_after(delay, msg):
"""
:param delay: how many seconds to delay
:param msg: message to print to console
:return: None
"""
await asyncio.sleep(delay)
print(msg)
async def main():
print(f"started at {time.strftime('%X')}")
# execute async functions in order
await say_after(delay=3, msg='hello')
await say_after(delay=2, msg='world')
print(f"finished at {time.strftime('%X')}")
# run async main function
asyncio.run(main())
# output
# started at 16:47:08
# hello
# world
# finished at 16:47:13
async def get_chat_id(name):
await asyncio.sleep(3)
return "chat-%s" % name
async def main():
id_coroutine = get_chat_id("django")
result = await id_coroutine
import asyncio
from PIL import Image
import urllib.request as urllib2
async def getPic(): #Proof of async def
pic = Image.open(urllib2.urlopen("https://c.files.bbci.co.uk/E9DF/production/_96317895_gettyimages-164067218.jpg"))
return pic
async def main_def():
print("A")
print("Must await before get pic0...")
pic0 = await asyncio.gather(getPic())
print(pic0)
asyncio.run(main_def())
async def sleep():
print(f'Time: {time.time() - start:.2f}')
await asyncio.sleep(1)
import asyncio
import time
from asgiref.sync import sync_to_async
def blocking_function(seconds: int) -> str:
time.sleep(seconds)
return f"Finished in {seconds} seconds"
async def main():
seconds_to_sleep = 5
function_message = await sync_to_async(blocking_function)(seconds_to_sleep)
print(function_message)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()