AsyncIO¶
Очевидные и неочевидные моменты по работе с AsyncIO
Режимы работы¶
У asyncio есть 2 режима работы: product и debug. Если надо отлаживать асинхронный код, то есть 4 способа это сделать:
- Установить переменную окружения
PYTHONASYNCIODEBUG
в 1 - Использовать Python Development Mode
- Запустить
asyncio.run(debug=True)
- Вызвать loop.set_debug()
Корутины (Coroutines)¶
Корутины — это функции, которые могут приостанавливать свое выполнение в определенной точке и затем продолжать его с того же места. Они отличаются от обычных функций тем, что не блокируют выполнение программы во время ожидания результата.
Для создания корутины используется слово async
:
Чтобы создать объект корутины, нам нужно "вызвать" её:
Корутины используют модель кооперативной многозадачности - это когда задачи сами решают, когда им передать управление в цикл событий. Делается это при помощи await:
async def main():
await asyncio.sleep(1) # корутина приостанавливается на 1 секунду, передавая управление в цикл событий.
А вот такой пример не передаст управление в цикл событий:
В этой корутине выполняется только печать числа 1, и нет никаких await
выражений - это означает, что она выполнится сразу, не приостанавливаясь и не передавая управления в цикл событий. Связано это с тем, что внутри нет никакого ожидания.
Задача (Task)¶
Задача — это обёртка вокруг корутины, которая планирует её выполнение в цикле событий и предоставляет способы управления ей.
Сами корутины не представляют способ отслеживания их состояния после вызова. Задача же позволяет отслеживать состояние корутины, получать результат, отменять выполнение и т.д.
Задачу можно создать при помощи asyncio.create_task()
. Как только она будет создана - она немедленно (при ближайшем переключении контекста, например при встрече await) начинает выполняться в цикле событий:
import asyncio
async def test():
await asyncio.sleep(1)
print("Готово")
return 1
async def main():
# Создание задачи
task = asyncio.create_task(test())
await asyncio.sleep(10) # ждем 10 секунд на всякий случай, чтобы наша задача точно успела выполниться.
# Запуск главной корутины
asyncio.run(main())
Сначала выведется слово "Готово" из таски которую мы создали, а через 10 секунд программа завершит свою работу.
Во всяком случае, чтобы получить результат из нашей task
и обработать возможные исключения, нам нужно использовать на ней await
:
import asyncio
async def test():
await asyncio.sleep(1)
print("Готово")
return 1
async def main():
task = asyncio.create_task(test())
# ожидаем окончания выполнения нашей задачи
result = await task
# выводим результат выполнения
print(result)
asyncio.run(main())
Event loop¶
По своей сути - цикл событий, это очередь в которой хранится список задач или сообщений, а затем вход в бесконечный цикл, где они обрабатываются по мере их поступления. Создавая цикл событий, мы создаем пустую очередь задач, а затем добавляем в эту очередь задачи для выполнения.
Часть с вводом-выводом в цикле событий построена на select - он позволяет наблюдать за дескрипторами, есть ли в них данные для получения или появилась ли возможность для записи.
Работа цикла событий как раз заключается в том, чтобы обрабатывать события и запускать задачи.
В первую очередь цикл проверяет, есть ли готовые для обработки события с дескрипторами - он запускает select и ждёт. Когда в сокет попадают данные или буфер отправки опустошается, asyncio проверяет объект привязанный к дескриптору и устанавливает для него значение "готово". После этого задача выполняется - выполняются каллбеки, выполняется сама приостановленная задача.
Часто встречаемые проблемы¶
Запуск нескольких задач одновременно¶
Запустить несколько задач одновременно в asyncio можно несколькими способами.
asyncio.gather¶
asyncio.gather()
одновременно запускает задачи, переданные в функции и ждет их выполнения. Здесь нет никакой обработки ошибок, поэтому лучше этот способ не использовать (с python 3.11)
async def test(name: str, to_sleep: int):
await asyncio.sleep(to_sleep)
print(f"{name} закончил выполнение")
async def main():
futures = [
test("A", 2),
test("B", 3)
]
await asyncio.gather(*futures)
print("готово")
asyncio.run(main())
# А закончил выполнение
# B закончил выполнение
# готово
Task groups¶
TaskGroup - это асинхронный менеджер контекста, содержащий группу задач, появился в python 3.11.
Его стоит использовать, если есть необходимость в удобном и надежном способе ожидания завершения всех задач в группе и для обработки ошибок:
async def test(name: str, to_sleep: int) -> str:
if to_sleep > 5:
raise AttributeError(f"{name} sleep > 5")
else:
await asyncio.sleep(to_sleep)
async def main():
try:
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(test("A", 1))
task2 = tg.create_task(test("B", 2))
task3 = tg.create_task(test("C", 10))
task4 = tg.create_task(test("D", 6))
except* AttributeError as errors:
print([str(e) for e in errors.exceptions]) # ['C sleep > 5', 'D sleep > 5']
asyncio.run(main())
Запуск асинхронного кода в фоне¶
Например, нам нужно поставить асинхронную функцию на выполнение в фоне и не ожидая результата обрабатывать другие задачи:
import asyncio
async def delete_old_files():
## делаем чет тяжелое, например 10 секунд:
await asyncio.sleep(10)
## выводим сообщение об конце задачи
print("файлы удалены")
async def main():
coro = delete_old_files()
task = asyncio.create_task(coro)
print("задача на удаление файлов запущена")
### даем задаче запуститься
await asyncio.sleep(0)
### делаем что-то еще
await asyncio.sleep(20)
print("завершили работу")
asyncio.run(main())
Вывод будет следующий:
Запуск синхронного кода¶
Запустить синхронный код можно несколькими способами:
to_thread()¶
Подходит для запуска IO-bound задач, создает под капотом ThreadPoolExecutor и запускает задачи в нём.
import asyncio
import time
def blocking_task():
print('Task starting')
time.sleep(2)
print('Task done')
async def main():
print('Main running the blocking task')
coro = asyncio.to_thread(blocking_task)
task = asyncio.create_task(coro)
print('Main doing other things')
await asyncio.sleep(1)
await task
asyncio.run(main())
Или пример без create_task
:
# создаем корутину
blocking_coro = asyncio.to_thread(blocking, arg1, arg2)
# ожидаем корутину
result = await blocking_coro
run_in_executor()¶
Наиболее гибкий метод, позволяет самому выбирать экзекутор, который будет запускать задачи. Например, для CPU-bound задач можно воспользоваться ProcessPoolExecutor:
from concurrent.futures import ProcessPoolExecutor
import asyncio
import math
def blocking_task():
print("Включаем считалочку")
data = [math.sqrt(i) for i in range(50000000)]
print("Считалочка посчитала")
async def main():
print('Запускаем задачу')
loop = asyncio.get_running_loop()
exe = ProcessPoolExecutor(4)
awaitable = loop.run_in_executor(exe, blocking_task)
print('Работаем')
await asyncio.sleep(1)
await awaitable
# закрываем экзекутор
exe.shutdown()
if __name__ == '__main__':
asyncio.run(main())
Fire-and-forget¶
Рассмотрим данный код:
Стратегия запуска задачи, описаная выше называется "fire-and-forget" - запустил и забыл. Сама стратегия не плохая, но:
- Здесь отсутствует какая либо обработка ошибок
- Количество спавнящихся задач не контроллируется, что может "перегрузить" приложение лишними задачами
- Нет возможности корректно завершить задачи, если приложения захочет завершится, что может нарушить консистентность компонентов
- Является причиной трудноотлавливаемого бага, суть которого в том, что задача не будет выполняться до конца, так как не будет ссылки на результат, из-за чего она будет прибита сбощиком мусора. Евентлуп сохраняет только слабые ссылки на задачи. Подробнее в блоке "Important" здесь.
Для решения этих проблем можно попробовать использовать aiojobs, task groups.
IO в конструкторе¶
Этот класс проблем относится вообще не только к асинхронному коду. Например, у нас есть такой код
В конструкторе невозможно использовать await, это можно обойти двумя способами:
Factory method¶
В классе необходимо сделать отдельный асинхронный classmethod, который будет создавать объект нашего класса.
class APIClient:
def __init__(self):
self.client = some_asyncio_func()
@classmethod
async def create(cls):
self = cls()
self.client = await some_asyncio_func()
return self
api_client = await APIClient.create()
DI¶
Ещё лучше можно сделать при помощи Dependency injection, вынеся наш клиент в конструктор:
class APIClient:
def __init__(self, client: 'SomeClient'):
self.client = client
client = await some_asyncio_func()
api_client = APIClient(client)
Очистка ресурсов¶
Допустим, у нас есть следующий код:
async def fetch(db):
cursor = await db.execute("select * from table")
results = []
async for item in cursor:
results.append(item)
return results
Вопрос - что случится с cursor
?
По идее курсор должен должен закрыться с помощью GC через деструктор (__del__
), но он синхронный. А что, если объект курсора - это корутина?
Здесь на помощь должны прийти финалайзеры. Как правило у таких объектов появляется либо метод вида async def close(self)
либо используется асинхронный контекстный менеджер. В случае, если объект не закрывается, разработчики библиотек оставляют варнинги. Вот пример из ClientSession
для aiohttp
:
def __del__(self, _warnings: Any = warnings) -> None:
try:
# Видим НЕ ЗАКРЫТУЮ сессию, но ничего с ней сделать не можем
if not self.closed:
_warnings.warn(
f"Unclosed client session {self!r}",
ResourceWarning,
source=self,
)
context = {"client_session": self, "message": "Unclosed client session"}
if self._source_traceback is not None:
context["source_traceback"] = self._source_traceback
self._loop.call_exception_handler(context)
except AttributeError:
# loop was not initialized yet,
# either self._connector or self._loop doesn't exist
pass
Shielded execution¶
Есть следующий код на aiohttp:
async def handler(request):
await request.config[db].execute("update...")
return web.Response(text="updated")
Если соединение отвалится то обработчик упадет с ошибкой, так как некуда отправлять ответ. По хорошему задача должна отмениться, но что если мы хотим, чтобы она выполнилась наверняка?
Поможет нам в этом asycio.shield()
. Он защищает задачу от отмены, даже в случае возникновения ошибки. Выглядит это следующим образом:
async def handler(request):
await asyncio.shield(request.config[db].execute("update..."))
return web.Response(text="updated")
Тестирование и event loop¶
При тестировании важно (если у вас не какой-то специальный кейс) для каждого отдельного теста создавать отдельный event loop. Причина проста - задачи, который не выполнились в прошлом тесте могут перетечь во второй и сломать его.
К счастью для нас, тот же pytest-asyncio создает новый эвент луп на каждый тест за нас.