Перейти к содержанию

AsyncIO

Очевидные и неочевидные моменты по работе с AsyncIO

Режимы работы

У asyncio есть 2 режима работы: product и debug. Если надо отлаживать асинхронный код, то есть 4 способа это сделать:

  • Установить переменную окружения PYTHONASYNCIODEBUG в 1
  • Использовать Python Development Mode
  • Запустить asyncio.run(debug=True)
  • Вызвать loop.set_debug()

Event loop

По своей сути - цикл собый, это очередь в которой хранится список задач или сообщений, а затем вход в бесконечный цикл, где они обрабатываются по мере их поступления. Создавая цикл событий, мы создаем пустую очередь задач, а затем добавляем в эту очередь задачи для выполнения.

Часть с вводом-выводом в цикле событий построена на select - он позволяет наблюдать за дескрипторами, есть ли в них данные для получения или появилась ли возможность для записи.

Работа цикла событий как раз заключается в том, чтобы обрабатывать события и запускать задачи.

В первую очередь цикл проверяет, есть ли готовые для обработки события с дескрипторами - он запускает select и ждёт. Когда в сокет попадают данные или буфер отправки опустошается, asyncio проверяет объект привязанный к дескриптору и устанавливает для него значение "готово". После этого задача выполняется - выполняются каллбеки, выполняется сама приостановленная задача.

Часто встречаемые проблемы

Запуск нескольких задач одновременно

Запустить несколько задач одновременно в asyncio можно несколькими способами.

asyncio.gather

asyncio.gather() одновременно запускает задачи, переданные в функции и ждет их выполнения. Здесь нет никакой обработки ошибки, поэтому лучше этот способ не использовать (с python 3.11)

async def test(name: str, to_sleep: int):
  await asyncio.sleep(to_sleep)
  print(f"{name} закончил выполнение")


async def main():
  futures = [
    test("A", 2),
    test("B", 3)
  ]
  await asyncio.gather(*futures)
  print("готово")

asyncio.run(main())
# А закончил выполнение
# B закончил выполнение
# готово

Task groups

TaskGroup - это асинхронный менеджер контекста, содержащий группу задач, появился в python 3.11.

Его стоит использовать, если есть необходимость в удобном и надежном способе ожидания завершения всех задач в группе и для обработки ошибок:

async def test(name: str, to_sleep: int) -> str:
  if to_sleep > 5:
    raise AttributeError(f"{name} sleep > 5")
  else:
    await asyncio.sleep(to_sleep)


async def main():
  try:
    async with asyncio.TaskGroup() as tg:
      task1 = tg.create_task(test("A", 1))
      task2 = tg.create_task(test("B", 2))
      task3 = tg.create_task(test("C", 10))
      task4 = tg.create_task(test("D", 6))
  except* AttributeError as errors:
    print([str(e) for e in errors.exceptions]) # ['C sleep > 5', 'D sleep > 5']

asyncio.run(main())

Запуск асинхронного кода в фоне

Например, нам нужно поставить асинхронную функцию на выполнение в фоне и не ожидая результата обрабатывать другие задачи:

import asyncio

async def delete_old_files():
    ## делаем чет тяжелое, например 10 секунд:
    await asyncio.sleep(10)
    ## выводим сообщение об конце задачи
    print("файлы удалены")

async def main():
    coro = delete_old_files()
    task = asyncio.create_task(coro)
    print("задача на удаление файлов запущена")
    ### даем задаче запуститься
    await asyncio.sleep(0)
    ### делаем что-то еще
    await asyncio.sleep(20)
    print("завершили работу")

asyncio.run(main())

Вывод будет следующий:

задача на удаление файлов запущена
файлы удалены
завершили работу

Запуск синхронного кода

Запустить синхронный код можно несколькими способами:

to_thread()

Подходит для запуска IO-bound задач, создает под капотом ThreadPoolExecutor и запускает задачи в нём.

import asyncio
import time

def blocking_task():
    print('Task starting')
    time.sleep(2)
    print('Task done')

async def main():
    print('Main running the blocking task')
    coro = asyncio.to_thread(blocking_task)
    task = asyncio.create_task(coro)
    print('Main doing other things')
    await asyncio.sleep(1)
    await task

asyncio.run(main())

Или пример без create_task:

# создаем корутину
blocking_coro = asyncio.to_thread(blocking, arg1, arg2)
# ожидаем корутину
result = await blocking_coro

run_in_executor()

Наиболее гибкий метод, позволяет самому выбирать экзекутор, который будет запускать задачи. Например, для CPU-bound задач можно воспользоваться ProcessPoolExecutor:

from concurrent.futures import ProcessPoolExecutor
import asyncio
import math

def blocking_task():
    print("Включаем считалочку")
    data = [math.sqrt(i) for i in range(50000000)]
    print("Считалочка посчитала")

async def main():
    print('Запускаем задачу')
    loop = asyncio.get_running_loop()
    exe = ProcessPoolExecutor(4)
    awaitable = loop.run_in_executor(exe, blocking_task)
    print('Работаем')
    await asyncio.sleep(1)
    await awaitable
    # закрываем экзекутор
    exe.shutdown()

if __name__ == '__main__':
    asyncio.run(main())

Fire-and-forget

Рассмотрим данный код:

async def process(url)
    ...

async def executor():
    for url in urls:
        asyncio.create_task(process(url))

Стратегия запуска задачи, описаная выше называется "fire-and-forget" - запустил и забыл. Сама стратегия не плохая, но:

  1. Здесь отсутствует какая либо обработка ошибок
  2. Количество спавнящихся задач не контроллируется, что может "перегрузить" приложение лишними задачами
  3. Нет возможности корректно завершить задачи, если приложения захочет завершится, что может нарушить консистентность компонентов
  4. Является причиной трудноотлавливаемого бага, суть которого в том, что задача не будет выполняться до конца, так как не будет ссылки на результат, из-за чего она будет прибита сбощиком мусора. Евентлуп сохраняет только слабые ссылки на задачи. Подробнее в блоке "Important" здесь.

Для решения этих проблем можно попробовать использовать aiojobs, task groups.

IO в конструкторе

Этот класс проблем относится вообще не только к асинхронному коду. Например, у нас есть такой код

class APIClient:

    def __init__(self):
        self.client = some_asyncio_func()

В конструкторе невозможно использовать await, это можно обойти двумя способами:

Factory method

В классе необходимо сделать отдельный асинхронный classmethod, который будет создавать объект нашего класса.

class APIClient:

    def __init__(self):
        self.client = some_asyncio_func()

    @classmethod
    async def create(cls):
        self = cls()
        self.client = await some_asyncio_func()
        return self

api_client = await APIClient.create()

DI

Ещё лучше можно сделать при помощи Dependency injection, вынеся наш клиент в конструктор:

class APIClient:

    def __init__(self, client: 'SomeClient'):
        self.client = client

client = await some_asyncio_func()
api_client = APIClient(client)

Очистка ресурсов

Допустим, у нас есть следующий код:

async def fetch(db):
    cursor = await db.execute("select * from table")
    results = []
    async for item in cursor:
        results.append(item)
    return results 

Вопрос - что случится с cursor?

По идее курсор должен должен закрыться с помощью GC через деструктор (__del__), но он синхронный. А что, если объект курсора - это корутина?

Здесь на помощь должны прийти финалайзеры. Как правило у таких объектов появляется либо метод вида async def close(self) либо используется асинхронный контекстный менеджер. В случае, если объект не закрывается, разработчики библиотек оставляют варнинги. Вот пример из ClientSession для aiohttp:

def __del__(self, _warnings: Any = warnings) -> None:
    try:
        # Видим НЕ ЗАКРЫТУЮ сессию, но ничего с ней сделать не можем
        if not self.closed:
            _warnings.warn(
                f"Unclosed client session {self!r}",
                ResourceWarning,
                source=self,
            )
            context = {"client_session": self, "message": "Unclosed client session"}
            if self._source_traceback is not None:
                context["source_traceback"] = self._source_traceback
            self._loop.call_exception_handler(context)
    except AttributeError:
        # loop was not initialized yet,
        # either self._connector or self._loop doesn't exist
        pass

Shielded execution

Есть следующий код на aiohttp:

async def handler(request):
    await request.config[db].execute("update...")
    return web.Response(text="updated")

Если соединение отвалится то обработчик упадет с ошибкой, так как некуда отправлять ответ. По хорошему задача должна отмениться, но что если мы хотим, чтобы она выполнилась наверняка?

Поможет нам в этом asycio.shield(). Он защищает задачу от отмены, даже в случае возникновения ошибки. Выглядит это следующим образом:

async def handler(request):
    await asyncio.shield(request.config[db].execute("update..."))
    return web.Response(text="updated")

Тестирование и event loop

При тестировании важно (если у вас не какой-то специальный кейс) для каждого отдельного теста создавать отдельный event loop. Причина проста - задачи, который не выполнились в прошлом тесте могут перетечь во второй и сломать его.

К счастью для нас, тот же pytest-asyncio создает новый эвент луп на каждый тест за нас.

Ссылки