Механизмы синхронизации потоков в Python

автор

В данной статье мы рассмотрим, как синхронизировать доступ к общим ресурсам, а также как координировать выполнение потоков.

Синхронизация доступа к общим ресурсам

Одна важная проблема в случаях, когда используются потоки, это избегание конфликтов, когда больше одного потока нуждается в получении доступа к одной и той же переменной или ресурсу. Если вы будете действовать невнимательно, то перекрывающие доступы или модификации из нескольких потоков могут повлечь за собой множество различных проблем, и что самое страшное, эти проблемы имеют тенденцию возникать только при сильной загрузке или в ваших производственных серверах, или на более быстром железе, которое используется одним из ваших пользователей. Например, рассмотрим программу, которая выполняет определенный процесс и отслеживает, сколько раз этот процесс был выполнен:

Если вы вызовете эту функцию из более, чем одного потока, вы обнаружите, что подсчет происходит не очень аккуратно. Это работает в большинстве случаев, но иногда могут возникнуть проблемы. Причина в том, что дополнительные операции исполняются в три этапа. В первом этапе, интерпретатор извлекает текущее значение счетчика, после чего подсчитывает новое значение, и наконец, вписывает новое значение обратно в переменную.

Если другой поток получит контроль после того, как нынешний поток извлек переменную, он может извлечь переменную, увеличить её и вписать обратно, перед тем как нынешний поток сделает то же самое. И так как они оба видят то же исходное значение, только один из потоков получит к нему доступ.
Другая проблема это доступ к неполному или несогласованному состоянию, что может произойти, если один из потоков инициализирует или обновляет нетривиальную структуру данных, при этом другой поток пытается прочитать структуру, в то время, пока она обновляется.

Атомарные операции

Простейший способ синхронизации с общими переменными или другими ресурсами, это положиться на атомарные операции в интерпретаторе. Атомарная операция – это операция, которая осуществляет только один этап выполнения задачи, без каких либо шансов того, что другой поток получит контроль. В целом, такой подход работает, если общий ресурс состоит из одного экземпляра данных корневого типа, типа переменной строки, числа, словаря или списка. Вот пример нескольких безопасных операций:

  • Чтение или замена экземпляра атрибута;
  • Чтение или замена глобальной переменной;
  • Выборка элемента из списка;
  • Модификация списка на месте (другими словами, добавление объекта при помощи append);
  • Выборка объекта в словаре;
  • Модификация словаря на месте (другими словами, добавление объекта или вызов метода clear)

Обратите внимание на то, что как было сказано ранее, операции, которые считывают переменную или атрибут, модифицируют, после чего вписывают обратно, не являются защищенными от потоков. Другой поток может обновить переменную после того, как она уже была прочтена текущим потоком, а перед этим обновлена. Также обратите внимание на то, что код Python может быть выполнен, когда объекты разрушены, так что даже простая, на первый взгляд операция может спровоцировать другие потоки на действия, что может повести за собой конфликты. Если вы сомневаетесь, то используйте блокировки.

Блокировки (замки)

Блокировки – это фундаментальный механизм синхронизации, который предоставлен модулем threading Python. Замок может удерживаться одним потоком в любое время, или без потока вообще. Если поток попытается удержать один замок, который уже удерживается другим потоком, выполнение первого потока будет остановлена, пока не будет снята блокировка. Замки обычно используются для синхронизации доступа к общим ресурсам. Для каждого такого источника создается объект Lock. Когда вам нужно получить доступ к ресурсу, вызовите acquire для того, чтобы поставить блок, после чего вызовете release:

Для корректной работы важно снять блок, даже если что-то идет не так при доступе к ресурсу. Вы можете использовать try-finally для этой цели:

В Python 2.5 и в поздних версиях, вы можете также использовать оператор with. В работе с блоком, данный оператор автоматически получает доступ к замку перед входом в блок, и отпускает его после выхода из блока:

Метод acquire принимает опциональный флаг ожидания, который может быть использоваться для того, чтобы обойти блокировку, если замок удерживается той или иной частью кода. Если вы укажите False, то метод не будет блокироваться, но вернет False, если замок уже висит:

Вы можете использовать метод locked, чтобы проверить, работает ли замок. Обратите внимание на то, что вы не можете использовать этот метод, чтобы определить, блокируется вызов к acquire или нет. Какой-либо другой поток может получить доступ к замку между вызовом метода и следующим оператором.

Возможные проблемы с блокировкой

Стандартному объекту lock не важно, какой поток в данный момент держит замок. Если замок в данный момент удерживается, любой поток, который попытается получить к нему доступ будет заблокирован, даже если тот же поток в данный момент удерживает блок. Давайте взглянем на следующий пример:

Здесь мы имеем наш общий ресурс и две функции доступа, задача которых извлекать разные части ресурса. Функции доступа также используют замки для того, чтобы сторонний поток не менял ресурс, пока мы получаем к нему доступ. Теперь, если нам нужно добавить третью функцию, которая извлекает обе части, мы столкнемся с проблемой. Наивность данного подхода заключается в том, что мы просто вызываем две функции, и получаем суммарный результат:

Проблема заключается в том, что сторонний поток меняет ресурс между двумя вызовами, что приводит к противоречивым результатам. Очевидное решение данной проблемы – добавить замок в данной функции:

В любом случае, это не сработает. Индивидуальные функции доступа застрянут, так как другая функция в данный момент держит блок. Чтобы обойти это, вы можете добавить флаги к функциям доступа, которые позволят внешней функции отключить блок, но это чревато ошибками и может быстро выйти из-под контроля. К счастью, модуль threading содержит более практичный инструмент реализации блокировки – реентерабельные замки (RLock).

Реентерабельные замки (RLock)

Класс RLock – это версия замка, который выполняет функцию блокировки только в том случае, если замок удерживает другой поток. В то время как обычные замки блокируют тогда, когда тот же поток пытается получить к одному и тому же замку дважды, реентерабельный замок блокирует только в том случае, если другой поток уже держит замок. Если нынешний поток пытается получить доступ к замку, который и так удерживается, осуществление данной операции проходит в привычном порядке.

В основном это используется для вложенного доступа к общим ресурсам, как показано в предыдущем примере. Для того, чтобы наладить методы доступа в нашем примере, просто замените обычный Lock на RLock и вложенные вызовы заработают так, как надо.

При этом вы можете выбрать отдельные части, или обе за раз, не застревая и не получая взаимоисключающих данных. Обратите внимание на то, что этот замок отслеживает уровень рекурсии, так что вам все раз нужно вызвать release по одному разу для каждого вызова для получения доступа.

Семафоры

Семафор — это более продвинутый механизм блокировки. В семафоре есть внутренний счетчик, вместо флага блокировки, и блокирует он только в том случае, если большее количество потоков (чем указано) попытается удержать семафор. В зависимости от того, как инициализирован семафор, это позволяет нескольким потокам получить доступ к одной и той же части кода одновременно.

Счетчик уменьшается, когда к семафору получают доступ, и увеличивается, когда доступ к нему теряют. Если счетчик доходит до нуля, когда к семафору получен доступ, то поток блокируется. Когда счетчик увеличивается и его показатель выше нуля – один из заблокированных потоков (если таковые имеются) снова заработает. Семафоры обычно используются для ограничения доступа к ресурсу с ограниченными возможностями, такие как подключение к сети или сервер базы данных. Просто установите счетчик на максимум, и семафор позаботиться об остальном.

Если вы не укажите значение для счетчика, счетчик инициализируется как 1. Модуль threading предлагает два варианта работы семафоров. Класс Semaphore обеспечивает безлимитный семафор, который позволяет вам вызывать release любое количество раз для увеличения счетчика. Чтобы избежать простых программных ошибок, как правило, лучше использовать класс BoundedSemaphore, который воспринимает как ошибку вызов release чаще, чем вы вызывали acquire.

Синхронизация между потоками

Замки можно также использовать для синхронизации между потоками. Модуль threading содержит несколько классов, разработанных для этой цели.

Объект Event

Объект event – это простой объект синхронизации. Он представляет собой внутренний флаг, так что все потоки могут ожидать, пока флаг будет установлен, задавать, или убирать его.

Если флаг был задан, метод wait не будет делать ничего. Если флаг был убран, wait будет блокировать, пока его снова не установят. Любое количество потоков может дожидаться одного и того же объекта event.

Объект Condition

Объект condition это более продвинутая версия объекта event. Он представляет собой своего рода измененное состояние в приложении, и поток может дожидаться заданных условий, или сигнал о том, что условие было задано. Вот простой пример потребитель/производитель. Для начала, вам нужен объект condition:

Поток «Производитель» должен получить условие, прежде чем он сможет уведомить потребителя о наличии нового товара:

Потребитель должен получить условие (следовательно, и соответствующий замок), после чего может попытаться извлечь товар из ресурсов:

Метод wait освобождает замок, блокирует настоящий поток, пока другой поток вызывает notify или notifyAll на тех же условиях, после чего замок восстанавливается. Если несколько потоков ожидает, метод notify активирует один из потоков, в то время как notifyAll активирует все потоки. Обход блока в методе wait возможен при помощи передачи значения timeout, в виде числа с плавающей запятой в секундах. После этого, метод выдаст результат после указанного времени, даже если мы не вызывали notify. Если вы используете timeout, вам нужно проверить ресурс, чтобы увидеть, произошло ли что-нибудь. Обратите внимание на то, объект condition связан с замком, и этот замок должен удерживаться, пред тем как вы сможете получить доступ к объекту condition. Соответственно, этот замок должен быть освобожден, когда вы выполнили все, что связанно с доступом к condition. В производственном коде вы должны использовать try-finally или with, как показано выше. Для того, чтобы связать condition с существующим замком, передайте замок конструктору Condition. Это также полезно, если вам нужно несколько объектов condition в одном ресурсе:

Вот и все! Вопросы будут у меня их и у самого есть, но мы справимся. Надеюсь.

Вам может быть интересно

Scroll Up