QNX/UNIX: Анатомия параллелизма

Цилюрик Олег Иванович

Горошко Егор

Зайцев Владимир

4. Примитивы синхронизации

 

 

ОС QNX Neutrino предоставляет широкий набор элементов синхронизации выполнения потоков, как в рамках одного процесса, так и разных. Это практически полный спектр примитивов, описываемых как базовым стандартом POSIX, так и всеми его расширениями реального времени. Тем не менее при работе со всеми этими примитивами не покидает ощущение, что некоторые из них являются органичными для самой ОС (мьютекс, условная переменная), в то время как другие — достаточно громоздкая надстройка над базовыми механизмами, реализуемая, главным образом, в угоду POSIX.

Примечание

К сожалению, и техническая документация QNX [8], и фундаментальная книга Р. Кертена [1] написаны по одной схеме: [35] все, что касается примитивов синхронизации, введенных более поздними расширениями POSIX (барьеры, жесткая блокировка (sleepon), спинлок, блокировки чтения-записи), описывается детально и сопровождается обстоятельными примерами кода, а вот базовые понятия, такие как pthread_mutex_t , sem_t (да и pthread_cond_t , по существу), описаны лишь качественно, «на пальцах», в иллюстративных рассказах об алгоритме пользования ванной комнатой и кухней (термин bathroom встречается намного чаще, чем pthread_mutex_t ). Мы попытаемся по возможности компенсировать этот перекос.

Хотелось бы обратить внимание на интересный факт. В POSIX-варианте API QNX представлен большой набор разнообразных средств синхронизации: мьютексы, условные переменные, семафоры, барьеры, блокировки чтения/записи, ждущие блокировки, спинлоки. Однако в родном native API QNX из всего этого многообразия мы видим всего три элемента синхронизации: мьютекс, семафор и условная переменная. И это при том, что условная переменная не является самостоятельным средством синхронизации и применяется как расширение функциональных возможностей мьютекса!

Это означает, что все многообразие средств синхронизации, предоставляемое в POSIX API QNX, строится исключительно с применением этих минимальных средств синхронизации. Действительно, анализ заголовочных файлов системы показывает, что все дополнительные средства синхронизации, появляющиеся в POSIX API, строятся с использованием только мьютекса и условной переменной. Можно сказать, что пара мыотекс-условная переменная с одной стороны и семафор с другой являются независимыми базисами, каждый из которых позволяет построить практически любой, сколь угодно специфический элемент синхронизации, удовлетворяющий потребностям, которые могут возникнуть при построении вашей системы. Мы проиллюстрируем эту мысль позже, при рассмотрении особенностей применения мьютексов.

Независимо от того, какой набор элементов синхронизации окажется предпочтительным для разрабатываемой вами системы, важным является тот факт, что в случае, если предлагаемые системой средства синхронизации по каким-то причинам вас не устраивают, то ничто не мешает разработать собственные средства синхронизации, используя тот или иной базис или даже их комбинацию. И эффективность полученного нового средства синхронизации будет зависеть только от вас.

Мы постараемся проиллюстрировать эту идею примерами использования базовых средств синхронизации в качестве «конструктора» при построении более сложных. Три базовых элемента синхронизации ОС QNX — мьютекс, условная переменная и семафор — реализуются на уровне микроядра системы и создаются вызовом системной функции:

SyncTypeCreate(unsigned type, sync_t* sync, const struct _sync_attr_t* attr);

Здесь type может принимать значения:

_NTO_SYNC_MUTEX_FREE — для создания мьютекса;

_NTO_SYNC_SEM — для создания семафора;

_NTO_SYNC_COND — для создания условной переменной.

Стоит обратить внимание на два важных факта. Во-первых, типы мьютекса (pthread_mutex_t), условной переменной (pthread_cond_t) и семафора (sem_t) являются псевдонимами типа sync_t. А во-вторых, типы атрибутов этих объектов также являются псевдонимами одного-единственного типа — sync_attr_t, содержащего поле protocol, значение которого определяет, каким способом ОС будет варьировать приоритеты во избежание их инверсии.

В соответствии с документацией QNX 6.2.1 это поле присутствует у параметров всех трех базовых объектов синхронизации, но доступно оно только для переопределения мьютексов. По умолчанию (например, когда вместо attr передается NULL) поле protocol принимает значение PTHREAD_PRIO_INHERIT. Это означает, что ОС будет использовать протокол наследования приоритетов для предотвращения инверсии.

Таким образом, все примитивы синхронизации QNX в теории могут учитывать эффект инверсии приоритетов. Однако на практике важны следующие два вопроса: возможно ли в принципе возникновение ситуации инверсии приоритетов для данного типа примитивов, и если да, то актуально ли для данного типа примитивов препятствовать возникновению инверсии (в качестве иллюстрации этого утверждения можно рассмотреть примитив типа барьер).

Тесты [4] показывают, что только для мьютексов наследованием приоритетов (или альтернативными протоколами) однозначно предотвращается возникновение инверсии приоритетов. По-видимому, в первую очередь это связано с тем, что сама по себе инверсия может возникнуть именно в тех случаях, когда из всех элементов синхронизации необходимо использовать именно мьютекс или что-либо из его наследников (например, блокировки чтения-записи).

 

Семафор (счетный)

 

Семафор является весьма специфическим (в сравнении с прочими) для ОС QNX средством синхронизации, и хотя функции работы с ним также определяются стандартом POSIX, даже семантика этих функций отличается от всех прочих объектов синхронизации.

Примечание

Функции работы с семафором определяются стандартом POSIX 1003.1 (1993) расширения реального времени, а не стандартом POSIX 1003b (1995), которым определены pthread_* и все другие примитивы синхронизации; соответственно функции манипулирования семафорами не начинаются с префикса pthread_* .

В классической работе Дейкстры [10] семафор определяется как объект, над которым можно провести две атомарные операции: инкремент и декремент внутреннего счетчика — при условии, что внутренний счетчик не может принимать значение меньше нуля. Если некий поток пытается уменьшить на единицу значение внутреннего счетчика семафора, значение которого уже равно нулю, то этот поток блокируется до тех пор, пока внутренний счетчик семафора не примет значение, равное 1 или больше (посредством воздействия на него других потоков). Разблокированный поток сможет осуществить декремент нового значения.

Принято рассматривать семафоры двух видов: бинарные, счетчик которых может принимать только значения 0 либо 1, и счетные, или простые, счетчик которых может принимать большие положительные значения. В QNX 6.2.1 максимальное значение счетчика определяется переменной SEM_VALUE_MAX, значение которой равно 32 767.

Отметим важный момент: семафор является наиболее простым и соответственно наиболее универсальным средством синхронизации. И классические решения задач раздельного использования ресурсов, предложенные в теории первоначально Э. Дейкстрой, базируются именно на понятии семафора. Однако в случае систем реального времени применение семафоров для разделения доступа к ресурсу влечет потенциальную опасность возникновения инверсии приоритетов [4], избежать которой никак нельзя.

Причина этого в том, что семафор по определению не может знать своего владельца (захватившего его), поскольку у счетного семафора в принципе не может быть владельца, а бинарный семафор, который отличает своего владельца (поток, заблокировавший в данный момент другие потоки на обращении к семафору), уже перестает быть собственно семафором и называется мьютексом, или эксклюзивной блокировкой. По-видимому, именно этим обстоятельством вызван тот факт, что все объекты синхронизации QNX, не реализованные на уровне native API, но предоставляемые на уровне API POSIX, строятся без использования семафоров.

Принципиально схема работы семафора позволяет использовать его для решения максимально широкого круга задач. Приведем только несколько схематичных примеров:

• Ожидание условия (уведомление). Часто возникает необходимость остановить (заблокировать) поток до тех пор, пока не наступит некое событие (выполнится условие). Разблокировать остановленный поток в таком случае может только другой, активный к этому времени поток. Предварительно инициализируем семафор нулевым значением:

      Поток А             Поток В

while (!expression)  expression = true;

sem_wait(&sem);      sem_post(&sem);

В этом случае поток А ожидает выполнения некоего условия (операция sem_wait()), а поток В уведомляет (операция sem_post()) о выполнении условия любые ожидающие этого условия потоки.

Примечание

В принципе конструкция while() здесь не обязательна, можно обойтись и простым if() , но проверка выполнения условия и после разблокирования потока будет более строгой формой в многопоточной системе, особенно если выполнения условия ожидают более одного потока.

• Взаимное исключение. С помощью семафора можно решить и другую классическую проблему синхронизации — безопасное совместное исполнение кода. Эта проблема возникает, когда из нескольких потоков необходимо провести модификацию общих переменных или обратиться к одному системному ресурсу. В таком случае необходимо установить четкий порядок обращений, не допускающий одновременности исполнения соответствующего участка кода (взаимное исключение). Здесь семафор инициализируется единицей:

             Поток А                             Поток В

sem_wait(&sem);                      sem_wait(&sem);

/* код, который нужно защитить       /* код, который нужно защитить

   от совместного использования. */     от совместного использования. */

sem_post(&sem);                      sem_post(&sem);

В данном случае последовательность доступа потоков к защищаемому участку кода (такой участок кода еще называют критической секцией) не играет никакой роли; здесь важно не допустить, чтобы два потока исполняли этот код одновременно.

Поскольку семафор инициализируется единицей, первый вызов sem_wait() не приводит к блокированию потока, а лишь понижает счетчик семафора до нуля. В таком положении семафора любой поток, повторно вызвавший sem_wait() на входе в критическую секцию, будет блокирован до тех пор, пока первый вошедший поток не покинет этот участок кода и не вызовет sem_post(). После этого один из потоков, ожидающих увеличения счетчика семафора, получит управление (это будет один поток, и нельзя заранее сказать, какой именно) и выполнит операцию декремента над счетчиком, вновь заблокировав вход в критическую секцию. Таким образом можно гарантировать строго последовательное выполнение такого кода.

Однако при использовании семафоров для решения задачи взаимного исключения потоков разного приоритета может возникнуть серьезная проблема, известная как инверсия приоритетов. Вопрос инверсии рассматривался в нашей работе [4], и мы не будем здесь подробно останавливаться на этом. В простейшем случае проблема заключается в том, что если в системе присутствуют несколько (3 или более) потоков разного приоритета (высокого, среднего и низкого), использующих общий ресурс, то возможно возникновение ситуации, когда высокоприоритетный поток будет блокирован в ожидании ресурса, ранее захваченного потоком самого низкого приоритета, который в свою очередь вытеснен потоком среднего приоритета, причем такой неразрешимый «клинч» может продолжаться неограниченно долго.

Каким же образом можно предотвратить подобную ситуацию? Для этого необходимо проводить манипуляции с приоритетом потока, входящего в критическую секцию. Однако после выполнения потоком функции sem_wait() (при этом счетчик семафора уменьшается до нуля) и перехода к выполнению кода критической секции уже никак нельзя определить, какой поток заблокировал семафор, и нельзя ничего сделать с его приоритетом.

Для того чтобы система имела возможность влиять на поведение потоков с точки зрения профилактики инверсии приоритетов и взаимных блокировок («мертвых объятий» — deadlock) потоков или других подобных проблем, вызванных взаимным влиянием потоков, необходимо, чтобы объект синхронизации явным образом хранил информацию о том потоке, который его захватил (то есть знал своего хозяина). Семафор такой информации не хранит, и это необходимо помнить при проектировании системы с его использованием. Применение семафора оптимально для случаев слабо связанных и в идеале равноприоритетных потоков. Собственно, для этих случаев семафор как средство синхронизации и разрабатывался [10].

• Частным случаем задачи взаимного исключения является классическая задача последовательного доступа по типу производитель/потребитель. Такая ситуация возникает, когда один поток передает другому данные через общую переменную. Пока производитель не «положит» новые данные в эту переменную, потребитель должен простаивать в ожидании.

Приведем классическое решение этой задачи. В этом случае нам понадобится два семафора (по одному на каждый поток), которые должны инициализироваться следующим образом: тот, который защищает чтение (потребление), инициализируется нулем (блокирует читающий поток), а тот, который защищает запись (производство), — единицей (открывает доступ «писателю» к общему ресурсу).

         Поток А                   Поток В

sem_wait(&sem_A);         sem_wait(&sem_B);

/* критическая секция */  /* критическая секция */

sem_post(&sem_B);         sem_post(&sem_A);

Положим, поток А является производителем данных, необходимых для работы потока В. Соответственно семафор sem_A инициализирован 1, а семафор sem_B инициализирован 0. Когда поток В попытается обратиться к общей переменной за данными для работы, он будет блокирован в ожидании результатов работы потока А. Поток А, подготовив необходимые данные, войдет в критическую секцию (поскольку его семафор разблокирован), установит новые данные и, покидая критическую секцию, разблокирует семафор потока В. После этого поток В будет разблокирован и сможет получить новые данные. Обратите внимание, что если данные производятся в цикле (а это обычная ситуация), то поток А не сможет повторно получить доступ к общей переменной до тех пор, пока поток В не закончит чтение этой переменной и не покинет критическую секцию.

Примечание

Описанная схема не является универсальной и хорошо работает для двух потоков. Однако возможна ситуация, когда существует множество потоков потребителей и производителей. В ОС QNX существует специальный примитив синхронизации, называемый «блокировкой чтения/записи» и предназначенный для синхронизации доступа в такой ситуации. Этот примитив предоставляет множественный доступ к критической секции кода со стороны «читателей», поскольку они не изменяют содержимого общих данных, и устанавливает эксклюзивный доступ для «писателей». Этот примитив будет подробнее рассмотрен позже.

• До сих пор мы фактически рассматривали работу семафора в бинарном режиме. Для большого количества задач именно такой режим семафора и является основным. Однако возможности счетного семафора, позволяющего контролировать количество обращений, допускают его применение в весьма специфических задачах, где другие средства синхронизации требуют от программиста дополнительной работы.

Проиллюстрируем работу счетного семафора. Предположим, нам надо синхронизировать работу двух потоков так, чтобы один из них всегда шел «вслед» за другим, то есть выполнял некие циклические операции только после их выполнения первым потоком. Для реализации этой схемы нам понадобится семафор, инициализированный нулевым значением.

     Поток А           Поток В

while (true)     while (true)

{                {

 /* работа A; */   /* sem_wait(&sem); */

 sem_post(&sem);  работа B;

}                }

Поток А в каждом цикле увеличивает счетчик семафора на 1, а поток В в свою очередь стремится «выбрать слабину» и в каждом цикле уменьшает этот счетчик на единицу до тех пор, пока он не достигнет нуля. Вне зависимости от приоритетов, дисциплины диспетчеризации или любых источников блокировки потоков поток В будет «следовать» за потоком А и выполнять не больше циклов, чем его «ведущий» поток.

Всеми этими примерами мы хотели показать главную особенность семафора: любой поток может менять его состояние (внутренний счетчик), тем самым делая это элемент синхронизации идеальным средством управления порядком выполнения потоков. Семафор является идеальным инструментом для построения собственных средств планирования (диспетчеризации) выполнения потоков вашей системы. Стандарт POSIX предусматривает наличие специальных «именованных» семафоров, к которым может обращаться любой поток, принадлежащий любому процессу, выполняющемуся в системе, а в ОС QNX к семафорам может обращаться и любой поток любого процесса во всей сети QNX. Таким образом, семафор позволяет осуществлять планирование выполнения задач даже для вашей распределенной сетевой системы.

Ниже мы приводим краткое описание функций работы как с неименованными, так и с именованными семафорами, реализованными в ОС QNX. Функции работы с семафорами объявлены в заголовочном файле . Если для работы какой-либо функции необходимы дополнительные заголовочные файлы, они будут указываться явно.

 

Операции над семафорами

 

Создание семафора

QNX поддерживает два типа семафоров — неименованные и именованные. Разница между ними заключается в том, что к именованному семафору можно обратиться из любого процесса в системе (или даже по сети QNET с другого сетевого хоста), поскольку такой семафор имеет ассоциированное с ним имя в файловой системе QNX. Необходимо помнить, что именованные семафоры, при прочих равных условиях, медленнее и требуют для своей работы запущенного в системе менеджера очередей сообщений POSIX (mqueue).

Для каждого типа семафоров существует своя группа функций, применение которых не должно смешиваться.

sem_init() и sem_destroy() — создание и разрушение неименованного семафора. При создании указывается параметр доступа из других потоков и начальное значение счетчика семафора. С неинициализированным семафором никаких операций проводить нельзя (это общее правило справедливо и для всех иных примитивов синхронизации). После разрушения семафора его необходимо повторно инициализировать для использования.

Обе функции возвращают 0 в случае успеха и -1 в случае ошибки. Код ошибки записывается в переменной errno. В частности, функция sem_init() может сигнализировать о следующих ошибках выполнения:

EAGAIN — в данный момент нет ресурсов для инициализации семафора;

EINVAL — начальное значение счетчика превышает SEM_VALUE_MAX;

EPERM — у процесса недостаточно привилегий для инициализации семафора;

ENOSPC — ресурсы, необходимые для инициализации, исчерпаны;

ENOSYS — функция sem_init() не поддерживается реализацией системы.

При вызове функции sem_destroy() может регистрироваться только одна ошибка:

EINVAL — неправильный описатель семафора.

sem_open() и sem_close() — открытие и закрытие именованного семафора (если отсутствует ранее созданный семафор с таким именем, то его создание). В вопросе подключения и отключения работа с именованным семафором аналогична работе с обычным файлом. Также для именованных семафоров существует операция sem_unlink(), аналогичная операции unlink() для обычного файла. В функцию sem_open() передается имя семафора, параметры открытия семафора и дополнительные параметры в случае создания семафора.

 

Операции блокировки

Для семафора определены три модификации операции блокировки:

int sem_wait(sem_t* sem);

int sem_trywait(sem_t* sem);

#include

int sem_timedwait(sem_t* sem, const struct timespec * abs_timeout);

Все эти функции опираются на функцию (native QNX API):

int SyncSemWait(sync_t* sync, int try);

Функция простого ожидания sem_wait() пытается выполнить декремент счетчика семафора. Если исходное значение счетчика было больше или равно 1, то функция после выполнения операции декремента возвращает управление в вызвавший код. Если же значение внутреннего счетчика семафора равнялось 0, выполнение вызвавшего функцию потока блокируется до тех пор, пока какой-либо другой поток не увеличит значение счетчика семафора. После того как значение счетчика будет увеличено, блокированный поток получает управление (в порядке очереди), выполняет (завершает) декремент счетчика семафора и возвращает управление. Блокированное состояние потока может быть также прервано получением направленного ему сигнала (см. главу 3).

Функция ожидания с проверкой семафора, sem_trywait(), до выполнения операции над семафором проверяет значение счетчика. Если это значение больше нуля, функция выполняет декремент счетчика, а если значение равно нулю - возвращает управление потоку, не блокируясь на ожидании доступности семафора (но захват семафора в этом случае не происходит, о чем извещает код возврата).

Функция ожидания с тайм-аутом, sem_timedwait(), ожидает возможности уменьшить на 1 счетчик семафора (блокируется на ожидании) до момента времени abs_timeout. Это ожидание реализовано с помощью функции TimerTimeout() (native QNX API) с параметром SIGEV_UNBLOCK. Благодаря этой функции может быть прерван ряд состояний блокировки потоков (в том числе блокировки на семафоре, мьютексе и условной переменной).

 

Операции освобождения

int sem_post(sem_t* sem);

Эта операция увеличивает на единицу (инкремент) внутренний счетчик семафора, и в этом она диаметрально противоположна операции блокировки. Если перед этим значение счетчика семафора было равно 0, то один из потоков, ожидающих разблокирования семафора, переходит в состояние готовности. Из списка ожидающих потоков система выбирает самый приоритетный, а если таких несколько, то поток, дольше всех ждавший из очереди наиболее приоритетных потоков.

Эта функция имеет свой оригинал в native API QNX:

int SyncSemPost(sync_t* sync);

Фактически разница между POSIX и QNX API в вариантах этой функции состоит в регистрируемых ею ошибках.

Функция sem_post() сообщает о следующих ошибках:

EINVAL — неверный дескриптор семафора sem;

ENOSYS — функция sem_post() не поддерживается системой.

В отличие от sem_post(), функция SyncSemPost() может указывать на ошибки:

EAGAIN — недостаточно памяти для создания внутреннего объекта синхронизации;

EFAULT — неверный указатель на семафор sync;

EINTR — выполнение функции прервано сигналом;

EINVAL — аргумент sync не указывает на инициированный семафор.

Как видим, функция QNX API несколько разнообразнее в плане контроля передаваемых аргументов и результата выполнения функции.

 

Получение статуса семафора

int sem_getvalue(sem_t* sem, int* value);

Эта функция используется преимущественно для отладки операций над семафорами. По адресу, указанному в value, устанавливается текущее значение счетчика семафора. Поскольку значение счетчика семафора может измениться в любой момент, то значение, которое возвращает эта функция, имеет смысл только непосредственно в точке ее вызова.

Возможные ошибки:

EINVAL — неправильный объект семафор sem.

 

Использование семафора

Как уже говорилось выше, семафор является крайне гибким и эффективным средством синхронизации, особенно удобным для построения собственных средств планирования выполнения потоков. В этом смысле семафор представляет ценность не только как самостоятельное средство синхронизации выполнения потоков, но и как материал для построения специфических средств планирования и синхронизации для конкретных задач. Мы уже говорили, что семафор образует самодостаточный базис, позволяющий строить гораздо более сложные средства синхронизации без привлечения других средств синхронизации. В принципе это так, но нет ничего плохого и в смешанном использовании как семафоров, так и мьютексов для построения собственных средств синхронизации.

Проиллюстрируем все вышесказанное на двух примерах. Сначала мы построим «очередь сообщений», предназначенную для трансляции сообщений графической системы к «медленному» обработчику реакций. Это одно из решений весьма распространенной задачи о предотвращении «зависания» пользовательского интерфейса на период выполнения медленного обработчика. Для решения этой задачи обработчик события оконной системы (например, нажатия кнопки или выбора пункта меню) и функция, которая непосредственно производит требуемые действия (предусмотренные по наступлению указанного события - нажатия кнопки), должны располагаться в разных потоках.

Было бы удобно, если бы при поступлении новых данных от графической системы поток обработки автоматически (неявно) разблокировался и немедленно приступал к обработке, а в периоды отсутствия таких данных - простаивал в блокированном состоянии. Для реализации такой схемы мы построили синхронизирующую очередь сообщений, которая использует семафор для уведомления потока обработки о наличии новых данных. В принципе указанная задача сводится к уже упоминавшемуся ранее классу задач о синхронизации производителя и потребителя данных.

class event {

 /* класс синхронизирующего события, доставляющего

    уведомление о добавлении нового элемента в буфер */

public:

 event() { sem_init(&_block, 0, 0); }

 ~event() { sem_destroy(&_block); }

 void wait() { sem_wait(&_block); }

 void reset() { sem_post(&_block); }

private:

 sem_t _block;

};

/* шаблонный класс очереди данных */

template class CDataQueue {

public:

 CDataQueue() {}

 ~CDataQueue() {}

 void push(T _new_data) {

  _data_queue.push(_new_data);

  data_event.reset();

 }

 T pop() {

  data_event.wait();

  T res = _data_queue.front();

  _data_queue.pop();

  return res;

 }

private:

 std::queue _data_queue;

 event data_event;

};

Принцип работы CDataQueue заключается в том, что для хранения вновь поступающих данных используется очередь, что делает практически независимыми потоки производителя и потребителя. Независимыми во всех случаях, кроме пустой очереди. Потребитель должен быть блокирован до тех пор, пока нет данных от производителя. Как только производитель внесет данные в очередь, поток потребителя разблокируется и считает эти данные. Тонкость заключается в том, что поток потребителя блокируется сам при вызове функции pop(), а разблокируется из потока производителя при вызове им функции push().

Как видите, в построении специфических средств синхронизации нет ничего сложного, вопреки часто встречающемуся утверждению, что создание средств синхронизации со специфическим поведением неадекватно трудоемко, а простейший код позволяет адаптировать возможности тривиального семафора под конкретную задачу.

А теперь хотелось бы обратить ваше внимание на тот факт, что «безопасным» использованием описанной схемы будет только вариант двух потоков — одного производителя и одного потребителя. Если несколько (более двух) потоков одновременно попробуют выполнить функции pop() или push(), начнется путаница, и чем это закончится, сказать трудно. По своей логике код обеих функций в многопоточной системе требует эксклюзивного исполнения. Чтобы обеспечить исключительный доступ к этим участкам кода, мы могли бы использовать дополнительный семафор, но есть другой вариант — специальное средство синхронизации, разработанное именно для решения задачи взаимного исключения, - мьютекс.

 

Мьютекс

 

Мьютекс (от mutual exclusion — взаимное исключение) — это один из базовых примитивов синхронизации QNX Neutrino. Этот элемент реализуется на уровне микроядра системы и имеет широкий набор атрибутов и настроек. Назначение мьютекса — защита участка кода от совместного выполнения несколькими потоками. Такой участок кода иногда называют критической секцией, и обычно он является областью модификации общих переменных или обращения к разделяемому ресурсу.

Принцип работы мьютекса заключается в следующем: при обращении потока к функции блокировки (захвата) pthread_mutex_lock() проверяется, захвачен ли уже мьютекс, и если да, то вызвавший поток блокируется до освобождения критической секции. Если же нет, то объект мьютекс запоминает, какой поток его захватил (то есть владельца) и устанавливает признак, что он захвачен.

Когда действия, которые нельзя производить совместно, закончены, поток должен вызвать функцию разблокировки (освобождения) pthread_mutex_unlock(), которая проверяет, действительно ли вызвавший ее поток является тем, который в данный момент владеет мьютексом, и если да, то она разблокирует мьютекс, после чего ОС проводит редиспетчеризацию потоков. Если есть потоки, ожидающие освобождения мьютекса, то один из таких потоков, имеющий наивысший приоритет, переводится из состояния блокирования в состояние готовности и захватывает мьютекс.

В QNX Neutrino 6.2.1 мьютекс имеет наибольшие возможности по тонкой настройке своих параметров среди всех иных элементов синхронизации. В связи с этим поведение мьютекса очень сильно зависит от того, какие значения вы присвоите его атрибутам.

Как видите, главное отличие мьютекса от семафора заключается в том, что он хранит информацию о потоке, исполняющем код критической секции. Отсюда и важнейшие свойства мьютекса. Мьютекс нельзя разблокировать из другого потока. Если поток захватил мьютекс, то только он может его «отпустить». Используя информацию о владельце (tid потока), система может изменять в нужное время приоритет владельца для разрешения проблемы инверсии приоритетов. Наконец, зная идентификатор потока, мьютекс может выделить ситуацию, когда поток, уже захвативший мьютекс, пытается захватить его повторно (одна из разновидностей deadlock — мертвой блокировки, когда не существует ни одного потока, способного отпустить мьютекс и разблокировать потоки, ожидающие освобождения этого мьютекса).

В ОС QNX возможен вариант работы мьютекса, не предусмотренный стандартом POSIX, — рекурсивный мьютекс. В этом режиме поток, владеющий мьютексом при повторном его захвате, не блокируется. Мьютекс только отмечает в своем внутреннем счетчике, сколько раз он был захвачен, и разблокируется только после равного количества освобождений (естественно, тем же потоком).

Все объявления относительно мьютексов находятся в заголовочном файле , и программный код, их использующий, должен включать директиву:

#include

 

Параметры мьютекса

 

Параметры мьютекса хранятся в структуре pthread_mutexattr_t, которая определена типом sync_attr_t. Эта структура должна быть, создана и определена до инициализации мьютекса, после чего может быть переопределена и использована для других объектов типа мьютекс.

 

Инициализация параметров

int pthread_mutexattr_init(const pthread_mutexattr_t* attr);

Функция инициализирует структуру атрибутов мьютекса, на которую указывает параметр attr. Тип данных pthread_mutexattr_t определен в файле (производный от типа sync_attr_t, который в свою очередь определен в файле ) следующим образом:

struct _sync_attr_t {

 int protocol;

 int flags;

 int prioceiling;

 int clockid; /* только для condvar */

 int reserved[4];

};

После инициализации значения по умолчанию могут быть прочитаны или изменены группой функций, приведенной ниже.

 

Установка граничного приоритета

int pthread_mutexattr_setprioceiling(

 pthread_mutexattr_t* attr, int prioceiling);

int pthread_mutexattr_getprioceiling(

 const pthread_mutexattr_t* attr, int* prioceiling);

Эти функции записывают/читают значение приоритета, которое будет присваиваться потоку, захватившему мьютекс, если поле protocol структуры pthread_mutexattr_t установлено в значение PTHREAD_PRIO_PROTECT.

Этот параметр используется для реализации протокола граничного приоритета (Priority Ceiling Protocol), предлагающего альтернативный вариант защиты от инверсии приоритетов там, где наследование приоритетов может быть неэффективно или нежелательно.

Примечание

Как известно, классический случай инверсии приоритетов может возникать, когда более двух потоков разного приоритета конкурируют и разделяют один общий ресурс. В этой ситуации возможно такое стечение обстоятельств, когда низкоприоритетный поток захватывает ресурс, но позже вытесняется потоком среднего приоритета, а поток с высоким приоритетом блокируется в ожидании освобождения ресурса, захваченного низкоприоритетным потоком. Эта ситуация детально описана в соответствующей главе [4], где приводится сравнительный анализ поведения различных ОС в этой ситуации. Классическим решением этой проблемы является протокол наследования приоритетов, когда ОС распознает ситуацию подобного блокирования и автоматически повышает приоритет вытесненного потока (низкого приоритета) до уровня наивысшего приоритета из числа потоков, ожидающих освобождения ресурса. В результате поток с низким приоритетом не вытесняется, как надлежало бы в соответствии с его изначальным приоритетом, а быстро проходит критическую секцию (освобождая ее), после чего его приоритет возвращается на исходный уровень.

В ряде случаев наследование приоритетов может оказаться не самым оптимальным решением. Примером здесь может служить ситуация, когда один высокоприоритетный поток разделяет много ресурсов с низкоприоритетными потоками — по одному ресурсу на каждый поток. В такой ситуации может возникнуть положение, когда много низкоприоритетных потоков (вытесненных) выстроятся перед высокоприоритетным. Но тогда длинный ряд последовательных операций вытеснения («поштучно») и наследования приоритетов блокирующих потоков может привести к тому, что не хватит времени до окончания критического срока выполнения потока высокого приоритета, пока ОС будет анализировать ситуацию и последовательно проводить протокол наследования приоритетов.

Именно для таких ситуаций и предназначен протокол граничного приоритета. В соответствии с этим протоколом примитив синхронизации (в нашем рассмотрении это мьютекс) наделяется собственным фиксированным приоритетом, а приоритет любого потока, захватившего этот мьютекс, поднимается до предустановленного граничного уровня приоритета мьютекса.

 

Определение протокола защиты от инверсии приоритетов

int pthread_mutexattr_setprotocol(

 pthread_mutexattr_t* attr, int protocol);

int pthread_mutexattr_getprotocol(

 pthread_mutexattr_t* attr, int* protocol);

Эти функции устанавливают/считывают протокол, который реализуется мьютексом для защиты от инверсии приоритетов. Переменная protocol может принимать следующие значения:

PTHREAD_PRIO_INHERIT (значение по умолчанию) — определяет, что для воспрепятствования возникновению инверсии приоритетов будет использоваться протокол наследования приоритетов.

PTHREAD_PRIO_PROTECT — любой поток, захвативший мьютекс и созданный с таким параметром, будет устанавливать фиксированный уровень приоритета в соответствии со значением поля prioceiling, возвращаемого функцией pthread_mutexattr_getprioceiling(). Таким образом, установка этого значения в качестве протокола мьютекса приводит к реализации протокола граничного приоритета для защиты от инверсии приоритетов.

 

Внешний доступ

int pthread_mutexattr_setpshared(

 pthread_mutexattr_t* attr, int pshared);

int pthread_mutexattr_getpshared(

 const pthread_mutexattr_t* attr, int* pshared);

Эти функции устанавливают/считывают внутреннее поле атрибутной записи мьютекса, определяющее, возможен ли доступ к мьютексу из потоков, запущенных вне процесса, в котором был создан и инициализирован мьютекс. Параметр pshared может принимать следующие значения:

PTHREAD_PROCESS_SHARED — любой поток из любого процесса в системе, который может получить доступ к синхронизирующему объекту (для этого придется использовать какой-либо из методов IPC, возможно shared memory), может использовать его по назначению.

PTHREAD_PROCESS_PRIVATE (значение по умолчанию) — мьютекс может использоваться только потоками, порожденными в том же процессе, где был инициализирован мьютекс. В документации сказано: попытка захвата мьютекса с таким значением параметра доступа к потокам из «чужого» процесса приведет к неопределенному результату. На практике же функция захвата возвращает управление в любом случае, независимо от того, был ли уже захвачен мьютекс другим потоком или нет (как будто происходит нормальный захват). Состояние мьютекса при этом никак не меняется.

 

Разрешение рекурсивного захвата

int pthread_mutexattr_setrecursive(

 pthread_mutexattr_t* attr, int recursive);

int pthread_mutexattr_getrecursive(

 const pthread_mutexattr_t* attr, int* recursive);

Функции устанавливают/считывают в атрибутной записи мьютекса признак, определяющий, может ли поток, ранее захвативший мьютекс (его владелец), захватить его еще раз (естественно, что любой другой поток захватить такой мьютекс уже не может и он будет заблокирован). Режим реализован для возможности рекурсивного вызова процедур в потоке. Необходимо помнить, что при рекурсивном захвате мьютекс должен быть освобожден столько раз, сколько раз он был захвачен. Параметр recursive может принимать следующие значения:

PTHREAD_RECURSIVE_ENABLE — разрешает рекурсивный захват мьютекса;

PTHREAD_RECURSIVE_DISABLE (значение по умолчанию) — запрещает рекурсивный захват мьютекса. В результате при попытке захвата мьютекса потоком, который им уже владеет, вызов pthread_mutex_lock() не приведет к захвату мьютекса и вернет значение EDEADLK.

 

Определение типа мьютекса

int pthread_mutexattr_settype(

 pthread_mutexattr_t* attr, int type);

int pthread_mutexattr_gettype(

 const pthread_mutexattr_t* attr, int* type);

В версиях QNX 6.2.1 и 6.3 предусматривается создание мьютексов следующих типов:

• PTHREAD_MUTEX_NORMAL — для этого типа не проводится контроль «мертвой блокировки» (deadlock) в ситуации, когда поток, захвативший мьютекс, пытается захватить его повторно. Поэтому при попытке повторного захвата такого мьютекса тем же потоком этот поток будет безусловно блокирован (то есть он попадает в «мертвую блокировку», а это во всех случаях аварийная ситуация в выполнении приложения); такой мьютекс уже некому разблокировать (мьютекс может разблокироваться только своим владельцем). Попытка освободить (unlock) мьютекс такого типа, захваченный другим потоком, или освободить незахваченный мьютекс ни к чему не приводит, при этом не возвращается ошибка выполнения.

• PTHREAD_MUTEX_ERRORCHECK — включается контроль ошибок. В этом режиме регистрируются следующие ситуации:

• попытка повторного захвата мьютекса тем же потоком;

• попытка освобождения мьютекса, захваченного другим потоком;

• освобождение свободного мьютекса.

• PTHREAD_MUTEX_RECURSIVE — мьютекс, допускающий рекурсивный захват. Поток, пытающийся захватить мьютекс, уже захваченный в этом потоке, сможет это сделать, при этом количество захватов будет учитываться при освобождении мьютекса. Другой поток сможет захватить такой мьютекс только тогда, когда он будет освобожден столько же раз, сколько был захвачен. Если поток пытается освободить мьютекс, захваченный другим потоком, или свободный мьютекс, то будет возвращено сообщение об ошибке (регистрируются ошибки, предусмотренные предыдущим типом, за исключением повторного захвата, который является для рекурсивного мьютекса штатным действием).

Примечание

Обратите внимание, что разрешение рекурсивного захвата мьютекса необходимо проводить установкой двух параметров ( type и recursive ).

• PTHREAD_MUTEX_DEFAULT (значение по умолчанию) — попытка рекурсивного захвата мьютекса, освобождения мьютекса, захваченного другим потоком, или освобождения уже свободного мьютекса ни к чему не приводит и не возвращает ошибку выполнения.

 

Освобождение параметров

int pthread_mutexattr_destroy(pthread_mutexattr_t* attr);

Вызов разрушает ранее применявшийся объект - атрибутную запись мьютекса, после чего она уже не может более использоваться для инициализации мьютекса без предварительного выполнения вызова pthread_mutexattr_init().

На этом обсуждение атрибутов заканчивается, и мы переходим непосредственно к функциям работы с мьютексом.

 

Операции над мьютексом

 

Инициализация мьютекса

int pthread_mutex_init(pthread_mutex_t* mutex,

 const pthread_mutexattr_t* attr);

Структура данных pthread_mutex_t определена в файле (производный тип от типа sync_t, который в свою очередь определен в файле ) и имеет следующий вид:

struct _sync_t {

 /* Счетчик для рекурсивного мьютекса или семафора */

 int count;

 /* TID потока - имеет смысл и применяется только для мьютексов */

 unsigned owner;

};

Функция pthread_mutex_init() инициализирует переданный объект мьютекс в соответствии со значением переданных атрибутов. Если вместо attr передать NULL, то мьютекс будет создан в соответствии со значениями атрибутов по умолчанию. В native QNX API эта функция реализуется вызовом SyncTypeCreate(). SyncTypeCreate() — единая функция для создания всех базовых объектов синхронизации QNX Neutrino.

Вместо прямого вызова функции pthread_mutex_init() для начальной инициализации статических мьютексов (глобальных на уровне файла кода или пространства имен namespace либо явно описанных с квалификатором static) можно воспользоваться двумя макросами PTHREAD_MUTEX_INITIALIZER и PTHREAD_RMUTEX_INITIALIZER:

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

pthread_mutex_t mutex = PTHREAD_RMUTEX_INITIALIZER;

Первый из них создает мьютекс в соответствии со значениями атрибутов по умолчанию, а второй - мьютекс с разрешенным рекурсивным захватом.

 

Операции с граничным приоритетом

Большинство параметров мьютекса не могут быть изменены после его создания. Но не все. В процессе работы с мьютексом может быть изменено значение приоритета, которое система использует для реализации протокола граничного приоритета с целью предотвращения инверсии приоритетов:

int pthread_mutex_setprioceiling(pthread_mutex_t* mutex,

 int prioceiling, int* old_ceiling);

int pthread_mutex_getprioceiling(const pthread_mutex_t* mutex,

 int* prioceiling);

Функция pthread_mutex_setprioceiling() захватывает мьютекс (или блокируется, пока мьютекс не будет освобожден, и уже тогда захватывает его) и изменяет установленную для него величину граничного приоритета, после чего освобождает мьютекс для использования другими потоками. После изменения значения граничного приоритета предыдущее значение возвращается в old_ceiling.

Функция возвращает следующие значения:

EOK — успешное завершение;

EINVAL — указанный в вызове мьютекс не существует или указанный приоритет выходит за диапазон допустимых значений;

EPERM — поток, вызвавший функцию, не имеет прав на изменение граничного приоритета указанного мьютекса.

 

Захват мьютекса

Захват мьютекса может производиться тремя разными функциями, в основе которых лежит функция из native QNX API SyncMutexLock().

Простой захват

int pthread_mutex_lock(pthread_mutex_t* mutex);

Функция захватывает мьютекс, на который ссылается mutex. Если мьютекс уже захвачен другим потоком, то вызвавший поток блокируется до освобождения мьютекса и после этого захватывает его. Только после этого функция pthread_mutex_lock() возвращает управление. Если захватить мьютекс пытается поток, который им уже владеет, то поведение функции pthread_mutex_lock() будет зависеть от значений атрибутов мьютекса, указанных при его создании. QNX предоставляет возможность рекурсивного захвата мьютекса при соответствующих настройках атрибутов (см. выше раздел «Параметры мьютекса»). При создании мьютекса с параметрами по умолчанию попытка повторного захвата мьютекса ни к чему не приводит. Если включен режим контроля ошибок и отключен рекурсивный захват мьютекса, функция pthread_mutex_lock() возвращает EDEADLK при попытке повторного захвата мьютекса тем же потоком.

Функция pthread_mutex_lock() может возвращать следующие значения: EOK — успешное завершение;

EAGAIN — недостаточно системных ресурсов для захвата мьютекса;

EDEADLK — вызывающий поток уже владеет мьютексом и мьютекс не поддерживает рекурсивный захват (режим контроля ошибок);

EINVAL — некорректное значение параметра mutex.

Попытка захвата

int pthread_mutex_trylock(pthread_mutex_t* mutex);

Функция проверяет, свободен ли мьютекс mutex, и если да, то она захватывает его. В противном случае функция возвращает значение EBUSY.

Возвращаемые значения:

EOK — успешное завершение;

EAGAIN — недостаточно системных ресурсов для захвата мьютекса;

EBUSY — мьютекс mutex уже захвачен;

EINVAL — некорректное значение параметра mutex.

Захват с установкой времени ожидания

#include

#include

int pthread_mutex_timedlock(pthread_mutex_t* mutex,

 const struct timespec* abs_timeout);

Функция проверяет, свободен ли мьютекс (mutex), и если да, то поток, в котором вызвана функция, захватывает этот мьютекс. Если мьютекс уже захвачен, вызвавший поток блокируется до освобождения мьютекса либо до наступления времени, указанного в аргументе abs_timeout. Если это время уже наступило, поток не блокируется вообще, но захват все-таки произойдет, если мьютекс свободен.

Наступление времени определяется по часам REALTIME_CLOCK, когда значение часов оказывается равным или большим значения, указанного в abs_timeout. Тип данных timespec определен в файле .

Если мьютекс создан с атрибутом протокола PRIO_INHERIT, то после выхода потока из блокировки на мьютексе по тайм-ауту приоритет владельца мьютекса подвергается пересмотру в соответствии с приоритетами потоков, оставшихся в очереди на захват мьютекса.

Возвращаемые значения:

EOK — успешное завершение;

EAGAIN — недостаточно системных ресурсов для захвата мьютекса;

EDEADLK — вызывающий поток уже владеет мьютексом, который не поддерживает рекурсивный захват (режим контроля ошибок);

EINVAL — мьютекс использует протокол граничного приоритета для предотвращения инверсии (атрибут protocol установлен в значение PTHREAD_PRIO_PROTECT), но приоритет вызвавшего потока выше граничного приоритета, присвоенного мьютексу; поток должен быть блокирован (мьютекс не свободен), а значение поля abs_timeout, показывающее количество наносекунд, меньше нуля или больше 1000 миллионов; переменная, на которую указывает mutex, не является инициированным объектом — мьютексом.

ETIMEDOUT — мьютекс не может быть захвачен, поскольку указанный тайм-аут истек.

 

Освобождение мьютекса

int pthread_mutex_unlock(pthread_mutex_t* mutex);

Функция pthread_mutex_unlock() освобождает мьютекс, на который ссылается переменная mutex. Вызвавший поток должен быть владельцем мьютекса. Если есть потоки, блокированные в ожидании освобождения мьютекса, то поток с наивысшим приоритетом (или при равных приоритетах дольше всех ждавший) выходит из блокированного состояния и становится владельцем мьютекса.

Для мьютексов, разрешающих рекурсивный захват, функция освобождения должна вызываться столько же раз, сколько и функция захвата.

Возвращаемые значения:

EOK — успешное завершение;

EINVAL — переменная, на которую указывает mutex, не является инициализированным объектом — мьютексом;

EPERM — вызвавший поток не является владельцем мьютекса.

 

Разрушение объекта мьютекс

int pthread_mutex_destroy(pthread_mutex_t* mutex);

Вызов разрушает объект мьютекс, на который указывает переменная mutex. После чего эта переменная не может быть использована без предварительного вызова pthread_mutex_init().

Возвращаемые значения:

EOK — успешное завершение;

EBUSY - мьютекс захвачен и не может быть разрушен до освобождения;

EINVAL — переменная, на которую указывает mutex, не является инициированным объектом - мьютексом.

 

Операции, не поддерживаемые POSIX

В native QNX API есть ряд функций работы с мьютексом, которые не определены POSIX-стандартом, однако они могут оказаться весьма полезными. Поскольку тип POSIX-мьютекса порождается от sync_t, то вполне возможно использование комбинации функций, определенных POSIX, и «родных» native-функций QNX. Однако необходимо помнить, что в таком случае ни о какой межсистемной совместимости говорить уже не приходится.

Восстановление «мертвого» мьютекса

#include

int SyncMutexRevive(sync_t* sync);

int SyncMutexRevive_r(sync_t* sync);

Эти функции предназначены для восстановления мьютекса, который находится в состоянии блокирования DEAD. Мьютекс попадает в состояние DEAD, когда память, использованная при захвате мьютекса, освобождается. Такое может произойти, например, когда умирает поток, захвативший мьютекс, расположенный в разделяемой памяти. В результате вызова вызвавший поток становится владельцем мьютекса, и его счетчик захватов устанавливается в 1 для рекурсивного мьютекса.

Ошибки выполнения функции:

ЕFAULT — ошибка при обращении к указанным в аргументах переменным;

EINVAL — указанный объект синхронизации не существует или не находится в состоянии DEAD;

ETIMEDOUT — отмена вызова по тайм-ауту ядра (устанавливается вызовом TimerTimeout()).

Установка уведомления о «смерти» мьютекса

Определить состояние мьютекса как DEAD можно с помощью функции SyncMutexEvent(), которая определяет событие, связанное со «смертью» мьютекса.

#include

int SyncMutexEvent(sync_t* sync, struct sigevent* event);

int SyncMutexEvent_r(sync_t* sync, struct sigevent* event);

Данная функция предназначена для установки обработчика ситуации, когда мьютекс попадает в состояние DEAD (то есть перераспределяется память, из которой произошел захват мьютекса). Захватить мьютекс, оказавшийся в состоянии DEAD, можно далее с помощью вызова функции SyncMutexRevive().

Ошибки выполнения функции:

EAGAIN — в данный момент ядро не имеет ресурсов для обработки запроса;

EFAULT — ошибка произошла при попытке обращения к sync;

EINVAL — объект синхронизации, на который указывает sync, не существует.

 

Пример применения мьютекса

Модернизируем наш пример из раздела, посвященного использованию семафора для случая множества потоков источников и приемников данных. Проблема заключается в том, что когда несколько потоков одновременно попытаются вызвать функцию push() или pop(), может произойти сильная путаница, поэтому код этих функций должен исполняться эксклюзивно, только одним потоком. Решить эту проблему можно двумя способами: воспользоваться бинарным семафором или мьютексом. Мы решили применить именно мьютекс и ниже расскажем причину, по которой мы здесь смешали в одной конструкции эти два элемента синхронизации.

/* Шаблонный класс очереди данных */

template class CDataQueue {

public:

 CDataQueue() { pthread_mutex_init(&_mutex, NULL); }

 ~CDataQueue() { pthread_mutex_destroy(&_mutex); }

 void push(T _new_data) {

  pthread_mutex_lock(&_mutex);

  data_queue.push(_new_data);

  data_event.reset();

  pthread_mutex_unlock(&_mutex);

 }

 T pop() {

  data_event.wait();

  pthread_mutex_lock(&_mutex);

  T res = data_queue.front();

  data_queue.pop();

  pthread_mutex_unlock(&_mutex);

  return res;

 }

private:

 std::queue data_queue;

 event data_event;

 pthread_mutex_t _mutex;

};

На первый взгляд задача очевидна: надо не допустить одновременного исполнения двух участков кода. Почему же не воспользоваться семафором, как мы описывали, когда рассказывали о способах его применения? Дело в том, что мы хотели получить универсальное средство передачи данных между потоками, не зависящее от допущений о приоритетах потоков и степени их зависимости. Когда мы строим систему реального времени, вопрос взаимного неявного влияния разных потоков на выполнение друг друга становится очень важным. Мы уже неоднократно упоминали эффект инверсии приоритетов и те способы, которыми можно ее избежать, используя мьютекс для защиты эксклюзивно используемого кода.

 

Сравнение и эффективность

В этом месте временно прервем наше последовательное повествование: мы закончили рассмотрение двух наиболее известных, важных и применяемых примитивов синхронизации — семафора и мьютекса. Теперь сделаем короткую остановку и проведем их взаимное сравнение, а также попробуем на примерах оценить затраты процессорной производительности, требуемые этими механизмами.

Дело в том, что на первый взгляд эти два механизма в высшей степени подобны, особенно если речь заходит о бинарном семафоре, принимающем значения счетчика 0 либо 1. Настолько подобны, что и в обсуждениях, и даже в неспециальной литературе можно встретить утверждения, что это «одно и то же». Сейчас мы увидим, что эти два сходных механизма различаются всем: и затратами процессорного времени на их обслуживание, и целями и задачами, которые они призваны решать, и временем жизни… Начнем с простой оценки затрат процессорного времени на обслуживание каждого из механизмов, после чего остальные различия станут нам намного понятнее.

Для проведения таких оценок используем уже применявшуюся нами схему «симметричных» тестов. Почему именно их? Да, здесь нам не требуются в явном виде обменные операции потоков, но воспользуемся «симметричными» тестами просто в силу минимальных переделок того, что уже было написано ранее. Итак, первый вариант теста для мьютекса (файл sy20m.cc):

Скоростные показатели мьютекса

unsigned long N = 1000;

static pthread_barrier_t bstart;

static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

static bool debug = false;

static char* str;

static volatile int ind = 0;

void* threadfunc(void* data) {

 pthread_barrier_wait(&bstart);

 unsigned long i = 0;

 char tid[8];

 sprintf(tid, "%d", pthread_self());

 uint64_t t = 0, t1;

 while (i++ != N) {

  t1 = ClockCycles();

  pthread_mutex_lock(&mutex);

  if (debug) str[ind++] = *tid;

  pthread_mutex_unlock(&mutex);

  t += ClockCycles() - t1;

  sched_yield();

 }

 cout << pthread_self() << "\t: cycles - "

  << t << ", on mutex - " << t / N << endl;

 return NULL;

}

int main(int argc, char *argv[]) {

 int opt, val;

 while ((opt = getopt(argc, argv, "n,v")) != -1) {

  switch (opt) {

   case 'n':

   if (sscanf(optarg, "%i", &val) != 1)

    cout << "parse command line error" << endl, exit(EXIT_FAILURE);

   if (val > 0) N = val;

   break;

  case 'v':

   debug = true;

   break;

  default:

   exit(EXIT_FAILURE);

  }

 }

 if (debug) str = new char[2 * N + 1];

 const int T = 2;

 pthread_t tid[T];

 if (pthread_barrier_init(&bstart, NULL, T) != EOK)

  perror("barrier init"), exit(EXIT_FAILURE);

 for (int i = 0; i < T; i++)

  if (pthread_create(tid + i, NULL, threadfunc, NULL) != EOK)

   perror("thread create"), exit(EXIT_FAILURE);

 for (int i = 0; i < T; i++)

  pthread_join(tid[i], NULL);

 if (debug) {

  str[ind] = '\0';

  cout << str << endl;

  delete [] str;

 }

 exit(EXIT_SUCCESS);

}

Результаты выполнения этого теста:

# sy20m -n100000

3       : cycles - 14644442, on mutex - 146

2       : cycles - 14614219; on mutex - 146

# sy20m -n1000000

3       : cycles - 146505047; on mutex - 146

2       : cycles - 146388673; on mutex - 146

Модифицируем программу, используя вместо мьютекса неименованный бинарный семафор. Для того чтобы не загромождать текст практически тем же кодом, перечислим только необходимые при этом изменения (файл sy20s.cc):

1. Вместо мьютекса объявляем неименованный семафор, а статическая инициализация мьютекса заменяется на оператор (в теле главной программы) динамической инициализации семафора с присвоением ему начального значения 1:

static sem_t sem;

...

if (sem_init(&sem, 0, 1) != 0)

 perror("semaphore init"), exit(EXIT_FAILURE);

2. Функция потока принимает вид:

void* threadfunc(void* data) {

 ...

 while (i++ != N) {

  t1 = ClockCycles();

  sem_wait(&sem);

  if (debug) str[ind++] = *tid;

  sem_post(&sem);

  t += ClockCycles() - t1;

  sched_yield();

 }

 ...

}

В результате исполнения на этот раз мы получим:

# sy20s -n100000

3       : cycles - 87048886; on semaphore - 870

2       : cycles - 87077787; on semaphore - 870

# sy20s -n1000000

3       : cycles - 869638168; on semaphore — 869

2       : cycles - 868725494, on semaphore - 868

Делаем последнюю модификацию в этой группе тестов, теперь используем специфику именованного семафора (файл sy20n.cc):

1. Вместо оператора динамической инициализации неименованного семафора мы теперь должны создать именованный семафор:

static sem_t* sem;

...

const char semname[] = "/duble";

if ((sem = sem_open(semname, O_CREAT, S_IRWX0, 1)) == SEM_FAILED)

 perror("semaphore init"), exit(EXIT_FAILURE);

Примечание

Последний оператор заслуживает отдельного комментария. Техническая документация утверждает, что функция sem_open() , нормально возвращающая указатель созданного дескриптора семафора типа sem_t , в случае ошибки возвращает -1 (так было записано и в самых ранних редакциях POSIX). Но использование конструкции вида:

if (sem_open( ... ) == -1)

просто вызовет синтаксическую ошибку (несоответствие типов) и не пройдет компиляцию! Естественнее было бы для такой функции возвращать NULL в случае ошибки, но... так определено в POSIX. Кроме того, во многих реализациях UNIX определяется константа:

#define SEM_FAILED ((sem_t*)(-1))

В документации QNX она нигде не упоминается, но, как мы видим, она определена, и все прекрасно работает!

2. Функция потока принимает вид (теперь sem, в отличие от предшествующего случая, ведь теперь это уже указатель на переменную типа sem_t):

void* threadfunc(void* data) {

 ...

 while (i++ != N) {

  t1 = ClockCycles();

  sem_wait(sem);

  if (debug) str[ind++] = *tid;

  sem_post(sem);

  t += ClockCycles() - t1;

  sched_yield();

 }

 ...

}

3. Теперь особое внимание необходимо уделить не только созданию, но и ликвидации именованного семафора (такой семафор имеет время жизни ядра системы и будет продолжать свое существование и после завершения нашего приложения):

sem_close(sem);

sem_unlink(semname);

Запустим полученное приложение при таком значении -n, которое обеспечит достаточное время его работы. Прежде чем обсуждать полученные результаты, посмотрим отображение семафора на пространство файловых имен системы во время работы приложения:

# ls -l /dev/sem

total 1

n------r-х 1 root root 1 Feb 10 18.56 duble

А теперь и результаты работы программы:

# nice -n-19 sy20n -n100000

3      : cycles - 1453746002, on semaphore - 14537

2      : cycles - 1454203573, on semaphore - 14542

Наконец, мы можем обратиться к количественному анализу полученных цифр:

• Примитивы — мьютекс, неименованный и именованный семафоры, — кажущиеся на первый взгляд сходными, требуют для своего обслуживания в эквивалентных условиях принципиально различных затрат, величины которых радикально отличаются: 140 – 870 – 14500 процессорных циклов соответственно, что соотносится как 1:6,2:104.

• Так же радикально отличаются и их характеристики доступа: изнутри процесса (или даже только из того потока, который уже владеет мьютексом), из внешнего процесса, из процесса, работающего на совершенно другом сетевом узле… То, что мы уже рассматривали как «характеристики времени жизни» объектов, принадлежит различным категориям: процесса (process-persistent), ядра (kernel-persistent) или файловой системы (filesystem-persistent).

• У захваченного мьютекса всегда есть поток-владелец, и только он может освободить его в дальнейшем. Именно поэтому мьютекс может использоваться для синхронизации потоков, но только синхронизации в смысле разграничения временной последовательности доступа к фрагменту кода — к тому, что часто называют критической секцией кода. Функциональность семафора значительно выше: при возможности (почти всегда) его применения в том контексте, в котором используется и мьютекс (только нужно ли это делать?), он может применяться и для синхронизации потоков в смысле координации последовательности их взаимодействия в качестве элемента, управляющего порядком выполнения. Покажем это на примере. Для этого незначительно трансформируем код предыдущего теста для семафора (файл sy21.cc):

Синхронизация потоков семафорами

#include

#include

#include

#include

#include

#include

#include

#include

#include

unsigned long N = 1000;

unsigned int T = 2;

static sem_t* sem;

static bool debug = false;

static char* str; // строка диагностики

static volatile int ind = 0;

uint64_t *t;

void* threadfunc(void* data) {

 unsigned long i = 0;

 char tid[8];

 sprintf(tid, "%X", pthread_self());

 // временная метка начала во всех потоках устанавливается

 // на время достижения этой точки в последнем (активном) потоке

 if ((int)data == T - 1) {

  uint64_t с = ClockCycles();

  for (int i = 0; i < T; i++ ) t[i] = c;

 }

 // рабочий цикл переключений за счет синхронизации

 while (i++ < N) {

  sem_wait(sem + (int)data);

  if (debug) str[ind++] = *tid;

  sem_post(sem + ((int)data +1) % T);

 }

 t[(int)data] = ClockCycles() - t[(int)data];

 return NULL;

}

int main(int argc, char *argv[]) {

 int opt, val;

 while ((opt = getopt(argc, argv, "n:t:v")) != -1) {

  switch(opt) {

  case 'n':

   if (sscanf(optarg, "%i", &val) != 1)

    cout << "parse command line error" << endl, exit(EXIT_FAILURE);

   if (val > 0) N — val;

   break;

  case 't':

   if (sscanf(optarg, "%i", &val) != 1)

    cout << "parse command line error" << endl, exit(EXIT_FAILURE);

   if (val > 0) T = val;

   break;

  case 'v':

   debug = true;

   break;

  default:

   exit(EXIT_FAILURE);

  }

 }

 if (debug) str = new char[T * N + 1];

 pthread_t* tid = new pthread_t[T];

 sem = new sem_t[T];

 t = new uint64_t[T];

 for (int i = 0; i < T; i++) {

  // все потоки, кроме последнего, будут заблокированы

  // на своих семафорах сразу же после старта

  if (sem_init(sem + i, 0, (i == (T - 1)) ? 1 : 0))

   perror("semaphore init"), exit(EXIT_FAILURE);

  if (pthread_create(tid + i, NULL, threadfunc, (void*)i ! = EOK)

   perror( "thread create error"), exit(EXIT_FAILURE);

 }

 for (int i=0; i < T; i++)

  pthread_join(tid[i], NULL);

 for (int i = 0; i < T; i++) sem_destroy(sem + i);

 delete [] sem;

 for (int i = 0; i < T; i++)

  cout << tid[i] << "\t: cycles - " << t[i] << ";\ton semaphore - " <<

   t[i] / T / N << endl;

 delete [] tid;

 delete [] t;

 if (debug) {

  str[ind] = "\0"; cout << str << endl;

  delete [] str;

 }

 exit(EXIT_SUCCESS);

}

Логически приложение изменилось следующим образом:

• Теперь у нас может быть не 2 идентичных (симметричных) потока, а произвольное их количество (ключ -t при запуске приложения).

• Потоки синхронизируются не на одном семафоре — введен массив семафоров по числу потоков: каждый поток блокируется на «своем» семафоре, но разблокирует его (после очередного выполнения своего фрагмента) семафор заблокированного «соседа».

• Теперь нам нет нужды использовать барьер для одновременного старта всех созданных потоков: семафоры всех создаваемых потоков инициализируются нулевым значением; стартующий поток тут же блокируется на своем семафоре, и только последний из запущенных выполняется, не блокируясь на семафоре.

• Из кода исключены какие бы то ни было средства принудительной передачи управления (sched_yield()) — все управление логикой ветвления осуществляется только состояниями семафоров.

Посмотрим, что у нас получилось. Запускаем приложение с диагностическим выводом идентификаторов потоков (ключ -v; он у нас был в тестах и ранее, только мы о нем не упоминали):

# nice -n-19 sy21 -n20 -t12 -v

2      : cycles - 664874; on semaphore - 2770

3      : cycles - 649150; on semaphore - 2704

4      : cycles - 638906, on semaphore - 2662

5      : cycles - 622987; on semaphore - 2595

6      : cycles - 611781; on semaphore - 2549

7      : cycles - 594515; on semaphore - 2477

8      : cycles - 571003; on semaphore - 2379

9      : cycles - 552834; on semaphore - 2303

10     : cycles - 536817; on semaphore - 2236

11     : cycles - 519357; on semaphore - 2163

12     : cycles - 500388; on semaphore - 2084

13     : cycles - 296633; on semaphore - 1235

D23456789ABCD23456789ABCD23456789ABCD23456789ABCD23456789ABCD23456789ABCD23456789ABCD23456789ABCD23456789ABCD23456789ABCD23456789ABCD23456789ABCD23456789ABCD23456789ABCD23456789ABCD23456789ABCD23456789ABCD23456789ABCD23456789ABCD23456789ABC

В строке диагностики (внизу) хорошо видно регулярное чередование выполняющихся потоков, причем оно начинается с индекса последнего созданного потока (13 в показанном примере). Теперь проделаем то же самое, но на большой выборке и с отключенной диагностикой, чтобы получить «чистое» время контекстных переключений потоков, не искаженное затратами на операции формирования диагностики (результаты для нескольких различных размерностей задачи при разном количестве потоков):

# nice -n-19 sy21 -n100000 -t12

2      : cycles - 1509597589; on semaphore - 1257

3      : cycles - 1509581545; on semaphore - 1257

4      : cycles - 1509570283; on semaphore - 1257

5      : cycles - 1509552472; on semaphore - 1257

6      : cycles - 1509537934; on semaphore - 1257

7      : cycles - 1509519299; on semaphore - 1257

8      : cycles - 1509502312; on semaphore - 1257

9      : cycles - 1509482667; on semaphore - 1257

10     : cycles - 1509466343; on semaphore - 1257

11     : cycles - 1509449264; on semaphore - 1257

12     : cycles - 1509431112; on semaphore - 1257

13     : cycles - 1509222808, on semaphore - 1257

# nice -n-19 sy21 -n100000 -t7

2      : cycles - 859768389; on semaphore - 1228

3      : cycles - 859756956; on semaphore - 1228

4      : cycles - 859745649; on semaphore - 1228

5      : cycles - 859736698; on semaphore - 1228

6      : cycles - 859724685; on semaphore - 1228

7      : cycles - 859707720; on semaphore - 1228

8      : cycles - 859554045; on semaphore — 1227

# nice -n-19 sy21 -n50000 -t13

2      : cycles - 832789852; on semaphore - 1281

3      : cycles - 832813231; on semaphore - 1281

4      : cycles - 832835011; on semaphore - 1281

5      : cycles - 832851360; on semaphore - 1281

6      : cycles - 832868482; on semaphore - 1281

7      : cycles - 832884308; on semaphore - 1281

8      : cycles - 832900935; on semaphore - 1281

9      : cycles - 832916093; on semaphore - 1281

10     : cycles - 832931944; on semaphore - 1281

11     : cycles - 832946479; on semaphore - 1281

12     : cycles - 832962202; on semaphore - 1281

13     : cycles - 832976433; on semaphore - 1281

14     : cycles - 832782465; on semaphore - 1281

# nice -n-19 sy21 -n50000 -t17

2      : cycles - 1142879872; on semaphore - 1344

3      : cycles - 1142906138; on semaphore - 1344

4      : cycles - 1142927650; on semaphore - 1344

5      : cycles - 1142943675; on semaphore - 1344

6      : cycles - 1142959582; on semaphore - 1344

7      : cycles - 1142974919; on semaphore - 1344

8      : cycles - 1142991068; on semaphore - 1344

9      : cycles - 1143005896; on semaphore - 1344

10     : cycles - 1143021518, on semaphore - 1344

11     : cycles - 1143036136; on semaphore - 1344

12     : cycles - 1143053448; on semaphore - 1344

13     : cycles - 1143068415; on semaphore - 1344

14     : cycles - 1143083676; on semaphore - 1344

15     : cycles - 1143098361; on semaphore - 1344

16     : cycles - 1143114009; on semaphore - 1344

17     : cycles - 1143128525; on semaphore - 1344

18     : cycles - 1142872665; on semaphore - 1344

Есть некоторая корреляция времени переключения контекста с размером выборки и количеством обрабатывающих потоков, но она в широком диапазоне этих параметров не превышает 8%. В данном приложении эта численная величина включает в себя: блокирование на семафоре, переключение на контекст другого потока и разблокирование семафора. Если вспомнить, что раньше мы получали оценки для принудительного (посредством sched_yield()) переключения контекста потоков в 375 процессорных циклов, а для захвата-освобождения семафора — порядка 870, то эти цифры хорошо согласуются с полученными сейчас результатами.

Рассматриваемые примитивы служат принципиально различным целям. Мьютекс, как уже было сказано ранее, предназначен в первую очередь для регламентации доступа к участкам программного кода. Семафоры же больше предназначены для регламентации порядка доступа к определенным объектам данных. Классическими задачами этого класса являются задачи «производитель-потребитель», когда M производителей создают некоторые объекты данных (читая эти данные с реальных внешних устройств, или создавая их как результат только внутренних вычислений, или любым другим способом), а N потребителей независимо берут произведенные объекты данных на последующую обработку.

Это настолько общий и часто встречающийся класс задач, что покажем для него простейший «скелет» в виде отдельного приложения, в котором отслеживание порядка доступа потребителей будет осуществлять счетный семафор (файл sy22.cc). Для простоты понимания приложение сделано как трансформация кода предшествующей группы тестов. В качестве имитации производства объекта данных, как и в качестве его обработки потребителем, используется пассивная пауза (delay()) на случайную величину (производство и обработка объектов данных в коде не показаны, так как это не относится к существу рассматриваемого — нас интересуют процессы синхронизации этих операций, а не сами операции).

Кроме основной нашей цели это приложение дополнительно демонстрирует:

• Практическое использование принудительного завершения (отмены) потоков «извне» с управлением состоянием завершаемости потоков и расстановкой точек отмены, о чем мы уже говорили ранее.

• Использование атомарных (непрерываемых) операций (например, atomic_add_value()), о которых мы будем говорить чуть позже.

• Использование реентерабельных форм функций стандартной библиотеки, безопасных в многопоточной среде (rand_r() вместо rand()).

Один производитель — T потребителей

#include

#include

#include

#include

#include

#include

#include

#include

const int D = 10;

unsigned int T = 2;

static sem_t sem;

pthread_t* tid;

void* writer(void* data) {

 unsigned long i = (int)(data); // общий размер выборки

 unsigned int s = 1;

 while (i-- > 0) {

  delay((long)rand_r(&s) * D / RAND_MAX + 1);

  sem_post(&sem); // объект данных произведен

 }

 for (i = 0; i < T; i++) pthread_cancel(tid[i + 1]);

 return NULL;

}

static char *str; // строка результирующей диагностики

static volatile unsigned ind = 0;

void* reader(void*) {

 char tid[8];

 sprintf(tid, "%X", pthread_self());

 unsigned int s = rand();

 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);

 while(true) {

  sem_wait(&sem); // получен объект данных

  str[atomic_add_value(&ind, 1)] = *tid;

  pthread_testcancel();

  delay((long)rand_r(&s) * D * T / RAND_MAX + 1);

 }

 return NULL;

}

int main(int argc, char *argv[]) {

 unsigned long N = 1000;

 int opt, val;

 while ((opt = getopt(argc, argv, "n:t:")) != -1) {

  switch(opt) {

  case 'n':

   if (sscanf(optarg, "%i", &val) != 1)

    cout << "parse command line error" << endl, exit(EXIT_FAILURE);

   if (val > 0) N = val;

   break;

  case 't':

   if (sscanf(optarg, "%i", &val) != 1)

    cout << "parse command line error" << endl, exit(EXIT_FAILURE);

   if (val > 0) T = val;

   break;

  default:

   exit(EXIT_FAILURE);

  }

 }

 str = new char[N + 1];

 tid = new pthread_t[T + 1];

 if (sem_init(&sem, 0, 0))

  perror("semaphore init"), exit(EXIT_FAILURE);

 if (pthread_create(tid, NULL, writer, (void*)N) >= EOK)

  perror("writer create error"), exit(EXIT_FAILURE);

 for (int i = 0; i < T; i++)

  if (pthread_create(tid + i + 1, NULL, reader, NULL) != EOK)

   perror("reader create error"), exit(EXIT_FAILURE);

 for (int i = 0; i < T; i++)

  pthread_join(tid[i], NULL);

 sem_destroy(&sem);

 delete [] tid;

 str[ind] = '\0';

 cout << str << endl;

 delete [] str;

 exit(EXIT_SUCCESS);

}

Вот как выглядит результат выполнения этой программы (во избежание внесения дополнительного синхронизма в качестве общего числа циклов «производства» и числа потоков потребителей выбраны взаимно простые числа):

# sy22 -n200 -t13

3456789ABCDEF7936A8547E39DCB45F67A59B84D37EC64F395B6AEF78B9DF34CB53B86A5FEDF975B3A8EC46FB8AD954736FA78C3ED46F7B594EC7B83AC6F9D4BCE569A73F86BCAD74C536EB79F5C8DA5B463EFBC7D937AEC85FDE4566CAF69DE7F385CA6

Хорошо видно, как строго последовательный поначалу порядок доступа потребителей к объектам данных десинхронизируется и становится хаотическим: каждый освободившийся потребитель приступает к работе над следующим объектом данных, как только тот становится доступен.

 

Атомарные операции

Атомарные операции не относятся к элементам синхронизации параллельных ветвей программы. Но им следует уделить внимание по двум причинам. Во-первых, атомарные операции — это простое и эффективное средство, позволяющее во многих случаях избежать использования механизмов синхронизации. А во-вторых, атомарные операции зачастую выпадают из рассмотрения из-за их двойственного положения: при обсуждении параллелизма и синхронизации они не рассматриваются, потому что не являются элементами синхронизации, а при обсуждении последовательных программ не рассматриваются потому, что здесь в них просто нет необходимости.

Атомарные операции — это операции, для которых гарантируется их непрерываемость даже при выполнении на симметричных мультипроцессорных платформах. Выполнение атомарных операций не прерывается даже асинхронными аппаратными прерываниями. Таким образом, эта группа операций является также и безопасной в многопоточном окружении.

Действительно, наиболее часто примитивы синхронизации применяются для создания критической секции кода с целью предотвращения возможности одновременного воздействия на объекты данных со стороны нескольких параллельно развивающихся ветвей программы.

При одновременной работе с данными из различных потоков состояние данных после такого воздействия должно считаться «неопределенным», при этом последствия могут быть более тяжкими, чем просто некорректное состояние данных - структура сложных объектов может быть просто разрушена.

В многопоточной среде элементарные и привычные операции могут таить в себе опасности. Действительно, простейший оператор вида:

i = i + 1;

содержит в себе опасность, если этот оператор записан в функции потока, выполняемой несколькими экземплярами потоков (совершенно типичный случай). Не менее опасен, но менее очевиден по внешнему виду и оператор:

i += 1;

Даже операторы инкремента и декремента (++i и --i), которые в системе команд практически всех типов процессоров выполняются как атомарные и которые являются основой для реализации семафорных операций, в симметричной мультипроцессорной архитектуре перестают быть безопасными. Хуже того, привычные программисту операции стандартной библиотеки и просто синтаксические конструкции языка становятся небезопасными в многопоточной среде. Вот еще два примера:

1. Оператор копирования нетипизированного блока памяти, безбоязненно используемый десятилетиями:

void* memcpy(void* dst, const void* src, size_t length);

2. Операторы присваивания, инициализации или сравнения структурированных объектов данных:

struct X {

 X(const X& y) { ... }

 friend bool operator==(const X& f, const X& s) { ... }

 // оператор присваивания мы не переопределяем, используется

 // присваивание по умолчанию - побайтовое копирование

};

...

X A;

...

X B(А); // потенциальная ошибка

...

B = A; // потенциальная ошибка

if (А == В) { ... } // потенциальная ошибка

Примечание

Обратите внимание, что все объекты данных, для которых могут наблюдаться обсуждаемые эффекты, должны быть доступны вне потока, то есть быть глобальными с точки зрения видимости в потоке.

Именно для безопасного манипулирования данными в параллельной среде QNX API и вводятся атомарные операции. Десять атомарных функций делятся на две симметричные группы по виду своего именования и логике функционирования. Все атомарные операции осуществляются только над одним типом данных unsigned int, но, как будет показано далее, это не такое уж и сильное ограничение. Сам объект, над которым осуществляется атомарная операция (типа unsigned int), — это самая обычная переменная целочисленного типа, только описанная с квалификатором volatile.

Помимо атомарных операций над этой переменной могут выполняться любые другие действия, которые можно считать безопасными в многопоточной среде: инициализация, присваивание значений, сравнения. Более того, при выходе программы за область возможного многопоточного доступа к этой переменной она может далее использоваться любым традиционным и привычным образом.

Важно также отметить, что термин «атомарность» относится не к особым свойствам некоторого объекта данных, а к ограниченному ряду операций, которые можно безопасно выполнять над этим объектом в многопоточной среде.

Общий вид прототипов каждой из двух групп атомарных операций следующий:

void atomic_*(volatile unsigned *D, unsigned S);

unsigned atomic_*_value(volatile unsigned *D, unsigned S);

где вместо * должно стоять имя одной из пяти операций (таким алгоритмом и обеспечивается 10 различных атомарных функций):

add — добавить численное значение к операнду;

sub — вычесть численное значение из операнда;

clr — очистить биты в значении операнда (выполняется побитовая операция (*D) &= ~S);

set — установить биты в значении операнда (выполняется побитовая операция (*D) |= S);

toggle — инвертировать биты в значении операнда (выполняется побитовая операция (*D) ^= S);

D — именно тот объект, над которым осуществляется атомарная операция;

S — второй операнд осуществляемой операции.

Две формы атомарных функций для каждой операции отличаются тем, что первая из них выполняет операцию без возврата значения, а вторая возвращает значение, которое операнд D имел до выполнения операции (т.e. прежнее значение, как это делают, например, префиксные операции инкремента ++D и декремента --D, в отличие от постфиксных D++ и D--).

Зачем нужны две формы для операции? Техническая документация QNX утверждает, что вторая форма может выполняться дольше. Справедливость этого утверждения и насколько дольше выполняется вторая форма, мы скоро увидим на примерах.

Итак, у нас есть 10 функций для выполнения пяти атомарных операций:

atomic_add()    atomic_add_value()

atomic_sub()    atomic_sub_value()

atomic_clr()    atomic_clr_value()

atomic_set()    atomic_set_value()

atomic_toggle() atomic_toggle_value()

Как используются атомарные операции? Обычно для предотвращения одновременного изменения некоторого счетчика индекса мы вынуждены создавать критическую секцию, обозначая ее, скажем, операциями над мьютексом. В частности, в следующем примере нам необходимо из различных потоков последовательно дописывать некоторые байтовые результаты в единый буфер:

// глобальные описания, доступные всем потокам

const unsigned int N = ...

uint8_t buf[N];

// индекс текущей позиции записи

unsigned int ind = 0;

// общий мьютекс, доступный каждому из потоков

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

...

// выполняется в каждом из потоков:

uint8_t res[M]; // результат некоторой операции

unsigned int how = ... // реальная длина этого результата

pthread_mutex_lock(&mutex);

memcpy((void*)buf + ind, (void*)res, how);

ind += how;

pthread_mutex_unlock(&mutex);

Используя атомарные операции, мы можем этот процесс записать так (все глобальные описания остаются неизменными):

// глобальные описания, доступные всем потокам

...

// индекс текущей позиции записи

volatile unsigned int ind = 0;

...

// выполняется в каждом из потоков:

uint8_t res[M]; // результат некоторой операции

unsigned int how = ... // реальная длина этого результата

memcpy((void*)buf + atomic_add_value(ind, how), (void*)res, how);

Или даже так:

// глобальные описания, доступные всем потокам

...

// указатель текущей позиции записи:

volatile unsigned int ind = (unsigned int)buf;

...

// выполняется в каждом из потоков:

memcpy((void*)atomic_add_value(ind, how), (void*)res, how);

В последнем случае это, конечно, трюкачество, построенное на том, что в 32-разрядной архитектуре представления указателя и переменной unsigned int совпадают, но это «работающее трюкачество» и работающее иногда весьма эффективно.

Техника применения атомарных операций оказывается крайне удобной, например, при осуществлении вывода, часто диагностического, из различных потоков. Положим, нам нужно в каждом из многих потоков выводить диагностическую строку при достижении ими определенной точки исполнения:

cout << "Это вывод потока " << pthread_self() << endl;

Но так делать нельзя: при таком решении у нас появляются 2 проблемы, одна из которых очевидна, а другая — не очень:

1. Выводы разных потоков могут «смешиваться»; более того, за счет буферизации вывода некоторые «рваные» фрагменты мы будем наблюдать дважды. Одним словом, наш вывод окажется полностью «нечитабельным».

2. При осуществлении вывода в контексте потока почти наверняка в процессе вывода будут выполняться системные вызовы, которые потребуют новой диспетчеризации и приведут к вытеснению исходного потока. При этом порядок передачи управления от потока к потоку при наличии отладочного вывода будет отличаться от порядка при его отсутствии. А это, наверное, не совсем то, что мы ожидали. В результате при наличии отладочного вывода мы можем наблюдать совсем не ту картину, для изучения которой этот вывод, собственно, и предназначен.

Эти эффекты не связаны с какой-то конкретной формой вывода, такой как вывод в поток, показанный выше; точно так же будет вести себя и традиционный вызов printf(...).

Проблема очень просто решается, если вместо непосредственного вывода из потоков последовательно сбрасывать все сообщения в промежуточный буфер, который будет выводиться в те периоды времени программы, когда запись в него не производится:

const int N = ... , M = ...;

char buf[N];

volatile unsigned ind = 0;

...

// а вот это производится из каждого потока

char tbuf[M];

sprintf(tbuf, "Это вывод потока %X", pthread_self());

strcpy(buf + atomic_add_value(ind, strlen(tbuf)), tbuf);

И наконец, последнее: есть ли смысл во введении этого дополнительного механизма, если всегда существует альтернативная форма организации такой же защиты доступа посредством критической секции (например, при использовании мьютекса)? Сравним (файл a1.cc) временные затраты при многократном изменении значения переменной для случаев атомарных операций и критической секции на базе мьютекса (мы берем именно мьютекс, потому что из всех примитивов синхронизации он самый низкоуровневый и быстрый):

Сравнение мьютекса и двух форм вызова атомарной операции

#include

#include

#include

#include

#include

#include

#include

#include

int main(int argc, char *argv[]) {

 uint64_t N = 100000;

 bool atom = false, value = false;

 int opt, val;

 while ((opt = getopt(argc, argv, "n:av")) != -1) {

  switch(opt) {

  case 'n': // количество повторений

   if (sscanf(optarg, "%i", &val) != 1)

    cout << "parse command line error" << endl, exit(EXIT_FAILURE);

   if (val > 0) N = val;

   break;

  // использовать атомарные операции

  case 'a':

   atom = true;

   break;

  // использовать форму, возвращающую значение

  case 'v':

   value = true;

   break;

  default:

   exit(EXIT_FAILURE);

  }

 }

 // замеряется количество процессорных циклов для каждого случая

 uint64_t i = N, t = ClockCycles();

 volatile unsigned ind = 0;

 if (!atom) {

  pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

  while (i--) {

   pthread_mutex_lock(&mutex);

   ind++;

   pthread_mutex_unlock(&mutex);

  }

 } else if (value)

  while (i--) atomic_add_value(&ind, 1);

 else while (i--) atomic_add(&ind, 1);

 t = ClockCycles() - t;

 cout << "all cycles - " << t << "; on operation - "

  << t / N << endl;

 exit(EXIT_SUCCESS);

}

Вот результат при использовании критической секции:

# nice -n-19 a1 -n10000000

all cycles - 1120872156; on operation - 112

Результат с применением атомарной операции, не возвращающей значения:

# nice -n-19 a1 -n10000000 -a

all cycles — 391018203; on operation - 39

Результат с применением атомарной операции, возвращающей значение (обещанная разница составляет порядка 10%):

# nice -n-19 a1 -n10000000 -a -v

all cycles - 441158981; on operation - 44

 

Условная переменная

 

Одним из важнейших принципов использования мьютексов является максимальное сокращение размеров критической секции, то есть участка, который потоки должны проходить последовательно. Однако зачастую возникает необходимость ожидания выполнения некоторого условия внутри критической секции.

Реализация подобного ожидания «в лоб» привела бы к тому, что все потоки, разделяющие данную критическую секцию, были бы вынуждены ждать выполнения условия для каждого из них. При «правильной» реализации ожидания поток должен освобождать мьютекс на время ожидания и вновь захватывать его, когда ожидаемое условие выполняется. Специально для этого случая стандартом POSIX предусмотрены условные переменные. QNX Neutrino реализует условные переменные как на уровне вызовов микроядра в своем native API, так и в соответствии со стандартом POSIX.

Отметим, что дополнение мьютекса условной переменной делает их комбинацию универсальным базисом, на котором могут быть построены любые сколь угодно сложные в своем поведении объекты синхронизации.

Фактически совместное использование мьютекса и условной переменной создает специфический комбинированный объект синхронизации, который может иметь принципиально более широкое применение, чем отдельно взятый мьютекс. Тем не менее поведение этого объекта синхронизации не столь просто и далеко не очевидно. Рассмотрим его более подробно.

На рис. 4.1 приведена блок-схема операций, выполняемых потоком при использовании мьютекса и условной переменной для синхронизации. Линиями отделены операции, выполняющиеся «внутри» функций, указанных справа. Обратите внимание, что наиболее сложная логика соответствует вызову функции ожидания на условной переменной.

Рис. 4.1. Схема действий потока при выполнении синхронизации с применением пары мьютекс-условная переменная (обратите внимание, что операции при участии мьютекса (1, 2, 3) выполняются дважды.)

Проблема в первую очередь заключается в том, что внутри критической секции, отмеченной вызовами функций pthread_mutex_lock() и pthread_mutex_unlock(), не может находиться более одного потока в единый момент времени. Следовательно, даже если поток, блокированный на условной переменной, и получит pthread_cond_signal() или pthread_cond_broadcast(), он не сможет немедленно продолжить свое выполнение, если внутри критической секции уже находится другой поток. В этом случае разблокированный (на условной переменной) поток изменяет свой статус с «блокированного на условной переменной» (в котором он находился до этого) на статус «блокированного на мьютексе» и пребывает в нем до тех пор, пока текущий владелец не освободит мьютекс.

Все функции операций над условной переменной и ее атрибутами реализованы в заголовочном файле . Если для вызова функции необходимы дополнительные заголовочные файлы, это будет указано особо.

 

Операции над условной переменной

 

Параметры условной переменной

Инициализация параметров

int pthread_condattr_init(pthread_condattr_t* attr);

Функция инициализирует структуру атрибутов условной переменной, на которую указывает параметр attr. Структура данных pthread_condattr_t определена в файле и является производной от типа syncattr_t, описанного в разделе «Параметры мьютекса». При инициализации атрибуты устанавливаются в значения по умолчанию.

Возвращаемые значения:

EOK — успешное завершение;

ENOMEM — недостаточно памяти для инициализации атрибутов условной переменной attr.

Для условной переменной возможна модификация значительно меньшего числа параметров, чем для мьютекса. Следующие функции описывают доступ к этим параметрам.

Параметр доступа

int pthread_condattr_setpshared(

 pthread_condattr_t* attr, int pshared);

int pthread_condattr_getpshared(

 const pthread_condattr_t* attr, int* pshared);

Функции устанавливают/считывают, возможен ли доступ к условной переменной из потоков, порожденных в других процессах. Параметр pshared может принимать следующие значения:

• PTHREAD_PROCESS_SHARED — любой поток, имеющий доступ к области памяти, в которой расположена условная переменная, может ее использовать.

• PTHREAD_PROCESS_PRIVATE (значение по умолчанию) — только поток, созданный в контексте того же процесса, в котором находится условная переменная, может ее использовать. Если поток из другого процесса попытается использовать условную переменную, созданную с параметром PTHREAD_PROCESS_PRIVATE, результат будет не определен.

Возвращаемые значения:

EOK — успешное завершение;

EINVAL — атрибуты условной переменной, на которые указывает attr, или новое значение, на которое ссылается pshared, не определены.

Параметры тайм-аута

int pthread_condattr_setclock(

 pthread_condattr_t* attr, clockid_t id);

int pthread_condattr_getclock(

 const pthread_condattr_t* attr, clockid_t* id);

Функции устанавливают/считывают, каким способом (т.e. на основании какого счетчика) вычисляется значение тайм-аута при вызовах pthread_cond_timedwait(). Этот параметр в QNX Neutrino 6.2 не является обязательным и введен в соответствии со стандартом POSIX 1003.1j (draft). На практике можно устанавливать только значение REALTIME_CLOCK в качестве параметра id; это же значение является значением по умолчанию.

Возвращаемые значения:

EOK — успешное завершение;

EINVAL — неверный аргумент attr.

 

Разрушение блока параметров

int pthread_condattr_destroy(pthread_condattr_t* attr);

Функция разрушает блок параметров условной переменной, на которые указывает attr, после чего он уже не может использоваться без повторной инициализации.

На практике разрушение параметров объекта синхронизации не имеет особого смысла. Вы всегда можете переопределить атрибуты, содержащиеся в переменной attr, для инициализации других условных переменных.

Возвращаемые значения:

EOK — успешное завершение;

EINVAL — неверный аргумент attr.

 

Инициализация условной переменной

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

int pthread_cond_init(pthread_cond_t* cond, pthread_condattr_t* attr);

Инициализирует условную переменную cond со значениями, установленными атрибутами attr. Вместо прямого вызова функции pthread_cond_init() для начальной инициализации статических условных переменных (глобальных на уровне файла кода или пространства имен namespace либо явно описанных с квалификатором static) можно воспользоваться макросом PTHREAD_COND_INITIALIZER.

Возвращаемые значения:

EOK — успешное завершение;

EAGAIN — нет свободных системных объектов синхронизации;

EBUSY — переменная cond уже инициализирована и не разрушалась;

EFAULT — ошибка доступа ядра к объектам cond или attr;

EINVAL — неправильное значение переменной cond.

 

Ожидание условия

Простое ожидание

int pthread_cond_wait(pthread_cond_t* cond, pthread_mutex_t* mutex);

Вызов функции блокирует вызвавший поток на условной переменной cond и разблокирует мьютекс mutex. Поток блокируется до тех пор, пока другой поток не вызовет функцию разблокирования на условной переменной cond (pthread_cond_signal() или pthread_cond_broadcast()). Мьютекс mutex должен быть захвачен потоком до вызова функции. Поток, блокированный на условной переменной, может быть разблокирован также приходом сигнала или вызовом завершения потока. В любом случае при разблокировании потока и выходе из функции ожидания поток вновь захватывает мьютекс mutex.

Не следует использовать условную переменную с мьютексом, у которого разрешен рекурсивный захват.

Возвращаемые значения:

EOK — успешное завершение ожидания либо ожидание прервано сигналом;

EAGAIN — недостаток системных ресурсов для реализации ожидания на условной переменной;

EFAULT — произошла ошибка при попытке обращения к указателям cond или mutex;

EINVAL — возвращается в следующих ситуациях:

 • не инициализированы переменные, на которые указывают cond или mutex;

 • попытка использования переменной, на которую указывает cond, для нескольких мьютексов;

 • вызвавший поток не владеет указанным мьютексом.

Ожидание с тайм-аутом

#include

int pthread_cond_timedwait(pthread_cond_t* cond,

 pthread_mutex_t* mutex, const struct timespec* abstime);

Поведение функции идентично варианту обычного ожидания, за исключением того, что ожидание может завершиться также при наступлении времени, переданного параметром abstime.

Следует помнить, что после наступления времени тайм-аута управление совсем не обязательно вернется к вызвавшему потоку. После наступления этого времени функция переведет поток из состояния блокирования на условной переменной в состояние готовности и предпримет попытку захвата мьютекса. Если мьютекс в это время захвачен другим потоком, вызвавший поток перейдет в состояние блокирования на мьютексе.

Возвращаемые значения:

EOK — успешное завершение ожидания либо ожидание прервано сигналом;

EAGAIN — недостаток системных ресурсов для реализации ожидания на условной переменной;

EFAULT — произошла ошибка при попытке обращения к указателям cond или mutex;

EINVAL — возвращается в следующих ситуациях:

 • не инициализированы переменные, на которые указывают cond или mutex;

 • попытка использования переменной, на которую указывает cond, для нескольких мьютексов;

 • вызвавший поток не владеет указанным мьютексом.

ETIMEDOUT — завершение функции по наступлению времени, указанного в abstime.

 

Выполнение условия

Штатным способом разблокирования потока, блокированного на условной переменной, является вызов функции, сигнализирующей о выполнении условия. В native API это функция SyncCondvarSignal(), которая имеет две POSIX-обертки: pthread_cond_signal() и pthread_cond_broadcast(). Разница между ними заключается в том, что первая пробуждает только один, самый приоритетный поток из ждущих выполнения условия, а вторая пробуждает все потоки, ожидающие выполнения условия.

Однако необходимо помнить про специфику ожидания внутри критической секции: вызов функции pthread_cond_broadcast() только переведет ожидающие потоки из состояния блокирования на условной переменной в состояние блокировки на мьютексе, поскольку мьютекс сможет захватить только самый приоритетный поток.

Нештатным способом завершения ожидания на условной переменной может быть приход немаскированного сигнала UNIX. Если для данного сигнала определен обработчик, он выполнится без захвата мьютекса, а попытка захвата будет произведена уже после его завершения.

Выполнение условия для единичного потока

int pthread_cond_signal(pthread_cond_t* cond);

Функция переводит в состояние готовности самый приоритетный поток из блокированных на условной переменной cond, после чего поток предпринимает попытку захвата своего мьютекса. Если есть несколько потоков с равным (и высшим) приоритетом, блокированных на условной переменной, то разблокируется тот поток, который ожидал дольше остальных.

Возвращаемые значения:

EOK — успешное завершение;

EFAULT — произошла ошибка при попытке обращения к указателям cond или mutex;

EINVAL — не инициализирована переменная, на которую указывает cond.

Выполнение условия для всех ожидающих потоков

int pthread_cond_broadcast(pthread_cond_t* cond);

Вызов функции разблокирует все потоки, блокированные на условной переменной cond. Потоки разблокируются в порядке приоритетов. Для потоков равного приоритета разблокирование проводится в порядке FIFO.

Возвращаемые значения:

EOK — успешное завершение;

EFAULT — произошла ошибка при попытке обращения к указателям cond или mutex;

EINVAL — не инициализирована переменная, на которую указывает cond.

 

Разрушение условной переменной

int pthread_cond_destroy(pthread_cond_t* cond);

Вызов функции деинициализирует условную переменную cond. Для дальнейшего использования условной переменной, на которую ссылается cond, ее необходимо инициализировать вызовом pthread_cond_init(). Функция может использоваться для изменения параметров условной переменной.

Возвращаемые значения:

EOK — успешное завершение;

EBUSY — в данный момент другой поток блокирован на условной переменной cond;

EINVAL — не инициализирована переменная cond.

 

Ждущая блокировка

 

QNX предоставляет упрощенный вариант использования условной переменной для блокирования (остановки) потока при помощи интерфейса так называемой ждущей блокировки (sleepon). Для использования этого механизма не нужно явно создавать никаких объектов синхронизации, за вас это делает ОС. Внешне ждущие блокировки выглядят как набор функций ожидания и освобождения, при этом последовательность действий в принципе аналогична использованию мьютексов и условных переменных.

За этим интерфейсом на самом деле скрывается один мьютекс и несколько дополнительных условных переменных. Использование функций ожидания должно проходить внутри участка кода, отмеченного вызовами блокирования и разблокирования мьютекса, ассоциированного со ждущей блокировкой. Одним из основных недостатков ждущей блокировки является то, что для всех потоков и всех ключей ожидания используется один общий мьютекс. ОС не может никоим образом отслеживать взаимные блокировки потоков при использовании ждущих блокировок. В целом поведение этого средства синхронизации идентично бинарным семафорам, но оно требует дополнительных операций блокирования мьютекса.

Все функции для работы со ждущими блокировками объявлены в файле .

 

Операции со ждущей блокировкой

 

Захват и освобождение ждущей блокировки

Вызов функций ожидания может производиться только внутри блока захвата и освобождения ждущей блокировки:

int pthread_sleepon_lock(void);

int pthread_sleepon_unlock(void);

Функция захвата pthread_sleepon_lock() возвращает следующие значения:

EOK — успешное выполнение;

EDEADLK — попытка повторного захвата мьютекса;

EAGAIN — может возникнуть при первом вызове в процессе, если системе не хватает ресурсов для создания внутреннего мьютекса.

Функция освобождения pthread_sleepon_unlock() возвращает значения:

EOK — успешное выполнение;

EPERM — вызвавший поток не является владельцем внутреннего мьютекса.

 

Функции ожидания

Ожидание выполнения условия для ждущей блокировки может выполняться в двух вариантах: простое ожидание и ожидание с установкой тайм-аута.

int pthread_sleepon_wait(const volatile void* addr);

int pthread_sleepon_timedwait(const volatile void* addr, uint64_t nsec);

При вызове функций ожидания необходимо указать ключ addr (произвольный адрес в памяти). Если этот адрес указывается впервые, для данного вызова создается новая условная переменная. Поток освобождает захваченный внутренний мьютекс и переходит в состояние блокировки на условной переменной.

 

Ожидание завершения потока

Ожидание родительским потоком завершения одного или нескольких порожденных им «присоединенных» потоков (на вызове pthread_join()) — это простейший и эффективный вариант синхронизации потоков, не требующий для своей реализации каких-либо дополнительных синхронизирующих примитивов. Ранее мы уже детально рассматривали процесс порождения и ожидания завершения потоков, сейчас же лишь коротко вернемся к этому вопросу с иной точки зрения - с позиции синхронизации. В простейшем случае общая схема такой синхронизации всегда одинакова и описывается подобной структурой кода:

void* threadfunc(void* data) {

 ...

 return NULL;

}

...

// здесь создается нужное количество (N) потоков:

pthread_t tid[N];

for (int i = 0; i < N; i++)

 pthread_create(tid + 1, NULL, threadfunc, NULL);

// а вот здесь ожидается завершение всех потоков!

for (int i = 0; i < N; i++)

 pthread_join(tid + 1, NULL);

При первом знакомстве с подобным шаблоном кода пугает то обстоятельство, что предписан такой же порядок ожидания завершения потоков, как и при их создании. И это при том, что порядок их завершения может быть совершенно произвольным. Но представленный шаблон верен: если некоторый ожидаемый в текущем цикле поток j «задерживается», а мы заблокированы именно в ожидании tid[j], то после завершения этого ожидаемого потока, которое когда-то все-таки наступит, мы «мгновенно» пробегаем все последующие i, для которых соответствующие tid[i] уже завершились ранее. Так что представленный шаблон корректен и широко используется на практике.

Примечание

В такой схеме потоки могут возвратить в точку ожидания (и зачастую делают это) результат своего выполнения. В представленном шаблоне мы не стали показывать возврат значений, чтобы не загромождать код. Возврат результата подробно рассматривался ранее, когда речь шла о завершении потоков.

Показанная схема синхронизации на завершении потоков не является примитивом синхронизации и не требует использования таковых, но она выводит нас на еще один тип примитивов — барьер.

 

Барьер

 

Барьер как раз и предназначен для разрешения выше обозначенной проблемы — ожидания условия достижения несколькими заданными потоками точки синхронизации. Достигнув этой точки, потоки освобождаются «одновременно» и уже с этой точки продолжают свое независимое развитие. «Классическая» схема использования барьера (именно в таком качестве он чаще всего и используется), неоднократно приводимая в описаниях, выглядит так (мы уже много раз использовали ее в примерах кода):

static pthread_barrier_t bfinish;

void* threadfunc(void* data) {

 // потоки что-то делают ...

 pthread_barrier_wait(&bfinish);

 return NULL;

}

int main(int argc, char *argv[]) {

 int N = ...; // будем создавать N идентичных потоков

 if (pthread_barrier_init(&bfinish, NULL, N + 1) != EOK)

  perror("barrier init"), exit(EXIT.FAILURE);

 for (int i = 0; i < N; i++)

  if (pthread_create(NULL, NULL, threadfunc, NULL) != EOK)

   perror("thread create"), exit(EXIT_FAILURE);

 pthread_barrier_wait(&bfinish);

}

Очевидно, что по функциональности эта схема мало отличается от ожидания завершения потоков на pthread_join(), описанного выше. Однако есть различия в организации: если ранее мы просто ожидали полного завершения дочерних потоков, то в данной схеме мы ожидаем достижения ими специально созданной точки синхронизации. Еще одно отличие состоит в том, что схема синхронизации с ожиданием завершения на pthread_join() приемлема только для «присоединенных» потоков, тогда как схема на pthread_barrier_wait() может применяться и к «отсоединенным», автономным потокам.

Но если бы различие двух схем только на том и заканчивалось, то, возможно, нецелесообразно было бы вводить новый механизм барьеров. Однако техника использования барьеров шире, она может быть использована, например, когда нужно, чтобы, напротив, последовательно создаваемые потоки (в цикле порождающего потока) стартовали на исполнение «одновременно» (особенно это характерно тогда, когда дочерние потоки создаются с более высоким приоритетом, чем порождающий):

static pthread_barrier_t bstart;

void* threadfunc(void* data) {

 // все потоки после создания должны "застрять" на входном барьере,

 // чтобы потом одновременно "сорваться" в исполнение...

 pthread_barrier_wait(&bstart);

 // ... выполнение ...

 return NULL;

}

int main(int argc, char *argv[]) {

 ...

 int N = ...; // будем создавать N идентичных потоков

 if (pthread_barrier_init(&bstart, NULL, N) != EOK)

  perror("barrier init"), exit(EXIT_FAILURE);

 for (int i = 0; i < nthr; i++) {

  if (pthread_create(NULL, NULL, threadfunc, NULL) != EOK)

  perror("thread create"), exit(EXIT_FAILURE);

 }

 ...

}

Обратите внимание на параметр количества ожидающих на барьере потоков при его инициализации: здесь он на единицу меньше.

Применение барьеров подробно описано в литературе [1], поэтому мы не будем специально останавливаться на этом элементе синхронизации, тем более что это один из наиболее простых в применении элементов.

По непонятным причинам документация QNX [8] причисляет барьеры к элементам синхронизации ядра, однако никаких средств native API QNX, предназначенных для работы с барьерами, документация не описывает, а заголовочный файл так описывает тип pthread_barrier_t:

typedef struct {

 unsigned int barrier;

 unsigned int count;

 pthread_mutex_t lock;

 pthread_cond_t bcond;

} pthread_barrier_t;

Выводы можно сделать самостоятельно.

Также несколько загадочно выглядит тот факт, что согласно документации QNX 6.2.1 все функции работы с барьером и его атрибутами описаны в заголовочном файле , за исключением двух функций pthread_barrier_wait() и pthread_barrierattr_setpshared(), о которых говорится, что они описаны в файле ! Но если заглянуть в заголовочные файлы, то выясняется, что можно спокойно использовать для абсолютно всех функций работы с барьером либо заголовочный файл , либо .

 

Операции с барьерами

 

Параметры барьера

Следующие функции инициализируют и разрушают блок параметров барьера:

int pthread_barrierattr_init(pthread_barrierattr_t* attr);

int pthread_barrierattr_destroy(pthread_barrierattr_t* attr);

Функция инициализации возвращает следующие значения:

EOK — успешное выполнение;

ENOMEM — недостаточно памяти для инициализации атрибутов барьера.

Функция разрушения атрибутов объекта возвращает значения:

EOK — успешное выполнение;

EINVAL — передан неверный объект атрибутов барьера attr.

Параметры барьера описываются типом pthread_barrierattr_t и аналогично другим типам синхронизации используются в момент инициализации элемента синхронизации. Фактически единственным атрибутом барьера является модификатор доступа к барьеру из других процессов:

int pthread_barrierattr_setpshared(

pthread_barrierattr_t* attr, int pshared);

int pthread_barrierattr_getpshared(

const pthread_barrierattr_t* attr, int* pshared);

По умолчанию атрибуты барьера запрещают доступ к элементу синхронизации из других процессов.

Обе функции могут возвращать следующие значения:

EOK — успешное выполнение;

EINVAL — одно или оба из переданных функции значений неверны.

 

Инициализация и разрушение барьера

int pthread_barrier_init(pthread_barrier_t* barrier,

 const pthread_barrierattr_t* attr, unsigned int count);

Функция инициализирует объект синхронизации типа барьер, после чего его можно использовать. В атрибутах барьера устанавливается (или запрещается) возможность доступа к барьеру из других процессов. По умолчанию такой доступ запрещен. Для того чтобы изменить возможность доступа к созданному ранее барьеру, его необходимо разрушить, установить соответствующий атрибут и инициализировать барьер повторно. Параметр count показывает, какое количество потоков будет ожидать на барьере до их освобождения.

Возвращаемые значения:

EOK — успешное выполнение;

EAGAIN — системе не хватает ресурсов для инициализации барьера;

EBUSY — попытка инициализации уже инициализированного барьера;

EFAULT — сбой произошел при обращении ядра к аргументам;

EINVAL — attr указывает на неинициализированное значение атрибутов.

int pthread_barrier_destroy(pthread_barrier_t* barrier);

Функция разрушает барьер, после чего соответствующий элемент синхронизации barrier не может использоваться без повторной его инициализации.

Возвращаемые значения:

EOK — успешное выполнение;

EBUSY — в настоящее время есть потоки, блокированные на барьере;

EINVAL — неинициализированный объект barrier.

 

Ожидание на барьере

Функция ожидания (синхронизации) на барьере:

int pthread_barrier_wait(pthread_barrier_t* barrier);

Вызов этой функции приводит к блокированию потока до тех пор, пока на барьере не накопится количество заблокированных потоков, определенное ранее при вызове функции pthread_barrier_init().

Необходимо с особой осторожностью относиться к использованию барьеров для остановки и синхронизации потоков разных приоритетов. Поскольку потоки, ожидающие у барьера, находятся в блокировке на условной переменной (внутренней), то система никак не отслеживает эффекты, связанные с возможной инверсией приоритетов.

Если поток, блокированный на барьере, получает сигнал, для которого определен обработчик, то обработчик сигнала выполняется, но по завершении его выполнения поток вновь блокируется до выполнения условия барьера.

Документация QNX утверждает, что нельзя заранее сказать, какой поток будет освобожден первым, когда заданное количество потоков достигнет барьера. Учитывая, что при реализации операций над потоками использовалась комбинация мьютекса с условной переменной (как видно из приведенного выше определения, типа pthread_barrier_t), освобождение блокированных потоков будет проходить по принципам, определенным для потоков, блокированных на условной переменной и получающих «широковещательное» (broadcast), или одновременное, освобождение. Документация QNX утверждает, что в таком случае первым будет «отпущен» тот поток, который ждал дольше других.

Функция ожидания на барьере возвращает значения:

BARRIER_SERIAL_THREAD — для первого из потоков, достигшего барьера и блокированного на нем;

0 — для всех последующих потоков;

EINVAL — при ошибке выполнения, которая может возникнуть только по некорректности инициализации барьера.

 

Блокировки чтения/записи

 

Блокировка чтения/записи является специфическим механизмом, отличающимся от рассмотренных выше. Специфика состоит в следующем:

• Данный тип блокировки даже по POSIX является альтернативным. Часто этот тип блокировки может реализовываться как надстройка над уже существующими базовыми примитивами. У. Стивенс [2] показывает один из вариантов такой «оберточной» реализации и приводит ссылки на еще несколько вариантов, в том числе и на предложенный стандартом IEEE 1996.

• Функциональность, обеспечиваемая блокировкой чтения/записи, может быть предоставлена и простым использованием других базовых механизмов, однако реализация с блокировкой чтения/записи может быть намного эффективнее по производительности приложения (как мы увидим позже).

Целесообразность привлечения блокировки чтения/записи возникает тогда, когда все множество потоков может быть отчетливо разделено на две группы: потоки, только считывающие защищаемые блокировкой данные (читатели), и потоки, модифицирующие эти данные (писатели). Такая ситуация традиционно возникает в области работы с объемными, сложно структурированными данными.

При использовании блокировки чтения/записи, в отличие от других ранее рассмотренных механизмов, вводится различие между получением такой блокировки для считывания и записи. При этом действуют следующие правила:

• Любое количество потоков может заблокировать ресурс для считывания, если ни один поток не заблокировал его по записи.

• Наличие множественных блокировок по чтению не препятствует (не блокирует) ни одному из потоков-читателей выполнять свои операции с ресурсом.

• Блокировка по записи может быть установлена, только если ни один поток не блокирует ресурс ни по чтению, ни по записи.

• Блокировка по записи запрещает дальнейшую блокировку ресурса (блокирует запрашивающий процесс) и по чтению, и по записи.

Функции работы с блокировками чтения/записи объявлены, как и большинство примитивов синхронизации, в заголовочном файле .

 

Операции с блокировками чтения/записи

 

Инициализация объекта блокировки

int pthread_rwlock_init(pthread_rwlock_t* rwl,

 const pthread_rwlockattr_t* attr);

int pthread_rwlock_destroy(pthread_rwlock_t* rwl);

Вызов функций инициализирует/разрушает блокировку чтения/записи. При инициализации блокировки ей передается структура параметров блокировки pthread_rwlockattr_t, в которой могут устанавливаться параметры доступа к объекту из других процессов.

Вместо прямого вызова функции pthread_rwlock_init() для начальной инициализации статических блокировок чтения/записи (глобальных на уровне файла кода или пространства имен namespace либо явно описанных с квалификатором static) можно воспользоваться макросом PTHREAD_RWLOCK_INITIALIZER.

 

Захват блокировки чтения/записи

В связи со спецификой применения блокировок чтения/записи этот объект синхронизации имеет две группы функций захвата, позволяющих по-разному регулировать доступ к защищаемому участку кода.

К первой группе относятся функции, допускающие рекурсивный захват объекта синхронизации и совместное использование участка кода. Это так называемые функции блокировки по чтению.

Вторая группа функций допускает только эксклюзивный захват объекта синхронизации и к выполнению защищаемого кода допускается только один поток. Это функции блокировки по записи.

Функции блокировки по чтению

int pthread_rwlock_rdlock(pthread_rwlock_t* rwl);

int pthread_rwlock_tryrdlock(pthread_rwlock_t* rwl);

int pthread_rwlock_timedrdlock(pthread_rwlock_t* rwlock,

 const struct timespec* abs);

Эта группа функций позволяет множественный захват объекта синхронизации и одновременное исполнение защищаемого участка кода, с которым ассоциируется переменная rwl. Данная группа функций предназначена для обеспечения одновременного чтения данных, разделяемых несколькими потоками. Выполнение захвата с использованием этой группы функций будет блокировано, если объект синхронизации, на который указывает rwl, уже захвачен одной из функций эксклюзивного исполнения кода (блокировки по записи).

Первая функция в группе позволяет выполнить простой захват объекта синхронизации. При рекурсивном захвате с помощью любой из указанных функций необходимо помнить, что освобождение должно производиться столько же раз, сколько и захват.

Функция pthread_rwlock_rdlock() может возвращать следующие значения:

EOK — успешное выполнение;

EAGAIN — при первом использовании статически инициированной блокировки чтения/записи (PTHREAD_RWLOCK_INITIALIZER) недостаточно системных ресурсов для инициализации блокировки чтения/записи;

EDEADLK — вызывающий поток уже провел эксклюзивный захват (блокировку по записи) объекта синхронизации, на который ссылается rwl, и повторный захват на чтение привел бы к полному («мертвому») блокированию потока;

EFAULT — сбой при обращении ядра к rwl;

EINVAL — rwl указывает на неверный объект блокировки чтения/записи.

Вторая из перечисленных функций проверяет, не находится ли соответствующий объект синхронизации в эксклюзивном использовании, и если это так, она возвращает соответствующее значение без последующего ожидания освобождения объекта синхронизации.

Функция pthread_rwlock_tryrdlock() возвращает следующие значения:

EOK — успешное выполнение;

EAGAIN — при первом использовании статически инициированной блокировки чтения/записи (PTHREAD_RWLOCK_INITIALIZER) недостаточно системных ресурсов для инициализации блокировки чтения/записи;

EBUSY — блокировка чтения/записи блокирована по записи (собственно, это и есть основной мотив использования именно этого вызова);

EDEADLK — вызывающий поток уже провел эксклюзивный захват (блокировку по записи) объекта синхронизации, на который ссылается rwl;

EFAULT — сбой при обращении ядра к rwl;

EINVAL — rwl указывает на неверный объект блокировки чтения/записи.

Третья функция из группы предусматривает завершение блокировки ожидания освобождения объекта синхронизации при возникновении блокировки по записи или по истечении заданного тайм-аута ожидания.

Функция pthread_rwlock_timedrdlock() может возвращать следующие коды ошибки:

EAGAIN — система не может захватить блокировку по чтению, поскольку достигнуто максимальное число блокировок чтения для данного объекта;

EDEADLK — вызывающий поток уже является владельцем указанного объекта синхронизации; он захватил его, используя блокировку по записи, и повторная блокировка по чтению привела бы к полному блокированию потока;

EINVAL — неверный параметр вызова: либо rwl указывает на неинициализированный объект блокировки чтения/записи, либо время тайм-аута abs задано меньше нуля или равно или выше предельного значения 1000 миллионов;

ETIMEDOUT — не удалось захватить блокировку до истечения срока тайм-аута.

Функции блокировки по записи

int pthread_rwlock_wrlock(pthread_rwlock_t* rwl);

int pthread_rwlock_trywrlock(pthread_rwlock_t* rwl);

int pthread_rwlock_timedwrlock(pthread_rwlock_t* rwlock,

 const struct timespec* abs_timeout);

Функции этой группы предназначены для эксклюзивного захвата объекта синхронизации и использования его для блокирования по записи. Блокировка по записи, в отличие от блокировки по чтению, не допускает совместного исполнения защищаемого участка кода (ни пишущими, ни читающими потоками). Так же как в группе функций блокировки по чтению, в этой группе присутствуют функции простого захвата, попытки захвата и захвата с тайм-аутом ожидания освобождения.

Функция pthread_rwlock_wrlock() возвращает следующие значения:

EOK — успешное выполнение;

EAGAIN — при первом использовании статически инициированной блокировки чтения/записи (PTHREAD_RWLOCK_INITIALIZER) недостаточно системных ресурсов для инициализации блокировки чтения/записи;

EDEADLK — вызывающий поток уже является владельцем блокировки в эксклюзивном режиме;

EFAULT — сбой при обращении ядра к rwl;

EINVAL — rwl указывает на неверный объект блокировки чтения/записи.

Функция pthread_rwlock_trywrlock() возвращает значения:

EOK — успешное выполнение;

EAGAIN — при первом использовании статически инициированной блокировки чтения/записи (PTHREAD_RWLOCK_INITIALIZER) недостаточно системных ресурсов для инициализации блокировки чтения/записи;

EBUSY — блокировка уже захвачена в режиме чтения или записи;

EDEADLK — вызывающий поток уже является владельцем блокировки в эксклюзивном режиме;

EFAULT — сбой при обращении ядра к rwl;

EINVAL — rwl указывает на неверный объект блокировки чтения/записи.

Функция pthread_rwlock_timedwrlock() возвращает значения:

EOK — успешное выполнение;

EAGAIN — система не может захватить блокировку по записи, поскольку достигнуто максимальное число блокировок по записи для данного объекта;

EDEADLK — вызывающий поток уже является владельцем блокировки в эксклюзивном режиме;

EINVAL — неверный параметр вызова: либо rwl указывает на неинициализированный объект блокировки чтения/записи, либо время тайм-аута abs задано меньше нуля или равно или выше предельного значения 1000 миллионов;

ETIMEDOUT — не удалось захватить блокировку до истечения заданного срока тайм-аута.

 

Освобождение блокировки

int pthread_rwlock_unlock(pthread_rwlock_t* rwl);

Функция освобождает захваченный любым образом объект блокировки чтения/записи. Если объект был захвачен в режиме множественного использования (блокировки по чтению), то количество его освобождений должно равняться количеству захватов.

Возвращаемые значения:

EOK — успешное завершение;

EAGAIN — при первом использовании статически инициированной блокировки чтения/записи (PTHREAD_RWLOCK_INITIALIZER) недостаточно системных ресурсов для инициализации блокировки чтения/записи;

EFAULT — ядро не смогло обратиться к объекту rwl;

EINVAL — объект rwl указывает на неверно инициированный объект блокировки чтения/записи;

EPERM — нет потоков, захвативших объект rwl в режиме чтения или записи, или вызывающий поток не владеет блокировкой в режиме записи.

 

Использование блокировок чтения/записи

Построим приложение, использующее блокировку чтения/записи (файл sy10.cc):

Эффективность блокировки чтения/записи

#include

#include

#include

// сколь угодно сложные элементы внутренней базы данных

// приложения; в примере мы используем их простейший вид

typedef int element;

// база данных приложения - динамический список элементов

class dbase : public list {

 static const int READ_DELAY = 1, WRITE_DELAY = 2;

public:

 // операция "добавить в базу данных"

 void add(const element& e) {

  int pos = size() * rand() / RAND_MAX;

  list::iterator p = begin();

  for (int i = 0; i < pos; i++) p++;

  insert(p, e);

  delay(WRITE_DELAY);

 }

 // операция "найти в базе данных"

 int pos(const element& e) {

  int n = 0;

  for (list::iterator i = begin(); i != end(); i++, n++)

   if (*i == e) {

    delay(READ_DELAY);

    break;

   }

  if (n == size()) n = -1;

  return n;

 }

} data;

inline element erand(unsigned long n) {

 return (element)((n * rand()) / RAND_MAX);

}

inline bool wrand(double p) {

 return (double)rand() / (double)RAND_MAX < p;

}

int main(int argc, char *argv[]) {

 // общее число обращений приложения к базе данных

 static unsigned long n = 1000;

 // вероятность обновления базы данных при обращении

 static double p = .1;

 unsigned long m;

 if (argc > 1 && (m = atoll(argv[1])) ! = 0) n = m;

 double q;

 if (argc > 2 && (q = atof(argv[2])) != 0) p = q;

 // начальное заполнение базы данных

 for (int i = 0; i < n; i++) data.add(erand(n));

 // тактовая частота процессора (для измерения времени)

 const uint64_t cps = SYSPAGE_ENTRY(qtime)->cycles_per_sec;

 // последовательность n обращений к базе данных,

 // из них p*n требуют обновления списка, а остальные

 // (1-p)*n требуют только выборки данных:

 uint64_t t = ClockCycles();

 for (int i = 0; i < n; i++) {

  element e = erand(n);

  if (!wrand(p)) data.pos(e);

  else data.add(e);

 }

 t = ((ClockCycles() - t) * 1000000000) / cps;

 cout << "evaluation time: " << (double)t / 1.E9

  << " sec." << endl;

 return EXIT_SUCCESS;

}

Перед нами простейшая последовательная программа, которая массированно читает свою базу данных и изредка ее модифицирует. Для выполнения реальных операций чтения/записи данных программе необходимо выполнять некоторые достаточно продолжительные операции. В приведенном коде эти операции имитируются задержками delay(WRITE_DELAY) и delay(READ_DELAY).

Возникает совершенно естественное желание преобразовать последовательные запросы к данным в параллельные (файл sy11.cc). Для этого преобразуем структуру списка элементов, с которым работаем:

class dbase : public list {

 static const int READ_DELAY = 1, WRITE_DELAY = 2;

 pthread_mutex_t loc;

public:

 dbase(void) { pthread_mutex_init(&loc, NULL); }

 ~dbase(void) { pthread_mutex_destroy(&loc); }

 void add(const elements e) {

  pthread_mutex_lock(&loc);

  int pos = size() * rand() / RAND_MAX;

  list::iterator p = begin();

  for (int i = 0; i < pos; i++) p++;

  insert(p, e);

  delay(WRITE_DELAY);

  pthread_mutex_unlock(&loc);

 }

 int pos(const elements e) {

  int n = 0;

  pthread_mutex_lock(&loc);

  for (list::iterator i = begin(); i != end(); i++, n++)

   if (*i == e) {

    delay(READ_DELAY);

    break;

   }

  pthread_mutex_unlock(&loc);

  if (n == size()) n = -1;

  return n;

 }

} data;

А в вызывающей программе цикл запросов к данным преобразуем в:

pthread_t *h = new pthread_t[n];

uint64_t t = ClockCycles();

for (int i = 0; i < n; i++) {

 element e = erand(n);

 pthread_create(h + i, NULL, wrand(p) ? add : pos, (void*)e);

}

for (int i = 0; i < n; i++)

 pthread_join(h[i], NULL);

t = ((ClockCycles() - t) * 1000000000) / cps;

delete h;

А используемые этим фрагментом функции потоков определим как:

static void* add(void* par) { data.add((element)par); }

static void* pos(void* par) { data.pos((element)par); }

Совершенно естественно, что список элементов, из которого мы извлекаем данные (и куда изредка помещаем новые), должен защищаться как при модификации, так и при считывании (во избежание их одновременной модификации «со стороны»). Понятно, что в представленном решении мы чересчур перестраховались: во время считывания мы должны защищаться от потенциальной одновременной модификации, но нет необходимости защищать структуру данных от параллельного считывания. Поэтому переопределим структуру данных (файл sy12.cc), используя блокировку чтения/записи, оставив все прочее без изменений:

class dbase : public list {

 static const int READ_DELAY = 1, WRITE_DELAY = 2;

 pthread_rwlock_t loc;

public:

 dbase(void) { pthread_rwlock_init(&loc, NULL); }

 ~dbase(void) { pthread_rwlock_destroy(&loc); }

 void add(const elements e) {

  pthread_rwlock_wrlock(&loc);

  int pos = size() * rand() / RAND_MAX;

  list::iterator p = begin();

  for (int i = 0; i < pos; i++) p++;

  insert(p, e);

  delay(WRITE_DELAY);

  pthread_rwlock_unlock(&loc);

 }

 int pos(const elements e) {

  int n = 0;

  pthread_rwlock_rdlock(&loc);

  for (list::iterator i = begin(); i != end(); i++, n++)

   if (*i == e) {

    delay(READ_DELAY);

    break;

   }

  pthread_rwlock_unlock(&loc);

  if (n == size()) n = -1;

  return n;

 }

} data;

А теперь пришло время сравнить варианты:

# nice -n-19 sy10 500 .2

evaluation time: 1.2296 sec.

# nice -n-19 sy11 500 .2

evaluation time: 1.24973 sec.

# nice -n-19 sy12 500 .2

evaluation time: 0.440904 sec.

При «жесткой» блокировке мы не получаем никакого выигрыша за счет параллельного выполнения запросов к данным, а при использовании блокировки чтения/записи — 3-кратный выигрыш. Проделаем то же самое, но в условиях гораздо меньшей интенсивности обновления данных относительно общего потока запросов:

# nice -n-19 sy10 500 .02

evaluation time 0.989699 sec.

# nice -n-19 sy11 500 .02

evaluation time 0.98391 sec.

# nice -n-19 sy12 500 .02

evaluation time 0.0863443 sec.

Выигрыш становится более чем 10-кратным.

Показанные примеры (sy10.cc, sy11.cc, sy12.cc) в высшей степени условны: картина происходящего будет существенно другой при замене пассивного ожидания (delay()) на активные процессорные операции над данными, но общие тенденции сохраняются.

 

Спинлок

 

Спинлок, или «крутящаяся» блокировка, предназначен исключительно для применения в системах SMP (Symmetrical Multi-Processing), то есть в многопроцессорных системах. Поведение спинлока практически идентично классическому мьютексу, за единственным исключением — ожидающий поток не блокируется и не вытесняется. Не забывайте, речь идет о многопроцессорной системе! Основным назначением спинлока является задержка выполнения обработчиков прерываний, и предназначены они для исключения временных потерь, связанных с переключением контекстов.

Функции работы с «крутящейся» блокировкой объявлены в заголовочном файле <рthread.h>. Самих функций немного, и они имеют минимальные возможности по настройке. Спинлок не поддерживает тайм-ауты. Появление этого элемента синхронизации в QNX Neutrino связано с требованиями стандарта POSIX 1003.1j (draft).

 

Операции со спинлоком

 

Инициализация и разрушение спинлока

int pthread_spin_init(pthread_spinlock_t* spinner, int pshared);

Функция инициализирует объект синхронизации спинлока блокировки, на который указывает аргумент spinner, и устанавливает для него параметр доступа из других процессов в соответствии со значением переменной pshared. Эта переменная может принимать следующие значения:

• PTHREAD_PROCESS_SHARED — с объектом спинлок может оперировать поток любого процесса, имеющего доступ к памяти, в которой распределен объект спинлок;

• PTHREAD_PROCESS_PRIVATE — доступ к объекту синхронизации возможен только для потоков процесса, из адресного пространства которого была распределена память объекта синхронизации.

В случае успешного завершения функция возвращает нулевое значение, в противном случае — один из следующих кодов ошибок:

AGAIN — системе не хватает ресурсов для инициализации блокировки;

EBUSY — объект крутящейся блокировки, на который указывает spinner, уже инициирован;

EINVAL — некорректный объект spinner;

ENOMEM — система не имеет достаточного количества свободной памяти для создания нового объекта.

int pthread_spin_destroy(pthread_spinlock_t* spinner);

Функция деинициализирует объект крутящейся блокировки. После деинициализации для последующего применения объекта он должен быть вновь инициализирован. Обратите внимание, результат функции не определен, если поток в данный момент крутится на блокировке, на которую указывает spinner, либо если объект spinner не был инициализирован.

Возвращаемые значения:

EOK — успешное выполнение;

EBUSY — блокировка используется другим потоком и не может быть разрушена;

EINVAL — некорректный объект spinner.

 

Захват и освобождение спинлока

int pthread_spin_lock(pthread_spinlock_t* spinner);

int pthread_spin_trylock(pthread_spinlock_t* spinner);

Это функции захвата и попытки захвата крутящейся блокировки соответственно. Как и для мьютекса, если объект spinner в момент захвата свободен, то поток, вызвавший одну из этих функций, становится владельцем крутящейся блокировки. Если spinner уже захвачен другим потоком, то в случае вызова второй из рассматриваемых функций управление возвращается немедленно, а в случае простого захвата (первая функция) вызвавший поток «крутится», то есть остается активным, но не возвращает управления до тех пор, пока объект синхронизации не освободится.

Попытка повторного захвата крутящейся блокировки из того же потока приводит к мертвой блокировке.

Функции возвращают следующие параметры:

EOK — успешное выполнение;

EAGAIN — недостаточно ресурсов системы для захвата spinner;

EDEADLK — вызвавший поток уже владеет spinner;

EINVAL — spinner — неверный объект типа pthread_spinlock_t;

EBUSY — объект захвачен другим потоком (для pthread_spin_trylock()).

int pthread_spin_unlock(pthread_spinlock_t* spinner);

Вызов этой функции освобождает объект крутящейся блокировки, на который указывает аргумент spinner.

Функция может возвращать значения:

EOK — успешное выполнение;

EINVAL — неверный объект spinner;

EPERM — вызывающий поток не является владельцем крутящейся блокировки.