Personalcam.ru

Авто Аксессуары
0 просмотров
Рейтинг статьи
1 звезда2 звезды3 звезды4 звезды5 звезд
Загрузка...

6. 3. 2 Синхронизация потоков

6.3.2 Синхронизация потоков

Pthread-ы обеспечивают синхронизацию потоков в форме взаимного исключения (мьютексов) и условных переменных.

Мьютекс представляет собой двоичный семафор, который обеспечивает монопольный доступ к структуре с общими данными. Он поддерживает две основные операции: блокировку и разблокировку. Поток должен заблокировать мьютекс перед входом в критическую секцию и разблокировать его по завершении работы. Поток блокируется, если он пытается заблокировать мьютекс, который уже заблокирован. Он пробуждается, когда мьютекс разблокируется. Операция блокировки мьютекса является атомарной. Если два потока пытаются захватить мьютекс в одно и то же время, гарантируется, что одна операция будет завершена или заблокирована перед началом другой. Также поддерживается неблокирующая версия операции блокировки, trylock (пробная блокировка). Пробная блокировка возвращает состояние успеха, если мьютекс приобретается; она возвращает состояние неудачи, если мьютекс уже заблокирован.

Общая последовательность для защиты структуры общих данных с помощью мьютекса:

работаем с общими данными

Условная переменная представляет собой механизм синхронизации, который является более полезным для ожидания событий, чем для блокировки ресурса. Условная переменная связана с предикатом (логическим выражением, которое имеет значение ИСТИНА, TRUE, или ЛОЖЬ, FALSE), основанным на некоторых общих данных. Функции отправляются спать на условной переменной и пробуждают один или все потоки, если результат предиката изменяется.

В нашем примере плеера структурой общих данных между основным потоком и потоком декодера является очередь. Основной поток читает данные из файла и помещает их в очередь. Поток декодера извлекает данные и обрабатывает их. Если очередь пуста, поток декодера спит, пока в очередь не поступят данные. Основной поток после помещения данных в очередь пробуждает поток декодера. Вся логика синхронизации осуществляется с помощью привязки к очереди условной переменной. Общие данные являются очередью и предикат представляет собой условие "очередь не пуста". Поток декодера спит на условной переменной, если предикат ЛОЖЬ (то есть очередь пуста). Он пробуждается, когда основной поток "изменяет" предикат добавлением в очередь данных.

Давайте подробно обсудим реализацию мьютекса и условной переменной для pthread-ов.

Мьютекс для pthread-ов

Мьютекс инициализируется во время определения:

pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

Он также может быть проинициализирован во время выполнения вызовом функции pthread_mutex_init .

int pthread_mutex_init(pthread_mutex_t *mutex,

const pthread_mutexattr_t *mutexattr);

Первым аргументом является указатель на мьютекс, который инициализируется, а вторым аргументом — атрибуты мьютекса. Если mutexattr является NULL, устанавливаются атрибуты по умолчанию (подробнее об атрибутах мьютекса позже).

Мьютекс захватывается вызовом функции pthread_mutex_lock . Он освобождается вызовом функции pthread_mutex_unlock . pthread_mutex_lock или захватывает мьютекс, или приостанавливает выполнение вызывающего потока, пока владелец мьютекса (то есть поток, который захватил мьютекс вызовом функции pthread_mutex_lock ранее) не освободит его вызовом pthread_mutex_unlock .

int pthread_mutex_lock(pthread_mutex_t *mutex);

int pthread_mutex_unlock(pthread_mutex_t *mutex);

Общие данные могут быть защищены с помощью функций блокировки и разблокировки мьютекса следующим образом:

/* работаем с общими данными */

Есть три типа мьютекса.

▪ Мьютекс проверки ошибки

Поведения этих трёх типов мьютекса похожи; они отличаются только когда владелец мьютекса вновь вызывает pthread_mutex_lock , что снова захватить его.

▪ Для быстрого мьютекса возникает тупиковая ситуация, поскольку поток теперь ждёт самого себя, чтобы разблокировать мьютекс

▪ Для рекурсивного мьютекса функция возвращается сразу и счётчик захвата мьютекса увеличивается на единицу. Мьютекс разблокируется только если счетчик дойдёт до нуля; то есть поток должен вызвать pthread_mutex_unlock для каждого вызова pthread_mutex_lock .

▪ Для мьютекса проверки ошибки pthread_mutex_lock возвращает ошибку с кодом ошибки EDEADLK .

Быстрый, рекурсивный мьютекс и мьютекс проверки ошибки инициализируются во время определения следующим образом:

pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

/* Мьютекс проверки ошибки */

Они также могут быть проинициализированы во время выполнения вызовом функции pthread_mutex_init . Напомним, что передача NULL в качестве второго аргумента pthread_mutex_init устанавливает для мьютекса атрибуты по умолчанию. По умолчанию мьютекс инициализируется как быстрый мьютекс.

Рекурсивные мьютекс инициализируется во время выполнения следующим образом:

Мьютекс проверки ошибки инициализируется во время выполнения аналогично показанному выше; только в вызове pthread_mutexattr_settype изменяется тип мьютекса на PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP .

Условная переменная pthread-ов

Как и мьютекс, условная переменная инициализируется либо во время определения, либо во время выполнения вызовом функции pthread_cond_init .

pthread_cond_t cond_var = PTHREAD_COND_INITIALIZER;

Давайте вернёмся к нашему MP3-плееру, чтобы понять различные операции с условной переменной pthread-ов. В них участвуют три объекта, показанные на Рисунке 6.6.

▪ Основной поток: он порождает поток декодера звука. Он также поставляет данные для потока декодера.

▪ Поток декодера: он декодирует и воспроизводит звук из данных, предоставленных основным потоком.

▪ Очередь: это общая структура данных между основным потоком и потоком декодера. Основной поток читает данные из файла и помещает их в очередь, а поток декодера извлекает данные и работает с ними.

Основной поток порождает поток декодера при запуске приложения.

/* Создаём поток декодера звука */

if (pthread_create(&decoder_tid, NULL, audio_decoder,

printf("Audio decoder thread creation failed.n");

Возникают три вопроса:

▪ Как поток декодера узнаёт, что в очереди есть данные? Должен ли он делать опрос? Есть ли более эффективный механизм?

Читайте так же:
Автомагнитолы синхронизация с айфона

▪ Есть ли способ для основного потока проинформировать поток декодера о наличии в очереди данных?

▪ Как можно защитить очередь от одновременного доступа со стороны основного потока и потока декодера?

Чтобы ответить на эти вопросы, давайте сначала посмотрим на детали потока декодера звука.

void* audio_decoder(void *unused)<

printf("Audio Decoder thread startedn");

/* декодируем данные в буфере */

/* посылаем декодированные данные на выход для воспроизведения */

Обратите, пожалуйста, внимание на следующий кусок кода в функции audio_decoder .

Здесь мы ввели условную переменную cond . Предикатом для этой условной переменной является "очередь не пуста". Таким образом, если предикат является ложным (то есть очередь пуста), поток засыпает на условной переменной, вызвав функцию pthread_cond_wait . Поток будет оставаться в состоянии ожидания, пока какой-нибудь другой поток не просигнализирует об изменении условия (то есть изменит предикат путём добавления в очередь данных, что сделает её не пустой). Прототип этой функции:

int pthread_cond_wait(pthread_cond_t *cond,

Вы можете увидеть в приведённой выше декларации, что с условной переменной также связан мьютекс. Мьютекс необходим для защиты предиката, когда поток проверяет её состояние. Это позволяет избежать состояния гонок, когда один поток готовится ждать на условной переменной, а другой поток просигнализировал о состоянии как раз перед тем, как первый поток на самом деле впал в состояние ожидания.

<— О состоянии сигнализирует другой поток —>

В нашем примере если бы не было мьютекса, поток декодера ждал бы на условной переменной, даже если бы в очереди были данные. Так что правило: проверка предиката и сон на условной переменной должны быть атомарной операцией. Эта атомарность достигается введением мьютекса вместе с условной переменной. Таким образом, шагами являются:

pthread_mutex_lock(&lock); <— Получаем мьютекс

while(is_empty_queue()) <— проверяем предикат

pthread_cond_wait(&cond,&lock);<— засыпаем на условной переменной

Чтобы сделать эту работу, необходимо, чтобы все участвующие потоки должны были захватывать мьютекс, изменять/проверять состояние, а затем освобождать мьютекс.

Теперь, что происходит с мьютексом, когда поток переходит в спящий режим в pthread_cond_wait ? Если мьютекс остаётся в состоянии блокировки, то никакой другой поток не сможет просигнализировать о состоянии, так как этот поток тоже попытается захватить тот же мьютекс перед изменением предиката. У нас проблема: один поток удерживает блокировки и спит на условной переменной, а другой поток ожидает блокировку для установки состояния. Для того чтобы избежать такой проблемы, соответствующий мьютекс должен быть разблокирован после засыпания потока на условной переменной. Это делается в функции pthread_cond_wait . Функция помещает поток в состояние сна и автоматически освобождает мьютекс.

Поток, спящий на условной переменной, пробуждается и функция pthread_cond_wait возвращается, когда другой поток устанавливает условие (вызовом функции pthread_cond_signal или pthread_cond_broadcast , как описано далее в этом разделе). pthread_cond_wait также перед возвратом повторно захватывает мьютекс. Поток теперь может проверить условие и освободить мьютекс.

pthread_mutex_lock(&lock); <— Получаем мьютекс

while(is_empty_queue()) <— Проверяем условие

pthread_cond_wait(&cond,&lock); <— ждём на условной переменной

<— Мьютекс повторно захватывается внутри pthread_cond_wait —>

buffer = get_queue(); <— Обрабатываем условие

pthread_mutex_unlock(&lock);<— По завершении освобождаем мьютекс

Давайте посмотрим, как поток может просигнализировать о состоянии. Шагами являются:

▪ Пробуждение одного или всех потоков, которые спали на условной переменной.

Основной поток нашего плеера пробуждает поток декодера звука после добавления в очередь данных.

char *buffer = (char *)malloc(MAX_SIZE);

fread(buffer, MAX_SIZE, 1, fp);

pthread_mutex_lock(&lock); <— Получаем мьютекс

add_queue(buffer); <— изменяем условие. Добавление

буфера в очередь делает её

pthread_mutex_unlock(&lock); <— Освобождаем мьютекс

pthread_cond_signal(&cond); <— Пробуждаем поток декодера

pthread_cond_signal пробуждает единственный поток, спящий на условной переменной. Чтобы пробудить все потоки, которые спят на условной переменной, также доступна функция pthread_cond_broadcast .

Условная переменная против семафора

Когда следует использовать семафор, а когда — условную переменную (CondVar)?

Замки используются для взаимного исключения. Если вы хотите, чтобы фрагмент кода был атомарным, поставьте вокруг него блокировку. Теоретически для этого можно использовать двоичный семафор, но это особый случай.

Семафоры и условные переменные строятся на основе взаимного исключения, обеспечиваемого блокировками, и используются для обеспечения синхронизированного доступа к общим ресурсам. Их можно использовать для аналогичных целей.

Переменная условия обычно используется, чтобы избежать ожидания занятости (многократного цикла при проверке условия) при ожидании доступности ресурса. Например, если у вас есть поток (или несколько потоков), который не может продолжать работу до тех пор, пока очередь не станет пустой, подход к занятому ожиданию будет заключаться в том, чтобы просто сделать что-то вроде:

Проблема в том, что вы тратите время процессора, заставляя этот поток повторно проверять условие. Почему бы вместо этого не иметь переменную синхронизации, которая может сигнализировать потоку о доступности ресурса?

Предположительно, у вас будет нить где-то еще, которая вытаскивает вещи из очереди. Когда очередь пуста, она может вызвать syncVar.signal() пробуждение случайного потока, который спит syncVar.wait() (или обычно есть метод signalAll() or broadcast() для пробуждения всех ожидающих потоков).

Читайте так же:
Регулировка зажигания мопеда дельта ссср

Обычно я использую такие переменные синхронизации, когда у меня есть один или несколько потоков, ожидающих одного конкретного условия (например, чтобы очередь была пустой).

Аналогичным образом можно использовать семафоры, но я думаю, что их лучше использовать, когда у вас есть общий ресурс, который может быть доступен или недоступен на основе некоторого целого числа доступных вещей. Семафоры хороши для ситуаций производитель / потребитель, когда производители распределяют ресурсы, а потребители их потребляют.

Подумайте, был ли у вас автомат по продаже газировки. Есть только один автомат с газировкой, и это общий ресурс. У вас есть один поток, который является поставщиком (производителем), который отвечает за хранение машины, и N потоков, которые являются покупателями (потребителями), которые хотят получать газированные напитки из машины. Количество содовой в машине — это целое число, которое будет управлять нашим семафором.

Каждый поток покупателя (потребителя), который приходит к автомату с газировкой, вызывает down() метод семафора, чтобы взять газировку. Это приведет к получению содовой из машины и уменьшению количества доступных газированных напитков на 1. Если есть доступные газированные напитки, код просто продолжит down() без проблем проходить мимо оператора. Если газированные напитки недоступны, поток будет спать здесь, ожидая уведомления о том, когда газированные напитки снова станут доступны (когда в машине будет больше газированных напитков).

Поток поставщика (производителя) по существу будет ждать, пока автомат с газировкой не станет пустым. Продавец получает уведомление, когда из автомата берется последняя газировка (и один или несколько потребителей потенциально ждут, чтобы достать газировку). Поставщик будет пополнять запасы газированной воды с помощью up() метода семафоров , доступное количество газированных напитков будет увеличиваться каждый раз, и, таким образом, ожидающие потоки потребителей будут уведомлены о том, что доступно больше содовой.

Эти wait() и signal() методы переменной синхронизации , как правило , должны быть скрыты внутри down() и up() операций семафора.

Конечно, эти два варианта частично совпадают. Существует множество сценариев, в которых семафор или переменная условия (или набор переменных условия) могут служить вашим целям. И семафоры, и переменные условия связаны с объектом блокировки, который они используют для поддержания взаимного исключения, но затем они предоставляют дополнительную функциональность поверх блокировки для синхронизации выполнения потока. В основном вам решать, какой из них наиболее подходит для вашей ситуации.

Это не обязательно самое техническое описание, но оно имеет смысл в моей голове.

синхронизация потоков с использованием mutex и переменной условия

Я пытаюсь реализовать многопоточную работу, производителя и потребителя, и в основном то, что я хочу сделать, это когда потребитель заканчивает данные, он уведомляет производителя, чтобы производитель предоставил новые данные.

Хитрость заключается в том, что в моем текущем impl производитель и потребитель одновременно уведомляют друг друга и ждут друг друга, я не знаю, как правильно реализовать эту часть.

Например, смотрите код ниже,

Я не уверен, что приведенный выше код с использованием mutex и condition_variable является правильным способом реализации моей идеи, пожалуйста, дайте мне несколько советов!

5 ответов

  • Переменная условия вместе с Mutex

Немного путаницы! В чем может быть проблема, если мы рассмотрим следующий сценарий: моя цель состоит в том, чтобы понять смесь переменной условия с mutex. T1 LOCK < MUTEX >ПРОВЕРЬТЕ ЗНАЧЕНИЕ ПЕРЕМЕННОЙ ЕСЛИ НЕ ЗАДАНО, ДОЖДИТЕСЬ УСЛОВНОЙ ПЕРЕМЕННОЙ UNLOCK ПЕРЕЙТИ К 1 Т2 ИЗМЕНИТЬ.

У меня есть три потока в моем приложении, первый поток должен ждать, пока будут готовы данные из двух других потоков. Эти два потока готовят данные одновременно. Для этого я использую переменную условия в C++ следующим образом: boost::mutex mut; boost::condition_variable cond; Thread1 : bool.

Код неправильно предполагает, что vector<int>::size() и vector<int>::swap() являются атомарными. Это не так.

Кроме того, ложные пробуждения должны обрабатываться циклом while (или другой перегрузкой cv::wait ).

Возможно, это близко к тому, чего вы хотите достичь. Я использовал 2 условные переменные для уведомления производителей и потребителей друг о друге и ввел переменную, обозначающую, какой поворот сейчас:

  • Что происходит с mutex, удерживаемым потоком, ожидающим переменной условия

Для программы, которую я пишу, я хочу, чтобы поток содержал два мьютекса одновременно. Затем он будет ждать переменную условия, используя один из этих мьютексов в качестве mutex, связанного с переменной условия. Что происходит с другим mutex, пока он ждет? Она все еще заперта?

У меня есть вопрос, связанный с std::condition_variable . Я много читал об этом, и все показанные примеры заблокировали mutex перед использованием std::condition_variable таким образом: std::unique_lock<std::mutex> lock(mutex); condition_variable.wait(lock); //. или вот так.

Вот фрагмент кода. Поскольку рабочие дорожки уже синхронизированы, требование двух буферов исключается. Таким образом, для моделирования сценария используется простая очередь:

Не полный ответ, хотя я думаю, что могут быть полезны две переменные условия: одна с именем buffer_empty , которую будет ждать поток-производитель, и другая с именем buffer_filled , которую будет ждать поток-потребитель. Количество мьютексов, как зацикливаться и так далее, я не могу комментировать, так как сам не уверен в деталях.

  1. Доступ к общим переменным должен осуществляться только при удержании mutex, который защищает его
  2. condition_variable::wait должен проверить условие.
    1. Условие должно быть общей переменной, защищенной mutex, которую вы передаете condition_variable::wait .
    2. Способ проверить условие состоит в том, чтобы обернуть вызов wait в while loop или использовать перегрузку с 2 аргументами wait (что эквивалентно версии while-loop)

    Примечание: Эти правила не являются строго необходимыми, если вы действительно понимаете, что делает оборудование. Однако эти проблемы быстро усложняются при использовании простых структур данных, и вам будет легче доказать, что ваш алгоритм работает правильно, если вы будете следовать им.

    Ваши Q и Q_buf являются общими переменными. В соответствии с Правилом 1 я бы предпочел, чтобы они были объявлены в качестве локальных переменных в функции , которая их использует ( consume() и produce() соответственно). Будет 1 общий буфер, который будет защищен mutex. Производитель добавит в свой локальный буфер. Когда этот буфер заполнен, он получает mutex и перемещает локальный буфер в общий буфер. Затем он ждет, пока потребитель примет этот буфер, прежде чем создавать дополнительные данные.

    Потребитель ждет, пока этот общий буфер достигнет «arrive», затем он получает mutex и заменяет свой пустой локальный буфер общим буфером. Затем он сигнализирует производителю, что буфер принят, чтобы он знал, что нужно снова начать производство.

    Семантически я не вижу причин использовать swap вместо move , так как в каждом случае один из контейнеров все равно пуст. Возможно, вы хотите использовать swap , потому что знаете что-то о лежащей в основе памяти. Вы можете использовать все, что захотите, и это будет быстро и работать одинаково (по крайней мере, алгоритмически).

    Эту проблему можно решить с помощью переменной условия 1, но, возможно, будет немного легче подумать, если вы используете 2.

    Вот что я придумал. Проверено на Visual Studio 2017 (15.6.7) и GCC 5.4.0. Мне не нужно быть зачисленным или что-то в этом роде (это такая простая вещь), но юридически я должен сказать, что я не даю никаких гарантий.

    Похожие вопросы:

    Каждая переменная условия в Java (wait() notify()) связана с одной mutex (синхронизированной). Мне сказали, что (в Java) каждый mutex также связан с одной переменной условия, и это менее эффективная.

    pthread_cond_broadcast используется для пробуждения нескольких потоков, ожидающих переменную условия. Но в то же время также говорится, что мы должны поместить mutex перед переменной условия, чтобы.

    Я пытаюсь создать четыре потока, печатающие какое-то сообщение. У меня возникли некоторые проблемы с синхронизацией. Вот как выглядит мой main() pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;.

    Немного путаницы! В чем может быть проблема, если мы рассмотрим следующий сценарий: моя цель состоит в том, чтобы понять смесь переменной условия с mutex. T1 LOCK < MUTEX >ПРОВЕРЬТЕ ЗНАЧЕНИЕ.

    У меня есть три потока в моем приложении, первый поток должен ждать, пока будут готовы данные из двух других потоков. Эти два потока готовят данные одновременно. Для этого я использую переменную.

    Для программы, которую я пишу, я хочу, чтобы поток содержал два мьютекса одновременно. Затем он будет ждать переменную условия, используя один из этих мьютексов в качестве mutex, связанного с.

    У меня есть вопрос, связанный с std::condition_variable . Я много читал об этом, и все показанные примеры заблокировали mutex перед использованием std::condition_variable таким образом.

    Я обнаружил проблему голодания mutex, и предложенный ответ заключается в использовании переменных состояния вместо этого int main () < std::mutex m; std::condition_variable cv; std::thread t.

    Когда поток ожидает переменную условия, связанный mutex (атомарно) освобождается (разблокируется). Когда эта переменная условия сигнализируется (другим потоком), один (для сигнала) или все (для.

    Я создаю программу для одного производителя и нескольких потребителей в C++. Я начинаю с вызова потоков-потребителей, а затем добавляю элементы в массив. Все работает нормально, но в конце концов.

    Python. Урок 23. Потоки и процессы в Python. Часть 2. Синхронизация потоков

    Для синхронизации доступа к ресурсам из нескольких потоков Python предоставляет набор объектов, каждый из которых обладает рядом особенностей, делающих их пригодными для решения некоторой группы специфических задач. В этом уроке будут рассмотрены: Lock– и RLock-объекты, условные переменные (Condition), семафоры (Semaphore), события (Event), таймеры (Timer) и барьеры (Barrier).

    Синхронизация потоков

    Начнем наш обзор с наиболее простого и, в то же время, общего по своим свойствам Lock -объекта.

    Lock-объект

    Lock -объект может находится в двух состояниях: захваченное (заблокированное) и не захваченное (не заблокированное, свободное). После создания он находится в свободном состоянии . Для работы с Lock -объектом используются методы acquire() и release() . Если Lock свободен, то вызов метода acquire() переводит его в заблокированное состояние. Повторный вызов acquire() приведет к блокировке инициировавшего это действие потока до тех пор, пока Lock не будет разблокирован каким-то другим потоком с помощью метода release() . Вывоз метода release() на свободном Lock -объекте приведет к выбросу исключения RuntimeError .

    Метод acquire() имеет следующую сигнатуру:

    acquire(blocking=True, timeout=-1)

    • blocking
      • Если параметр равен True , то при вызове на захваченном Lock -объекте выполнение потока остановится, после того как захват будет произведет, метод вернет True . Если параметр равен False , то при вызове на захваченном Lock -объекте поток не будет заблокирован и метод вернет False , если захват будет произведет, то вернет True .
      • Задает время, в течении которого поток будет находиться в заблокированном состоянии при попытке захватить уже занятый Lock -объект. Если в течении заданного времени поток не освободится, то метод вернет значение False .

      При успешном захвате Lock -объекта, метод acquire() возвращает значение True .

      У Lock -объекта также есть метод locked() , который возвращает True если объект захвачен, False в противном случае.

      Мы уже успели познакомиться с Lock -объектом, когда изучали вопрос принудительного завершения работы потока в “Уроке 22. Процессы и потоки в Python . Часть 1 ”.

      Освободить Lock -объект может любой поток (на обязательно тот, который вызвал acquire() ).

      Хорошей практикой при работе с Lock -объектами является помещение кода работы с разделяемым ресурсом в блоке try , а освобождать блокировку следует в finally :

      Lock -объекты поддерживают протокол менеджера контекста (см. “ Урок 21. Работа с контекстным менеджером“ ), это позволяет работать с ними через оператор with . Приведенный выше код с try…finally эквивалентен следующему:

      RLock-объект

      В отличии от рассмотренного выше Lock -объекта RLock может освободить только тот поток, который его захватил . Повторный захват потоком уже захваченного RLock -объекта не блокирует его. RLock -объекты поддерживают возможность вложенного захвата, при этом освобождение происходит только после того, как был выполнен release() для внешнего acquire() . Сигнатуры и назначение методов release() и acquire() RLock -объектов совпадают с приведенными для Lock , но в отличии от него у RLock нет метода locked() . RLock -объекты поддерживают протокол менеджера контекста.

      Условные переменные (threading.Condition)

      Основное назначение условных переменных – это синхронизация работы потоков, которая предполагает ожидание готовности некоторого ресурса и оповещение об этом событии. Наиболее явно такой тип работы выражен в паттерне Producer-Consumer (Производитель – Потребитель). Условные переменные для организации работы внутри себя используют Lock- или RLock -объекты, захватом и освобождением которых управлять не придется, хотя и возможно, если возникнет такая необходимость.

      Порядок работы с условными переменными выглядит так:

      На стороне Consumer’а : проверить доступен ли ресурс, если нет, то перейти в режим ожидания с помощью метода wait() , и ожидать оповещение от Producer’а о том, что ресурс готов и с ним можно работать. Метод wait() может быть вызван с таймаутом, по истечении которого поток выйдет из состояния блокировки и продолжит работу.

      На стороне Producer’а : произвести работы по подготовке ресурса, после того, как ресурс готов оповестить об этом ожидающие потоки с помощью методов notify() или notify_all() . Разница между ними в том, что notify() разблокирует только один поток (если он вызван без параметров), а notify_all() все потоки, которые находятся в режиме ожидания.

      Ниже представлен пример работы с условной переменной.

      В этом примере мы создаем функцию order_processor , которая может реализовывать в себе бизнес логику, например, обработку заказа. При этом, если она получает сообщение stop , то прекращает свое выполнение. В главном потоке мы создаем и запускаем три потока для обработки заказов. Запущенные потоки видят, что очередь пуста и “встают на блокировку” при вызове wait() . В главном потоке в очередь добавляются десять заказов и сообщения для остановки обработчиков, после этого вызывается метод notify_all() для оповещения всех заблокированных потоков о том, что данные для обработки есть в очереди.

      При создании объекта Condition вы можете передать в конструктор объект Lock или RLock , с которым хотите работать. Перечислим методы объекта Condition с кратким описанием:

      • acquire(*args)
        • Захват объекта-блокировки.
        • Освобождение объекта-блокировки.
        • Блокировка выполнения потока до оповещения о снятии блокировки. Через параметр timeout можно задать время ожидания оповещения о снятии блокировки. Если вызвать wait() на Условной переменной, у которой предварительно не был вызван acquire() , то будет выброшено исключение RuntimeError .
        • Метод позволяет сократить количество кода, которое нужно написать для контроля готовности ресурса и ожидания оповещения. Он заменяет собой следующую конструкцию:
        • notify(n=1)
          • Снимает блокировку с остановленного методом wait() потока. Если необходимо разблокировать несколько потоков, то для этого следует передать их количество через аргумент n .
          • Снимает блокировку со всех остановленных методом wait() потоков.

          Семафоры (threading.Semaphore)

          Реализация классического семафора, предложенного Дейкстрой. Суть его идеи заключается в том, при каждом вызове метода acquire() происходит уменьшение счетчика семафора на единицу, а при вызове release() – увеличение. Значение счетчика не может быть меньше нуля, если на момент вызова acquire() его значение равно нулю, то происходит блокировка потока до тех пор, пока не будет вызван release() .

          Семафоры поддерживают протокол менеджера контекста.

          Для работы с семафорами в Python есть класс Semaphore , при создании его объекта можно указать начальное значение счетчика через параметр value . Semaphore предоставляет два метода:

          • acquire(blocking=True, timeout=None)
            • Если значение внутреннего счетчика больше нуля, то счетчик уменьшается на единицу и метод возвращает True . Если значение счетчика равно нулю, то вызвавший данный метод поток блокируется, до тех пор, пока не будет кем-то вызван метод release() . Дополнительно при вызове метода можно указать параметры blocking и timeout , их назначение совпадает с acquire() для Lock .
            • Увеличивает значение внутреннего счетчика на единицу.

            Существует ещё один класс, реализующий алгоритм семафора BoundedSemaphore , в отличии от Semaphore , он проверяет, чтобы значение внутреннего счетчика было не больше того, что передано при создании объекта через аргумент value , если это происходит, то выбрасывается исключение ValueError .

            С помощью семафоров удобно управлять доступом к ресурсу, который имеет ограничение на количество одновременных обращений к нему (например, количество подключений к базе данных и т.п.)

            В качестве примера приведем программу, моделирующую продажу билетов: обслуживание одного клиента занимает одну секунду, касс всего три, клиентов пять.

            Как вы можете видеть, вначале обслуживание получили клиенты с номерами 0, 1, 2 и только после того, как кассы по продаже билетов освободились, были обслужены клиенты 3 и 4.

            События (threading.Event)

            События по своему назначению и алгоритму работы похожи на рассмотренные ранее условные переменные. Основная задача, которую они решают – это взаимодействие между потоками через механизм оповещения. Объект класса Event управляет внутренним флагом, который сбрасывается с помощью метода clear() и устанавливается методом set() . Потоки, которые используют объект Event для синхронизации блокируются при вызове метода wait() , если флаг сброшен.

            Методы класса Event :

            • is_set()
              • Возвращает True если флаг находится в взведенном состоянии.
              • Переводит флаг в взведенное состояние.
              • Переводит флаг в сброшенное состояние.
              • Блокирует вызвавший данный метод поток если флаг соответствующего Event- объекта находится в сброшенном состоянии. Время нахождения в состоянии блокировки можно задать через параметр timeout .

              Пример работы с Event- объектом:

              Содержимое консоли после вызова приведенной Python -программы:

              Порядок пробуждения потоков при использовании объекта Event никак не регламентируется, поэтому вы можете видеть, что поток с номером 0 закончил работу последним, хотя был запущен первым.

              Таймеры (threading.Timer)

              Модуль threading предоставляет удобный инструмент для запуска задач по таймеру – класс Timer . При создании таймера указывается функция, которая будет выполнена, когда он сработает. Timer реализован как поток, является наследником от Thread , поэтому для его запуска необходимо вызвать start() , если необходимо остановить работу таймера, то вызовите cancel() .

              Конструктор класса Timer :

              Timer(interval, function, args=None, kwargs=None)

              • interval
                • Количество секунд, по истечении которых будет вызвана функция function .
                • Функция, вызов которой нужно осуществить по таймеру.
                • Аргументы функции function .

                Методы класса Timer :

                • cancel()
                  • Останавливает выполнение таймера

                  Пример работы с таймером:

                  Барьеры (threading.Barrier)

                  Последний инструмент для синхронизации работы потоков, который мы рассмотрим является Barrier . Он позволяет реализовать алгоритм, когда необходимо дождаться завершения работы группы потоков, прежде чем продолжить выполнение задачи.

                  Barrier(parties, action=None, timeout=None)

                  • parties
                    • Количество потоков, которые будут работать в рамках барьера.
                    • Определяет функцию, которая будет вызвана, когда потоки будут освобождены (достигнут барьера).
                    • Таймаут, который будет использовать как значение по умолчанию для методов wait() .

                    Свойства и методы класса:

                    • wait(timeout=None)
                      • Блокирует работу потока до тех пор, пока не будет получено уведомление либо не пройдет время указанное в timeout.
                      • Переводит Barrier в исходное (пустое) состояние. Потокам, ожидающим уведомления, будет передано исключение BrokenBarrierError .
                      • Останавливает работу барьера, переводит его в состояние “разрушен” ( broken ). Все текущие и последующие вызовы метода wait() будут завершены с ошибкой с выбросом исключения BrokenBarrierError .
                      • Количество потоков, которое нужно для достижения барьера.
                      • Количество потоков, которое ожидает срабатывания барьера.
                      • Значение флага равное True указывает на то, что барьер находится в “разрушенном” состоянии.

                      Пример работы с классом Barrier :

                      Результат работы программы:

                      Данный объект синхронизации может применяться в случаях когда необходимо дождаться результатов работы всех потоков (как в примере выше), либо для синхронизации процесса инициализации потоков, когда перед стартом их работы требуется, чтобы все потоки выполнили процедуру инициализации.

                      P.S.

                      Вводные уроки по “Линейной алгебре на Python” вы можете найти соответствующей странице нашего сайта . Все уроки по этой теме собраны в книге “Линейная алгебра на Python”.
                      Книга: Линейная алгебра на Python
                      Если вам интересна тема анализа данных, то мы рекомендуем ознакомиться с библиотекой Pandas. Для начала вы можете познакомиться с вводными уроками. Все уроки по библиотеке Pandas собраны в книге “Pandas. Работа с данными”.
                      Книга: Pandas. Работа с данными

                      голоса
                      Рейтинг статьи
Ссылка на основную публикацию
Adblock
detector