Перейти к содержанию

AsyncIO

Очевидные и неочевидные моменты по работе с AsyncIO

Режимы работы

У asyncio есть 2 режима работы: product и debug. Если надо отлаживать асинхронный код, то есть 4 способа это сделать:

  • Установить переменную окружения PYTHONASYNCIODEBUG в 1
  • Использовать Python Development Mode
  • Запустить asyncio.run(debug=True)
  • Вызвать loop.set_debug()

Корутины (Coroutines)

Корутины — это функции, которые могут приостанавливать свое выполнение в определенной точке и затем продолжать его с того же места. Они отличаются от обычных функций тем, что не блокируют выполнение программы во время ожидания результата.

Для создания корутины используется слово async:

async def test():
    print("test")

Чтобы создать объект корутины, нам нужно "вызвать" её:

my_coroutine = test() # будет создан объект корутины, но сама функция не будет выполнена

Корутины используют модель кооперативной многозадачности - это когда задачи сами решают, когда им передать управление в цикл событий. Делается это при помощи await:

async def main():
    await asyncio.sleep(1) # корутина приостанавливается на 1 секунду, передавая управление в цикл событий.

А вот такой пример не передаст управление в цикл событий:

async def first():
    print(1)

async def test():
    await first()

В этой корутине выполняется только печать числа 1, и нет никаких await выражений - это означает, что она выполнится сразу, не приостанавливаясь и не передавая управления в цикл событий. Связано это с тем, что внутри нет никакого ожидания.

Задача (Task)

Задача — это обёртка вокруг корутины, которая планирует её выполнение в цикле событий и предоставляет способы управления ей.

Сами корутины не представляют способ отслеживания их состояния после вызова. Задача же позволяет отслеживать состояние корутины, получать результат, отменять выполнение и т.д.

Задачу можно создать при помощи asyncio.create_task(). Как только она будет создана - она немедленно (при ближайшем переключении контекста, например при встрече await) начинает выполняться в цикле событий:

import asyncio

async def test():
    await asyncio.sleep(1)
    print("Готово")
    return 1

async def main():
    # Создание задачи
    task = asyncio.create_task(test())
    await asyncio.sleep(10) # ждем 10 секунд на всякий случай, чтобы наша задача точно успела выполниться.

# Запуск главной корутины
asyncio.run(main())

Сначала выведется слово "Готово" из таски которую мы создали, а через 10 секунд программа завершит свою работу.

Во всяком случае, чтобы получить результат из нашей task и обработать возможные исключения, нам нужно использовать на ней await:

import asyncio

async def test():
    await asyncio.sleep(1)
    print("Готово")
    return 1

async def main():
    task = asyncio.create_task(test())
    # ожидаем окончания выполнения нашей задачи
    result = await task
    # выводим результат выполнения
    print(result)

asyncio.run(main())

Event loop

По своей сути - цикл событий, это очередь в которой хранится список задач или сообщений, а затем вход в бесконечный цикл, где они обрабатываются по мере их поступления. Создавая цикл событий, мы создаем пустую очередь задач, а затем добавляем в эту очередь задачи для выполнения.

Часть с вводом-выводом в цикле событий построена на select - он позволяет наблюдать за дескрипторами, есть ли в них данные для получения или появилась ли возможность для записи.

Работа цикла событий как раз заключается в том, чтобы обрабатывать события и запускать задачи.

В первую очередь цикл проверяет, есть ли готовые для обработки события с дескрипторами - он запускает select и ждёт. Когда в сокет попадают данные или буфер отправки опустошается, asyncio проверяет объект привязанный к дескриптору и устанавливает для него значение "готово". После этого задача выполняется - выполняются каллбеки, выполняется сама приостановленная задача.

Часто встречаемые проблемы

Запуск нескольких задач одновременно

Запустить несколько задач одновременно в asyncio можно несколькими способами.

asyncio.gather

asyncio.gather() одновременно запускает задачи, переданные в функции и ждет их выполнения. Здесь нет никакой обработки ошибок, поэтому лучше этот способ не использовать (с python 3.11)

async def test(name: str, to_sleep: int):
  await asyncio.sleep(to_sleep)
  print(f"{name} закончил выполнение")


async def main():
  futures = [
    test("A", 2),
    test("B", 3)
  ]
  await asyncio.gather(*futures)
  print("готово")

asyncio.run(main())
# А закончил выполнение
# B закончил выполнение
# готово

Task groups

TaskGroup - это асинхронный менеджер контекста, содержащий группу задач, появился в python 3.11.

Его стоит использовать, если есть необходимость в удобном и надежном способе ожидания завершения всех задач в группе и для обработки ошибок:

async def test(name: str, to_sleep: int) -> str:
  if to_sleep > 5:
    raise AttributeError(f"{name} sleep > 5")
  else:
    await asyncio.sleep(to_sleep)


async def main():
  try:
    async with asyncio.TaskGroup() as tg:
      task1 = tg.create_task(test("A", 1))
      task2 = tg.create_task(test("B", 2))
      task3 = tg.create_task(test("C", 10))
      task4 = tg.create_task(test("D", 6))
  except* AttributeError as errors:
    print([str(e) for e in errors.exceptions]) # ['C sleep > 5', 'D sleep > 5']

asyncio.run(main())

Запуск асинхронного кода в фоне

Например, нам нужно поставить асинхронную функцию на выполнение в фоне и не ожидая результата обрабатывать другие задачи:

import asyncio

async def delete_old_files():
    ## делаем чет тяжелое, например 10 секунд:
    await asyncio.sleep(10)
    ## выводим сообщение об конце задачи
    print("файлы удалены")

async def main():
    coro = delete_old_files()
    task = asyncio.create_task(coro)
    print("задача на удаление файлов запущена")
    ### даем задаче запуститься
    await asyncio.sleep(0)
    ### делаем что-то еще
    await asyncio.sleep(20)
    print("завершили работу")

asyncio.run(main())

Вывод будет следующий:

задача на удаление файлов запущена
файлы удалены
завершили работу

Запуск синхронного кода

Запустить синхронный код можно несколькими способами:

to_thread()

Подходит для запуска IO-bound задач, создает под капотом ThreadPoolExecutor и запускает задачи в нём.

import asyncio
import time

def blocking_task():
    print('Task starting')
    time.sleep(2)
    print('Task done')

async def main():
    print('Main running the blocking task')
    coro = asyncio.to_thread(blocking_task)
    task = asyncio.create_task(coro)
    print('Main doing other things')
    await asyncio.sleep(1)
    await task

asyncio.run(main())

Или пример без create_task:

# создаем корутину
blocking_coro = asyncio.to_thread(blocking, arg1, arg2)
# ожидаем корутину
result = await blocking_coro

run_in_executor()

Наиболее гибкий метод, позволяет самому выбирать экзекутор, который будет запускать задачи. Например, для CPU-bound задач можно воспользоваться ProcessPoolExecutor:

from concurrent.futures import ProcessPoolExecutor
import asyncio
import math

def blocking_task():
    print("Включаем считалочку")
    data = [math.sqrt(i) for i in range(50000000)]
    print("Считалочка посчитала")

async def main():
    print('Запускаем задачу')
    loop = asyncio.get_running_loop()
    exe = ProcessPoolExecutor(4)
    awaitable = loop.run_in_executor(exe, blocking_task)
    print('Работаем')
    await asyncio.sleep(1)
    await awaitable
    # закрываем экзекутор
    exe.shutdown()

if __name__ == '__main__':
    asyncio.run(main())

Fire-and-forget

Рассмотрим данный код:

async def process(url)
    ...

async def executor():
    for url in urls:
        asyncio.create_task(process(url))

Стратегия запуска задачи, описаная выше называется "fire-and-forget" - запустил и забыл. Сама стратегия не плохая, но:

  1. Здесь отсутствует какая либо обработка ошибок
  2. Количество спавнящихся задач не контроллируется, что может "перегрузить" приложение лишними задачами
  3. Нет возможности корректно завершить задачи, если приложения захочет завершится, что может нарушить консистентность компонентов
  4. Является причиной трудноотлавливаемого бага, суть которого в том, что задача не будет выполняться до конца, так как не будет ссылки на результат, из-за чего она будет прибита сбощиком мусора. Евентлуп сохраняет только слабые ссылки на задачи. Подробнее в блоке "Important" здесь.

Для решения этих проблем можно попробовать использовать aiojobs, task groups.

IO в конструкторе

Этот класс проблем относится вообще не только к асинхронному коду. Например, у нас есть такой код

class APIClient:

    def __init__(self):
        self.client = some_asyncio_func()

В конструкторе невозможно использовать await, это можно обойти двумя способами:

Factory method

В классе необходимо сделать отдельный асинхронный classmethod, который будет создавать объект нашего класса.

class APIClient:

    def __init__(self):
        self.client = some_asyncio_func()

    @classmethod
    async def create(cls):
        self = cls()
        self.client = await some_asyncio_func()
        return self

api_client = await APIClient.create()

DI

Ещё лучше можно сделать при помощи Dependency injection, вынеся наш клиент в конструктор:

class APIClient:

    def __init__(self, client: 'SomeClient'):
        self.client = client

client = await some_asyncio_func()
api_client = APIClient(client)

Очистка ресурсов

Допустим, у нас есть следующий код:

async def fetch(db):
    cursor = await db.execute("select * from table")
    results = []
    async for item in cursor:
        results.append(item)
    return results 

Вопрос - что случится с cursor?

По идее курсор должен должен закрыться с помощью GC через деструктор (__del__), но он синхронный. А что, если объект курсора - это корутина?

Здесь на помощь должны прийти финалайзеры. Как правило у таких объектов появляется либо метод вида async def close(self) либо используется асинхронный контекстный менеджер. В случае, если объект не закрывается, разработчики библиотек оставляют варнинги. Вот пример из ClientSession для aiohttp:

def __del__(self, _warnings: Any = warnings) -> None:
    try:
        # Видим НЕ ЗАКРЫТУЮ сессию, но ничего с ней сделать не можем
        if not self.closed:
            _warnings.warn(
                f"Unclosed client session {self!r}",
                ResourceWarning,
                source=self,
            )
            context = {"client_session": self, "message": "Unclosed client session"}
            if self._source_traceback is not None:
                context["source_traceback"] = self._source_traceback
            self._loop.call_exception_handler(context)
    except AttributeError:
        # loop was not initialized yet,
        # either self._connector or self._loop doesn't exist
        pass

Shielded execution

Есть следующий код на aiohttp:

async def handler(request):
    await request.config[db].execute("update...")
    return web.Response(text="updated")

Если соединение отвалится то обработчик упадет с ошибкой, так как некуда отправлять ответ. По хорошему задача должна отмениться, но что если мы хотим, чтобы она выполнилась наверняка?

Поможет нам в этом asycio.shield(). Он защищает задачу от отмены, даже в случае возникновения ошибки. Выглядит это следующим образом:

async def handler(request):
    await asyncio.shield(request.config[db].execute("update..."))
    return web.Response(text="updated")

Тестирование и event loop

При тестировании важно (если у вас не какой-то специальный кейс) для каждого отдельного теста создавать отдельный event loop. Причина проста - задачи, который не выполнились в прошлом тесте могут перетечь во второй и сломать его.

К счастью для нас, тот же pytest-asyncio создает новый эвент луп на каждый тест за нас.

Ссылки