Слышали об асинхронном программировании в Python? Интересно познакомиться с его особенностями и практическими областями применения? Быть может, вам даже пришлось столкнуться с определенными проблемами во время написания многопоточных программ. В любом случае, если вы хотите получше познакомиться с темой, это правильное место.
Содержание статьи
- Особенности асинхронного программирования в Python
- Создания синхронного веб-сервера
- Иной подход к программированию в Python
- Программирование родительского элемента: не так уж просто!
- Использование асинхронных особенностей Python на практике
- Синхронное программирование Python
- Совместный параллелизм с блокирующими вызовами
- Кооперативный параллелизм с неблокирующими вызовами Python
- Синхронные (блокирующие) HTTP вызовы
- Асинхронные (неблокирующие) HTTP вызовы Python
Основные пункты данной статьи:
- Что такое синхронное программирование;
- Что такое асинхронное программирование;
- Когда требуется написание асинхронных программ;
- Как использовать асинхронные особенности Python.
Особенности асинхронного программирования в Python
Синхронная программа выполняется поэтапно. Даже при наличии условных операторов, циклов и вызовов функций, код можно рассматривать как процесс, где за раз выполняется один шаг. По завершении одного шага программа переходит к другому.
Вот два примера программ, которые работают синхронно:
- Программы для пакетной обработки обычно создаются синхронно. Вы получаете некие входные данные, обрабатываете их и создаете определенный вывод. Шаг следует за шагом до тех пор, пока программа не достигнет желаемого вывода. При написании кода важно только следить за этапами и их правильном порядком;
- Программы для командной строки является небольшими, быстрыми процессами, которые запускаются в терминале. Данные скрипты используются для создания или трансформирования чего-то, генерации отчета или создания списка данных. Все это может быть создано через серию шагов, которые выполняются последовательно до завершения окончания программы.
Асинхронная программа действует иначе. Код по-прежнему будет выполняться шаг за шагом.
Основная разница в том, что системе не обязательно ждать завершения одного этапа перед переходом к следующему.
Это значит, что программа перейдет к выполнению следующего этапа, когда предыдущий еще не завершен и все еще выполняется где-то параллельно. Это также значит, что программе известно, что нужно делать после окончания предыдущего этапа.
Зачем же писать код подобным образом? Далее будет дан подробный ответ на данный вопрос, а также предоставлены инструменты для элегантного решения интересных асинхронных задач.
Создания синхронного веб-сервера
Процесс создания веб-сервера в общем и целом схож с пакетной обработкой. Сервер получает определенные входные данные, обрабатывает их и создает вывод. Написанная таким образом синхронная программа создает рабочий веб-сервер.
Однако такой веб-сервер был бы просто ужасным.
Почему? В данном случае каждая единица работы (ввод, обработка, вывод) не является единственной целью. Настоящая цель заключается в быстром выполнении сотен или даже тысяч единиц работы. Это может продолжаться на протяжении длительного времени, и несколько рабочих единиц могут поступить одновременно.
Можно ли сделать синхронный веб-сервер лучше? Конечно можно попробовать оптимизировать этапы выполнения для наиболее быстрой работы. К сожалению, у этого подхода есть ограничения. Результатом может быть веб-сервер, который отвечает медленно, не справляется с работой или копит невыполненные задачи даже по завершении срока.
На заметку: Есть и другие ограничения, с которыми можно столкнуться при попытке оптимизировать указанный выше подход. В их число входит скорость сети, скорость I/O (ввод-вывода) файла, скорость запроса базы данных (MySQL, SQLite) и скорость других подсоединенных устройств. Общая особенность в том, что везде есть функции ввода-вывода. Все эти элементы работают на порядок медленнее, чем скорость обработки CPU.
В синхронной программе, если шаг выполнения запускает запрос к базе данных, тогда CPU практически не используется, пока не будет возвращен запрос к базе данных. Для пакетно-ориентированных программ большую часть времени это не является приоритетом. Обработка результатов этой операции ввода-вывода является целью. Часто это может занять больше времени, чем сама операция ввода-вывода. Любые усилия по оптимизации будут сосредоточены на обработке, а не на вводе-выводе.
Техники асинхронного программирования позволяют программам использовать преимущества относительно медленных процессов ввода-вывода, освобождая CPU для выполнения другой работы.
Иной подход к программированию в Python
В начале изучения асинхронного программирования вы можете столкнуться с многочисленными дискуссиями относительно важности блокирования и написания неблокирующего кода. У меня, например, было много сложностей при разборе данных концепций, как во время разбора документации, так и при обсуждении темы с другими программистами.
Что такое неблокирующий код? Возникает встречный вопрос — что такое блокирующий код? Помогут ли ответы на данные вопросы при создании лучшего веб-сервера? Если да, как это сделать? Будем выяснять!
Написание асинхронных программ требует несколько иного подхода к программированию. Новый взгляд на устоявшуюся в сознании тему может быть непривычным, но это интересное упражнение. Все оттого, что реальный мир сам по себе по большей части асинхронный, как и то, как мы с ним взаимодействуем.
Представьте следующее: вы родитель, что пытается совмещать сразу несколько задач. Вам нужно заняться подсчетом коммунальных услуг, стиркой и присмотреть за детьми. Вы делаете эти вещи параллельно, особенно не задумываясь о том, как именно. Давайте разберем все по полочкам:
- Подсчет коммунальных услуг является синхронной задачей. Шаг за шагом, пока все не оплачено. За данный процесс вы отвечаете полностью сами;
- Тем не менее, вы можете отвлечься от подсчетов и заняться стиркой. Можно высушить постиранное белье и загрузить в стиральную машинку новую партию;
- Работа со стиральной машинкой и сушкой является синхронной задачей, и основная часть работы приходится на то, что происходит после загрузки одежды. Машинка стирает сама, поэтому вы можете вернуться к подсчету коммунальных услуг. К данному моменту сушка и стирка стали асинхронными задачами. Сушилка и стиральная машинка теперь будут работать независимо от вас и друг от друга до тех пор, пока звуковой сигнал не сообщит о завершении процесса;
- Присмотр за детьми является другой асинхронной задачей. По большей части они могут играть самостоятельно. Возможно, кто-то захочет перекусить, или кому-то понадобится помощь, тогда вам нужно будет как-то отреагировать. Особенно это важно в случае, если ребенок поранится или заплачет. Дети являются долгоиграющей задачей с высшим приоритетом. Присмотр за ними намного важнее стирки и подсчета коммунальных платежей.
Данные примеры могут помочь представить концепты блокирующего и неблокирующего кода. Рассмотрим их, заменив примеры на термины программирования. В роли центрального процессора CPU будете выступать вы сами. Во время погружения одежды в стиральную машинку вы (CPU) заняты и заблокированы от других задач, к примеру, подсчета коммунальных услуг. Но ничего страшного, ведь самой стиркой вам заниматься не нужно.
С другой стороны, работающая стиральная машинка не блокируют вас от занятия другими задачами. Это асинхронная функция, так как вам не нужно ждать ее завершения. После запуска машинки вы можете заняться чем-то другим. Это называется переключением контекста, или context switch. Контекст того, что вы делаете изменился, но через некоторое время звуковой сигнал сообщит о завершении стирки.
Будучи людьми, в большинстве случаев мы так и действуем. Нам естественно постоянно переключаться от дела к делу, даже не задумываясь об этом. Разработчику важно суметь перевести поведение подобного рода на язык кода, который бы работал аналогичным образом.
Программирование родительского элемента: не так уж просто!
Если вы узнали себя (или своих родителей) в вышеуказанном примере, отлично! Вам будет проще разобраться в асинхронном программировании. Напомним, что вы можете переключать контекст, легко менять, выбирать новые задачи и завершать старые. Теперь попробуем воплотить данную манеру поведения в коде по отношению к виртуальным родителям.
Мысленный эксперимент #1: Синхронный родитель
Каким образом вы бы создали родительскую программу, что выполняла бы все вышеперечисленные задачи в синхронной манере? Так как присмотр за детьми является приоритетной задачей, возможно, ваша программа только этим и будет заниматься. Родитель будет присматривать за детьми, ожидая чего-то, что может потребовать его внимания. Однако ничего другого (вроде подсчета коммунальных услуг или стирки) на протяжении данного сценария сделано не будет.
Теперь вы можете назначать приоритеты задачам так, как вам хочется. Однако только одна задача может произойти в любой момент времени. Это результат синхронного, пошагового подхода. Как и синхронный веб-сервер, описанный выше, это может сработать, однако многим такая жизнь может показаться не очень удобной. Родитель не сможет ничем заняться, пока дети не уснут. Все другие задачи будут выполняться позже, до поздней ночи. От такой жизни многие с ума сойдут уже через несколько дней.
Мысленный эксперимент #2: Родитель опросник
При использовании опросника, или polling, можно изменить вещи подобным образом, чтобы многочисленные задачи были завершены. В данном подходе родитель периодически отрывается от текущей задачи и проверяет, не требуют ли другие задачи внимания.
Давайте сделаем интервал опросника примерно в пятнадцать минут. Теперь каждые пятнадцать минут родитель проверяет, не нужно ли заняться стиральной машиной, высушенной одеждой или детьми. Если нет, то родитель может вернуться к работе с подсчетом коммунальных услуг. Однако, если какое-либо из этих заданий требует внимания, родитель позаботится об этом, прежде чем вернуться к подсчетам. Этот цикл продолжается до следующего тайм-аута из цикла опросника.
Этот подход также работает, ведь внимание уделяется множеству задач. Однако у него есть несколько проблем:
- Родитель может потратить много времени, проверяя те вещи, на которых не нужно акцентировать внимания: Стиральная машинка еще не закончила работа, другая одежда все еще сушится, а детям потребуется уделить внимание только в том случае, если произойдет что-то непредвиденное;
- Родитель может пропустить завершение задач, которые требуют определенного внимания. К примеру, если стирка завершилась в начале интервала опросника, на это никто не будет обращать внимания целые пятнадцать минут! Кроме того, присмотр за детьми должен иметь наивысший приоритет. Столкнувшись с проблемой, ребенок вряд ли станет ждать родителей пятнадцать минут, ему потребуется внимание сразу же.
Можно решить эти проблемы, сократив интервал опросника, но теперь родитель (CPU) будет тратить больше времени на переключение контекста между задачами. Это происходит, когда вы начинаете достигать точки убывающей отдачи. Опять же, немногие смогут нормально так жить.
Мысленный эксперимент #3: Родитель потока
«Вот бы у меня был клон…» Если вы родитель, тогда мысли подобного рода у вас наверняка периодически возникают. Во время программирования виртуальных родителей это действительно можно сделать, используя потоки. Данный механизм позволяет одновременно запускать несколько секций программы. Каждая секция кода, запущенная независимо, называется потоком, и все потоки разделяют одно и то же пространство памяти.
Если вы рассматриваете каждую задачу как часть одной программы, можете разделить их и запустить в виде потоков. Другими словами, можно «клонировать» родителя, создав по одному экземпляру для каждой задачи: присмотр за детьми, работой стиральной машинки, сушилки и подсчет коммунальных услуг. Все эти «клоны» работают независимо.
Это звучит как довольно хорошее решение, но у него есть и некоторые сложности. Одной из них является тот факт, что вам придется указывать каждому родительскому экземпляру, что именно делать в программе. Это может привести к некоторым проблемам, поскольку все экземпляры программы используют одни и те же элементы.
К примеру, скажем, Родитель А следит за сушилкой. Увидев, что вещи высушились, Родитель А уберет их и развесит новые. В то же время Родитель В замечает, что стиральная машинка завершила работу, поэтому он начинает вытаскивать одежду. Однако Родителю В также нужно заняться сушилкой, чтобы развесить постиранное белье. Сейчас это невозможно, так как в данный момент сушилкой занимается Родитель А.
Через некоторое время Родитель А заканчивает собирать одежду. Теперь ему хочется заняться стиральной машинкой и переместить вещи на пустую сушилку. Это также невозможно, ведь у стиральной машинки сейчас Родитель В.
Сейчас эти два родителя находятся в состоянии взаимной блокировки, или deadlock. Они оба имеют контроль над своим собственным ресурсом, но также хотят контролировать другой ресурс. Им придется ждать вечно, пока другой родительский экземпляр не освободит контроль. Как программист, вы должны написать код, чтобы разрешить такую ситуацию.
На заметку: Многопоточные программы позволяют создавать несколько параллельных путей выполнения, которые совместно используют одно и то же пространство памяти. Это может быть как преимуществом, так и недостатком. Любая память, совместно используемая потоками, подчиняется одному или нескольким потокам, пытающимся одновременно использовать одну и ту же общую память. Это может привести к повреждению данных, чтению данных в поврежденном состоянии и просто к беспорядочным данным в целом.
В многопоточном программировании переключение контекста происходит под управлением системы, а не программиста. Система контролирует, когда переключать контексты и когда предоставлять потокам доступ к общим данным, тем самым изменяя контекст использования памяти. Все виды проблем подобного рода управляемы в многопоточном коде, однако их трудно разрешить и отладить без ошибок.
Вот еще одна проблема, которая может возникнуть из-за многопоточности. Предположим, что ребенок получил травму и нуждается в неотложной помощи. Родителю «С» было поручено присматривать за детьми, поэтому он сразу же забирает ребенка. При оказании неотложной помощи Родителю «C» необходимо выписать достаточно большой чек, чтобы покрыть расходы на посещение врача.
Тем временем Родитель «D» дома работает над подсчетом коммунальных платеже, следовательно, сейчас он отвечает за финансы. Он не знает о дополнительных расходах на врача, поэтому очень удивлен, что на оплату счетов средств не хватает.
Помните, что эти два родительских экземпляра работают в одной программе. Семейные финансы являются общим ресурсом, поэтому вам нужно найти способ, чтобы родитель, наблюдающий за ребенком, проинформировал родителя, который занимает подсчетом средств. В противном случае вам потребуется предоставить какой-то механизм блокировки, чтобы финансовый ресурс мог использовать только один родитель за раз, с обновлениями.
Использование асинхронных особенностей Python на практике
Попробуем воспользоваться некоторыми вышеуказанным подходами и превратим их в функционирующие программы Python.
Все примеры статьи были протестированы на Python 3.8. В файле requirements.txt
указано, какие модули вам нужно установить, чтобы запустить все примеры.
1 2 3 4 5 6 7 8 9 10 11 |
aiohttp==3.6.2 async-timeout==3.0.1 attrs==19.3.0 certifi==2019.11.28 chardet==3.0.4 codetiming==1.1.0 idna==2.8 multidict==4.7.4 requests==2.22.0 urllib3==1.25.7 yarl==1.4.2 |
Сохраните как requirements.txt
и выполните команду в терминале:
1 |
pip3 install -r requirements.txt |
Вам также потребуется установить виртуальную среду Python для запуска кода, чтобы не мешать системному Python.
Синхронное программирование Python
Первый пример представляет собой несколько ответвленный способ создания задачи для извлечения работы из очереди и последующей ее обработки. Очередь в Python является структурой данных FIFO (first in, first out — «первым пришел — первым ушел»). Она предоставляет методы для размещения элементов в очередь и их повторного вывода в том порядке, в котором они были поставлены.
В данном случае работа состоит в том, чтобы получить номер из очереди и рассчитать количество циклов до этого числа. Оно выводится на консоль, когда начинается цикл, и снова при общем выводе. Программа демонстрирует способ, при котором несколько синхронных задач обрабатывают работу в очереди.
Программа, названная example_1.py
, полностью представлена ниже:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
import queue def task(name, work_queue): if work_queue.empty(): print(f"Task {name} nothing to do") else: while not work_queue.empty(): count = work_queue.get() total = 0 print(f"Task {name} running") for x in range(count): total += 1 print(f"Task {name} total: {total}") def main(): """ Это основная точка входа в программу """ # Создание очереди работы work_queue = queue.Queue() # Помещение работы в очередь for work in [15, 10, 5, 2]: work_queue.put(work) # Создание нескольких синхронных задач tasks = [(task, "One", work_queue), (task, "Two", work_queue)] # Запуск задач for t, n, q in tasks: t(n, q) if __name__ == "__main__": main() |
Рассмотрим важные строки программы:
- Строка 1 импортирует модуль
queue
. Здесь программа хранит работу, которая должна быть выполнена задачами; - Строки с 3 по 13 определяют
task()
. Данная функция извлекает работу из очередиwork_queue
и обрабатывает ее до тех пор, пока больше не нужно ничего делать; - Строка 15 определяет функцию
main()
для запуска задач программы; - Строка 20 создает
work_queue
. Все задачи используют этот общий ресурс для извлечения работы; - Строки с 23 по 24 помещают работу в
work_queue
. В данном случае это просто случайное количество значений для задач, которые нужно обработать; - Строка 27 создает список кортежей задач со значениями параметров, передаваемых задачами;
- Строки с 30 по 31 перебирают список кортежей задач, вызывая каждый из них и передавая ранее определенные значения параметров;
- Строка 34 вызывает
main()
для запуска программы.
Задача в данной программе является просто функцией, что принимает строку и очередь в качестве параметров. При выполнении она ищет что-либо в очереди для обработки. Если есть над чем поработать, из очереди извлекаются значения, запускается цикл for для подсчета до этого значения и выводится итоговое значение в конце. Получение работы из очереди продолжается до тех пор, пока на не закончится.
Есть вопросы по Python?
На нашем форуме вы можете задать любой вопрос и получить ответ от всего нашего сообщества!
Паблик VK
Одно из самых больших сообществ по Python в социальной сети ВК. Видео уроки и книги для вас!
При запуске данной программы будет получен следующий вывод:
1 2 3 4 5 6 7 8 9 |
Task One running Task One total: 15 Task One running Task One total: 10 Task One running Task One total: 5 Task One running Task One total: 2 Task Two nothing to do |
Здесь показано, что всю работу выполняет Task One
. Цикл while, в котором задействован Task One
внутри task()
, потребляет всю работу в очереди и обрабатывает ее. Когда этот цикл завершается, Task Two
получает шанс на выполнение. Однако он обнаруживает, что очередь пуста, поэтому Task Two
выводит оператор, который говорит, что ему нечего делать, и затем завершается. В коде нет ничего, что позволяло бы Task One
и Task Two
переключать контексты и работать вместе.
Простой кооперативный параллелизм в Python
Следующая версия программы позволяет двум задачам работать вместе. Добавление оператора yield
означает, что цикл получит контроль в указанной точке, сохраняя при этом свой контекст. Таким образом, уступающая задача может быть возобновлена позже.
Оператор yield
превращает task()
в генератор. Функция генератора вызывается так же, как и любая другая функция в Python, но когда выполняется оператор yield
, управление возвращается вызывающей функции. По сути, это переключение контекста, поскольку управление переходит от функции генератора к вызывающей стороне.
Интересная часть заключается в том, что функции генератора можно вернуть контроль, вызвав в генераторе next()
. Получается переключение контекста обратно к функции генератора, что запускает выполнение со всеми переменными функции, которые были определены до того, как выход все еще остается неизменным.
Цикл while
в main()
использует данное преимущество при вызове next(t)
. Данный оператор перезапускает задачу с того места, где оно было ранее выполнено. Это значит, что у вас есть контроль во время переключения контекста: когда оператор yield
выполняется в task()
.
Это форма совместной многозадачности. У программы контроль над своим текущим контекстом, и теперь можно запустить что-то еще. В таком случае цикл while
в main()
способен запускать два экземпляра task()
в качестве функции генератора. Каждый экземпляр потребляет работу из одной и той же очереди. Это довольно умно, но для достижения тех же результатов, что и в первой программе, требуется потрудиться.
Программа example_2.py
демонстрирует простой параллелизм и приведена ниже:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
import queue def task(name, queue): while not queue.empty(): count = queue.get() total = 0 print(f"Task {name} running") for x in range(count): total += 1 yield print(f"Task {name} total: {total}") def main(): """ Это основная точка входа в программу """ # Создание очереди работы work_queue = queue.Queue() # Размещение работы в очереди for work in [15, 10, 5, 2]: work_queue.put(work) # Создание задач tasks = [task("One", work_queue), task("Two", work_queue)] # Запуск задач done = False while not done: for t in tasks: try: next(t) except StopIteration: tasks.remove(t) if len(tasks) == 0: done = True if __name__ == "__main__": main() |
Рассмотрим, что именно происходит в коде выше:
- Строки с 3 по 11 определяют
task()
, как и раньше. Кроме того, в Строке 10 добавляетсяyield
, превращая функцию в генератор. В этом случае происходит переключение контекста и управление возвращается обратно в циклwhile
вmain()
; - Строка 25 создает список задач, но немного иначе, чем вы видели в предыдущем примере кода. В этом случае каждая задача вызывается с параметрами, указанными в переменной списка задач. Это необходимо для запуска функции генератора
task()
в первый раз; - Строки с 31 по 36 являются модификациями цикла
while
вmain()
, которые позволяют совместно выполнятьtask()
. Управление возвращается к каждому экземпляруtask()
, позволяя циклу продолжаться и запустить другую задачу; - Строка 32 возвращает контроль к
task()
и продолжает выполнение после точки, где был вызванyield
; - Строка 36 устанавливает переменную
done
. Циклwhile
заканчивается, когда все задачи завершены и удалены изtasks
.
При запуске вышеуказанной программы будет получен следующий вывод:
1 2 3 4 5 6 7 8 |
Task One running Task Two running Task Two total: 10 Task Two running Task One total: 15 Task One running Task Two total: 5 Task One total: 2 |
Здесь видно, что Task One
и Task Two
выполняются и потребляют работу из очереди. Именно это требуется, поскольку обе задачи обрабатывают работу, и каждая отвечает за два элемента в очереди. Это интересно, но опять же, для достижения этих результатов требуется немало усилий.
Хитрость заключается в использовании оператора
yield
, который превращаетtask()
в генератор и выполняет переключение контекста. Программа использует переключатель контекста для управления цикломwhile
вmain()
, позволяя двум экземплярам задачи выполняться совместно.
Обратите внимание на то, как Task Two
выводит итоговую сумму первой. Может показаться, что задачи выполняются асинхронно. Тем не менее, это все еще синхронная программа. Она структурирована так, что две задачи могут передавать контексты вперед и обратно. Причина, по которой Task Two
выводит итоговую сумму в первую очередь, состоит в том, что она считает только до 10, а Task One
до 15. Task Two
просто достигает своей первой итоговой суммы, поэтому она выводит выходные данные на консоль раньше Task One
.
На заметку: В коде из примера выше используется модуль codetiming, что фиксирует и выводит время, нужное для выполнения фрагментов кода. Более подробно почитать о данном модуле можете в данной статье на сайте RealPython.
Этот модуль является частью Python Package Index. Он создан Geir Arne Hjelle, одним из авторов популярного сайта Real Python. Если занимаетесь написанием кода, который должен включать функции синхронизации, то обязательно стоит обратить внимание на модуль
codetiming
.Для того чтобы модуль
codetiming
был доступен, его требуется установить. Это можно сделать с помощью команды pip:pip install codetiming
Совместный параллелизм с блокирующими вызовами
Следующая версия программы такая же, как и предыдущая, за исключением добавления time.sleep(delay)
в теле вашего цикла задач. Добавляется задержка, основанная на значении, полученном из рабочей очереди, к каждой итерации цикла задачи. Задержка имитирует эффект блокирующего вызова в вашей задачи.
Блокирующий вызов является кодом, который не дает CPU делать что-либо еще в течение некоторого периода времени. В вышеупомянутых мысленных экспериментах, если родитель не мог отвлечься от подсчета коммунальных услуг до завершения задачи, такой процесс был бы блокирующим вызовом.
В данном примереtime.sleep(delay)
делает то же самое, потому что CPU не может ничего сделать, кроме ожидания истечения задержки.
Сперва установим нужные библиотеки:
1 |
pip3 install codetiming |
Код:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
import time import queue from codetiming import Timer def task(name, queue): timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}") while not queue.empty(): delay = queue.get() print(f"Task {name} running") timer.start() time.sleep(delay) timer.stop() yield def main(): """ Это основная точка входа в программу """ # Создание очереди работы work_queue = queue.Queue() # Добавление работы в очередь for work in [15, 10, 5, 2]: work_queue.put(work) tasks = [task("One", work_queue), task("Two", work_queue)] # Запуск задач done = False with Timer(text="\nTotal elapsed time: {:.1f}"): while not done: for t in tasks: try: next(t) except StopIteration: tasks.remove(t) if len(tasks) == 0: done = True if __name__ == "__main__": main() |
Изменения, что были сделаны в данном коде:
- Строка 1 импортирует модуль time, чтобы у программы был доступ к time.sleep();
- Строка 3 импортирует код Timer из модуля codetiming;
- Строка 6 создает экземпляр класса Timer, используемый для измерения времени, нужного для итерации каждой задачи цикла;
- Строка 10 запускает экземпляр timer;
- Строка 11 изменяет
task()
для включенияtime.sleep(delay)
для имитации задержки IO. Это заменяет циклfor
, что отвечал за подсчет вexample_1.py
; - Строка 12 останавливает экземпляр
timer
и выводит, истекшее с момента вызоваtimer.start()
, время; - Строка 30 создает менеджер контекста Timer, что выводит истекшее время с момента начала всего цикла.
При запуске программы будет получен следующий вывод:
1 2 3 4 5 6 7 8 9 10 |
Task One running Task One elapsed time: 15.0 Task Two running Task Two elapsed time: 10.0 Task One running Task One elapsed time: 5.0 Task Two running Task Two elapsed time: 2.0 Total elapsed time: 32.0 |
Как и ранее, Task On
и Task Two
запускаются, собирая работу из очереди обрабатывая ее. Однако даже при добавлении задержки видно, что кооперативный параллелизм ничего не привнес. Задержка останавливает обработку всей программы, а CPU просто ждет, чтобы задержка IO завершилась.
Именно под этим и подразумевается блокирующий код Python в документации по асинхронизации. Вы заметите, что время, необходимое для запуска всей программы, является просто совокупным временем всех задержек. Выполнение заданий таким способом нельзя считать успешным.
Кооперативный параллелизм с неблокирующими вызовами Python
Следующая версия программы подверглась небольшим изменениям. Здесь используются асинхронные особенности asyncio/await, появившиеся в Python 3.
Модули time
и queue
были заменены пакетом asyncio
. Программа получает доступ к асинхронной дружественной (неблокирующей) функциональности сна и очереди. Изменение task()
определяет ее как асинхронную, добавляя на строке 4 префикса async
. В Python это является показателем того, что функция будет асинхронной.
Другим большим изменением является удаление операторов time.sleep(delay)
и yield
с их последующей заменой на замена их на await asyncio.sleep(delay)
. Создается неблокирующая задержка, которая выполнит переключение контекста обратно к вызывающей main()
.
Цикла while
внутри main()
больше не существует. Вместо task_array
есть вызов await asyncio.gather(...)
. Это сообщает asyncio
о двух вещах:
- Создание двух задач на основе
task()
и их запуск; - Ожидание завершения обеих задач до перехода к дальнейшим действиям.
Последней строкой программы является asyncio.run(main())
. Здесь создается цикл событий. Данный цикл запускает main()
, что в свою очередь запускает два экземпляра task()
.
Цикл событий лежит в основе асинхронной системы Python. Он запускает весь код, включая main()
. Когда код задачи выполняется, CPU занят выполнением работы. С приближением ключевого слова await
происходит переключение контекста, и контроль возвращается обратно в цикл событий. Цикл событий просматривает все задачи, ожидающие события (в данном случае asyncio.sleep(delay
, и передает управление задаче с событием, которое готово.
await asyncio.sleep(delay)
является неблокирующим по отношению к CPU. Вместо ожидания истечения времени ожидания, CPU регистрирует событие сна в очереди задач цикла событий и выполняет переключение контекста, передавая контроль циклу событий. Цикл событий непрерывно ищет завершенные события и передает контроль задаче, ожидающей этого события. Таким образом CPU может оставаться занятым, если работа доступна, а цикл обработки событий отслеживает события, которые произойдут в будущем.
На заметку: Асинхронная программа запускается в одном потоке выполнения. Переключение контекста с одного раздела кода на другой, который может повлиять на данные, полностью под вашим контролем. Это значит, что вы можете разбить и завершить весь доступ к данным совместно используемой памяти, прежде чем переключать контекст. Это помогает решить проблему общей памяти, что присуща многопоточному коду.
Код example_4.py
приведен ниже:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
import asyncio from codetiming import Timer async def task(name, work_queue): timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}") while not work_queue.empty(): delay = await work_queue.get() print(f"Task {name} running") timer.start() await asyncio.sleep(delay) timer.stop() async def main(): """ Это главная точка входа для главной программы """ # Создание очереди работы work_queue = asyncio.Queue() # Помещение работы в очередь for work in [15, 10, 5, 2]: await work_queue.put(work) # Запуск задач with Timer(text="\nTotal elapsed time: {:.1f}"): await asyncio.gather( asyncio.create_task(task("One", work_queue)), asyncio.create_task(task("Two", work_queue)), ) if __name__ == "__main__": asyncio.run(main()) |
Вот отличия данной программы от example_3.py
:
- Строка 1 импортирует
asyncio
для получения доступа к асинхронной функциональности Python. Это замена импортаtime
; - Строка 2 импортирует класс
Timer
из модуляcodetiming
; - Строка 4 добавляет ключевое слово
async
перед определениемtask()
. Это сообщает программе, чтоtask
может выполняться асинхронно; - Строка 5 создается экземпляр
Timer
, используемый для измерения времени, необходимого для каждой итерации цикла задач; - Строка 9 запускает экземпляр
timer
; - Строка 10 заменяет
time.sleep(delay)
неблокирующимasyncio.sleep(delay)
, что также возвращает контроль (или переключает контексты) обратно в цикл основного события; - Строка 11 останавливается экземпляр
timer
и выводится истекшее время с момента вызоваtimer.start()
; - Строка 18 создает неблокирующую асинхронную
work_queue
; - Строки 21-22 помещают работу в
work_queue
асинхронно с использованием ключевого словаawait
; - Строка 25 создается менеджер контекста
Timer
, который выводит истекшее время, затраченное на выполнение циклаwhile
; - Строки 26-29 создают две задачи и собирают их вместе, поэтому программа будет ожидать завершения обеих задач;
- Строка 32 запускает программу асинхронно. Здесь также запускается внутренний цикл событий.
При анализе вывода программы обратите внимание на одновременный запуск Task One
и Task Two
, а затем ложный вызов IO:
1 2 3 4 5 6 7 8 9 10 |
Task One running Task Two running Task Two total elapsed time: 10.0 Task Two running Task One total elapsed time: 15.0 Task One running Task Two total elapsed time: 5.0 Task One total elapsed time: 2.0 Total elapsed time: 17.0 |
Это указывает на то, что await asyncio.sleep(delay)
неблокирующая, и другая работа завершена.
В конце программы можно заметить, что общее время по сути в два раза меньше, чем при запуске example_3.py
. Это преимущество программы, что использует асинхронные особенности. Каждая задача может одновременно запускать await asyncio.sleep(delay)
. Общее время выполнения программы теперь меньше, чем общее время частей. Нам удалось избавиться от синхронной модели.
Синхронные (блокирующие) HTTP вызовы
Следующая версия программы является как шагом вперед, так и отступлением назад. Программа выполняет некоторую работу с реальным I/O, отправляя HTTP запросы из списка с URL и получая содержимое страницы. Однако это происходит блокирующим (синхронным) образом.
Программа была изменена для импорта отличного модуля requests, чтобы сделать фактические HTTP-запросы. Кроме того, очередь теперь содержит список URL, а не номеров. Кроме того, task()
больше не увеличивает счетчик. Вместо этого запросы получают содержимое URL из очереди и выводят потраченное на данное действие время.
Код example_5.py
приведен ниже:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
import queue import requests from codetiming import Timer def task(name, work_queue): timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}") with requests.Session() as session: while not work_queue.empty(): url = work_queue.get() print(f"Task {name} getting URL: {url}") timer.start() session.get(url) timer.stop() yield def main(): """ Это основная точка входа в программу """ # Создание очереди работы work_queue = queue.Queue() # Помещение работы в очередь for url in [ "http://google.com", "http://yahoo.com", "http://linkedin.com", "http://apple.com", "http://microsoft.com", "http://facebook.com", "http://twitter.com", ]: work_queue.put(url) tasks = [task("One", work_queue), task("Two", work_queue)] # Запуск задачи done = False with Timer(text="\nTotal elapsed time: {:.1f}"): while not done: for t in tasks: try: next(t) except StopIteration: tasks.remove(t) if len(tasks) == 0: done = True if __name__ == "__main__": main() |
Вот что происходит в данной программе:
- Строка 2 импортирует
requests
, что предоставляет удобный способ совершать HTTP вызовы. - Строка 3 импортирует класс
Timer
из модуляcodetiming
. - Строка 6 создается экземпляр
Timer
, используемый для измерения времени, необходимого для каждой итерации цикла задач. - Строка 11 запускает экземпляр
timer
- Строка 12 создает задержку, похожую на то, что в
example_3.py
. Однако на этот раз вызываетсяsession.get(url)
, который возвращает содержимое URL, полученного изwork_queue
. - Строка 13 останавливает экземпляр
timer
и выводит истекшее время с момента вызоваtimer.start()
. - Строки с 23 по 32 помещают список URL в
work_queue
. - Строка 39 создается менеджер контекста
Timer
, который выводит истекшее время, затраченное на выполнение всего циклаwhile
.
При запуске этой программы вы увидите следующий вывод:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
Task One getting URL: http://google.com Task One total elapsed time: 0.3 Task Two getting URL: http://yahoo.com Task Two total elapsed time: 0.8 Task One getting URL: http://linkedin.com Task One total elapsed time: 0.4 Task Two getting URL: http://apple.com Task Two total elapsed time: 0.3 Task One getting URL: http://microsoft.com Task One total elapsed time: 0.5 Task Two getting URL: http://facebook.com Task Two total elapsed time: 0.5 Task One getting URL: http://twitter.com Task One total elapsed time: 0.4 Total elapsed time: 3.2 |
Как и в более ранних версиях программы, yield
превращает task()
в генератор. Он также выполняет переключение контекста, позволяющее запустить другой экземпляр задачи.
Каждая задача получает URL из рабочей очереди, извлекает содержимое страницы и сообщает, сколько времени потребовалось для получения этого содержимого.
Как и раньше, yield
позволяет обеим задачам работать совместно. Однако, поскольку эта программа работает синхронно, каждый вызов session.get()
блокирует CPU, пока не будет получена страница. Обратите внимание на общее время, необходимое для запуска всей программы в конце. Это будет иметь смысл для следующего примера.
Асинхронные (неблокирующие) HTTP вызовы Python
Эта версия программы модифицирует предыдущую версию для использования асинхронных функций Python. Здесь импортируется модуль aiohttp, который является библиотекой для асинхронного выполнения HTTP запросов с использованием asyncio
.
Задачи были изменены, чтобы удалить вызов yield
, поскольку код для выполнения HTTP GET запроса больше не блокирующий. Он также выполняет переключение контекста обратно в цикл событий.
Программа example_6.py
приведена ниже:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
import asyncio import aiohttp from codetiming import Timer async def task(name, work_queue): timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}") async with aiohttp.ClientSession() as session: while not work_queue.empty(): url = await work_queue.get() print(f"Task {name} getting URL: {url}") timer.start() async with session.get(url) as response: await response.text() timer.stop() async def main(): """ Это основная точка входа в программу """ # Создание очереди работы work_queue = asyncio.Queue() # Помещение работы в очередь for url in [ "http://google.com", "http://yahoo.com", "http://linkedin.com", "http://apple.com", "http://microsoft.com", "http://facebook.com", "http://twitter.com", ]: await work_queue.put(url) # Запуск задач with Timer(text="\nTotal elapsed time: {:.1f}"): await asyncio.gather( asyncio.create_task(task("One", work_queue)), asyncio.create_task(task("Two", work_queue)), ) if __name__ == "__main__": asyncio.run(main()) |
В данной программе происходит следующее:
- Строка 2 импортирует библиотеку
aiohttp
, которая обеспечивает асинхронный способ выполнения HTTP вызовов. - Строка 3 импортирует класс
Timer
из модуляcodetiming
. - Строка 5 помечает
task()
как асинхронную функцию. - Строка 6 создает экземпляр
Timer
, используемый для измерения времени, необходимого для каждой итерации цикла задач. - Строка 7 создается менеджер контекста сессии
aiohttp
. - Строка 8 создает менеджер контекста ответа
aiohttp
. Он также выполняет HTTP вызовGET
для URL, взятого изwork_queue
. - Строка 11 запускает экземпляр
timer
- Строка 12 использует сеанс для асинхронного получения текста из URL.
- Строка 13 останавливает экземпляр
timer
и выводит истекшее время с момента вызоваtimer.start()
. - Строка 39 создает менеджер контекста
Timer
, который выводит истекшее время, затраченное на выполнение всего циклаwhile
.
При запуске программы вы увидите следующий вывод:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
Task One getting URL: http://google.com Task Two getting URL: http://yahoo.com Task One total elapsed time: 0.3 Task One getting URL: http://linkedin.com Task One total elapsed time: 0.3 Task One getting URL: http://apple.com Task One total elapsed time: 0.3 Task One getting URL: http://microsoft.com Task Two total elapsed time: 0.9 Task Two getting URL: http://facebook.com Task Two total elapsed time: 0.4 Task Two getting URL: http://twitter.com Task One total elapsed time: 0.5 Task Two total elapsed time: 0.3 Total elapsed time: 1.7 |
Посмотрите на общее прошедшее время, а также на индивидуальное время, чтобы получить содержимое каждого URL. Вы увидите, что длительность составляет примерно половину совокупного времени всех HTTP GET запросов. Это связано с тем, что HTTP GET запросы выполняются асинхронно. Другими словами, вы эффективно используете преимущества CPU, позволяя ему делать несколько запросов одновременно.
Поскольку CPU очень быстрый, этот пример может создать столько же задач, сколько URL. В этом случае время выполнения программы будет соответствовать времени самого медленного поиска URL.
Заключение
В статье были предоставлены инструменты для начала работы с техниками асинхронного программирования. Использование асинхронных функций Python обеспечивает вас программным контролем во время переключения контекста. Теперь сложности, которые возникают в процессе многопоточного программирования, разрешить гораздо легче.
Асинхронное программирование является мощным инструментом, однако оно подойдет не для каждой программы. Например, если вы пишете программу, которая вычисляет число Пи с точностью до миллионных знаков после запятой, то асинхронный код не поможет. Такая программа связана с CPU, без большого количества I/O. Однако, если вы пытаетесь реализовать сервер или программу, которая выполняет IO (например, доступ к файлам или сети), использование асинхронных функций Python может иметь огромное преимущество.
Подведем итоги изученных тем:
- Что такое синхронное программирование
- Отличия асинхронных программ, их мощность и управляемость
- Случаи необходимости использования асинхронных программ
- Использование асинхронных особенностей Python
Теперь, получив все необходимые знания, вы сможете писать программы совершенно иного уровня!
Являюсь администратором нескольких порталов по обучению языков программирования Python, Golang и Kotlin. В составе небольшой команды единомышленников, мы занимаемся популяризацией языков программирования на русскоязычную аудиторию. Большая часть статей была адаптирована нами на русский язык и распространяется бесплатно.
E-mail: vasile.buldumac@ati.utm.md
Образование
Universitatea Tehnică a Moldovei (utm.md)
- 2014 — 2018 Технический Университет Молдовы, ИТ-Инженер. Тема дипломной работы «Автоматизация покупки и продажи криптовалюты используя технический анализ»
- 2018 — 2020 Технический Университет Молдовы, Магистр, Магистерская диссертация «Идентификация человека в киберпространстве по фотографии лица»