QNX/UNIX: Анатомия параллелизма

Цилюрик Олег Иванович

Горошко Егор

Зайцев Владимир

2. Процессы и потоки

 

 

При внимательном чтении технической документации [8] и литературы по ОС QNX [1] отчетливо бросается в глаза, что тонкие детали создания и функционирования процессов и потоков описаны крайне поверхностно и на весьма некачественном уровне. Возможно, это связано с тем, что общие POSIX-механизмы уже изучены и многократно описаны на образцах кода в общей литературе по UNIX. Однако большинство литературных источников написано в «допотоковую» эпоху, когда основной исполняемой единицей в системе являлся процесс.

Детальное рассмотрение особенностей именно QNX (версии 6.X после приведения ее в соответствие с POSIX, в отличие от предыдущей 4.25) лишний раз подчеркивает, что:

• Процесс является только «мертвой» статической оболочкой, хранящей учетные данные и обеспечивающей окружение динамического исполнения… Чего? Конечно же, потока, даже если это единственный (главный) исполняемый поток приложения (процесса), как это принято в терминологии, не имеющий отношения к потоковым понятиям.

• Любые взаимодействия, синхронизация, диспетчеризация и другие механизмы имеют смысл только применительно к потокам, даже если это потоки, локализованные в рамках различных процессов. Вот здесь и возникает термин, ставший уже стереотипным: «IPC — средства взаимодействия процессов». Для однопотоковых приложений этот терминологический нюанс не вносит ровно никакого различия, но при переходе к многопотоковым приложениям мы должны рассуждать в терминах именно взаимодействующих потоков, локализованных внутри процессов (одного или различных).

• В системах с аппаратной трансляцией адресов памяти (MMU — Memory Management Unit) процесс создает для своих потоков дополнительные «границы существования» — защищенное адресное пространство. Большинство сложностей, описываемых в литературе в связи с использованием IPC, обусловлено необходимостью взаимодействующих потоков преодолевать адресные барьеры, устанавливаемые процессами для каждого из них. (Что касается MMU, то выданной книге предполагается исключительно x86-архитектура, хотя количество аппаратных платформ, на которых работает ОС QNX, на сегодняшний день уже перевалило за десяток.)

Примечание

Модель потоков QNX в значительной степени напоминает то, что происходит с процессами в MS-DOS или с задачами (task) в существенно более поздней ОС реального времени VxWorks: исполнимые единицы разделяют единое адресное пространство без каких-либо ограничений на использование всего адресного пространства. В рамках подобной модели в QNX можно реализовать и сколь угодно сложный комплекс, трансформировав в потоки отдельные процессы, составляющие этот комплекс, с тем только различием, что в QNX все элементы собственно операционной системы продолжают работать в изолированном адресном пространстве и не могут быть никоим образом включены (и тем самым повреждены) в пространство приложения.

И в технической документации QNX, и в книге Р. Кертена [1] много страниц уделено описанию логики процессов, потоков, синхронизации и многим другим вещам в терминах аллегорических аналогий: коллективное пользование ванной комнатой, кухней... Если согласиться, что такие аллегории более доходчивы для качественного описания картины происходящего (что, похоже, так и есть), то для иерархии «операционная система — процесс — поток» можно найти существенно более близкую аллегорию: «аквариумное хозяйство». Действительно:

• В некотором общем помещении, где имеются все средства жизнеобеспечения — освещение, аэрация, терморегуляция, кормление (операционная система), — размещаются аквариумы (процессы), внутри которых (в одних больше, в других совсем немного) живут активные сущности (растения, рыбы, улитки). Помимо всех прочих «удобств» в помещении время от времени появляется еще одна сущность — «хозяин». Он является внешней по отношению к системе силой, которая асинхронно предпринимает некоторые действия (кормление, пересадка животных), нарушающие естественное «синхронное» течение событий (это служба системного времени операционной системы, которая извне навязывает потокам диспетчеризацию).

• Аквариумы (процессы) являются не только контейнерами, заключающими в себе активные сущности (потоки). Они также ограничивают ареал существования (защищенное адресное пространство) для их обитателей: любое нарушение границ обитания в силу каких-либо форс-мажорных обстоятельств, безусловно, означает гибель нарушителя (ошибка нарушении защиты памяти в потоке).

• Обитатели аквариумов (потоки) легко и непринужденно взаимодействуют между собой (сталкиваются при движении или, напротив, уступают друг другу место) в пределах контейнера (процесса). Однако при этом они не могут взаимодействовать с обитателями других контейнеров (процессов); более того, они даже ничего не знают об их существовании. Если обитатель требует вмешательства, например перемещения его в другой контейнер, то он может лишь способствовать этому, взывая своим поведением (при помощи особых знаков) к инстанции более высокого уровня иерархии, в отличие от контейнера некоторой «общесистемной субстанции», взывающего к хозяину (операционной системе) о вмешательстве (диспетчеризации).

• Все жизненно необходимые ресурсы (кислород, корм, свет) поступают непосредственно к контейнеру как единице распределения (операционная система выделяет ресурсы процессу в целом). Обитатели контейнера (потоки) конкурируют за распределение общих ресурсов контейнера на основании своих характеристик (приоритетов) и некоторой логики (дисциплины) распределения относительно «личностных» характеристик: размера животного, быстроты реакции и движения и т.д.

Такая ассоциативная аналогия, возможно, позволит отчетливее ощутить, что процесс и поток относятся к различным уровням иерархии понятий ОС. Это различие смазывается тем обстоятельством, что в любой ОС (с поддержкой модели потоков или без нее) всякий процесс всегда наблюдается в неразделимом единстве хотя бы с одним (главным) потоком и нет возможности наблюдать и анализировать поведение «процесса без потока».

Отсюда и происходят попытки объединения механизмов создания и манипулирования процессами и потоками «под одной крышей» (единым механизмом). Например, в ОС Linux создание и процесса (fork()), и потока (pthread_create()) свели к единому системному вызову _clone(), что явилось причиной некоторой иллюзорной эйфории, связанной с непонятной, мифической «дополнительной гибкостью».

Усилия последующих лет были направлены как раз на разделение этих механизмов, ликвидацию этой «гибкости» и восстановление POSIX-модели. Отсюда же вытекают и разработки последних лет в области новых «экзотических» ОС, направленные на сближение модели процесса и потока, и попытки создания некой «гибридной» субстанции, объединяющей атрибуты процесса и потока, если того захочет программист (на момент создания). По нашему мнению, идея «гибридизации» достаточно сомнительна и согласно нашей аналогии направлена на создание чего-то, в головной своей части напоминающего аквариум, а в задней — рыбу. Получается даже страшнее, чем русалка…

Отмеченный выше дуализм абстракций процессов и потоков (а в некоторых ОС и их полная тождественность) приводит к тому, что крайне сложно описывать одно из этих понятий, не прибегая к упоминанию атрибутов другого. В итоге, с какой бы из двух абстракций ни начать рассмотрение, нам придется, забегая вперед, ссылаться на атрибутику другой, дуальной ей. В описании процессов нам не обойтись без понятия приоритета (являющегося атрибутикой потока), а в описании потоков мы не сможем не упомянуть глобальные (относительно потока) объекты, являющиеся принадлежностью процесса, например файловые дескрипторы, сокеты и многое другое.

По этой причине наше последующее изложение при любом порядке его «развертывания» обречено на некоторую «рекурсивность». Итак, следуя сложившейся традиции, начнем с рассмотрения процессов.

 

Процессы

 

Создание параллельных процессов настолько полно описано в литературе по UNIX, что здесь мы приведем лишь минимально необходимый беглый обзор, останавливаясь только на отличительных особенностях ОС QNX.

Всякое рассмотрение предполагает наличие системы понятий. Интуитивно ясное понятие процесса не так просто поддается формальному определению. Процитируем (во многом качественное) определение, которое дает Робачевский [3]:

Обычно программой называют совокупность файлов, будь то набор исходных текстов [8] , объектных файлов или собственно выполняемый файл. Для того чтобы программа могла быть запущена на выполнение, операционная система сначала должна создать окружение или среду выполнения задачи, куда относятся ресурсы памяти, возможность доступа к устройствам ввода/вывода и различным системным ресурсам, включая услуги ядра.

Процесс всегда содержит хотя бы один поток, поскольку мы говорим об исполняемом, развивающемся во времени коде. Для процессов, исходный код которых подготовлен на языке C/C++, главным потоком процесса является поток, в котором исполняется функция, текстуально описанная под именем main(). Код и данные процесса размещаются в оперативной памяти в адресном пространстве процесса. Если операционная система и реализующая платформа (наше рассмотрение ограничено только реализацией x86) поддерживают MMU и виртуализацию адресного пространства на физическую память, то каждый процесс имеет собственное изолированное и уникальное адресное пространство и у него нет возможности непосредственно обратиться в адресное пространство другого процесса.

Любой процесс может содержать произвольное количество потоков, но не менее одного и не более 32 767 (для QNX версии 6.2). Совокупность данных, необходимых для выполнения любого из потоков процесса, а также контекст текущего выполняемого потока называются контекстом процесса.

Согласно ранним «каноническим» спецификациям UNIX [3] ОС должна поддерживать не менее 4095 отдельных процессов (точнее 4096, из которых 0-вой представляет собой процесс, загружающий ОС и, возможно, реализующий в дальнейшем функции ядра). Во всей документации ОС QNX нам не удалось найти предельное значение этого параметра. Но если из этого делается «тайна мадридского двора», то наша задача — найти это значение:

int main(int argc, char* argv[]) {

 unsigned long n = 1;

 pid_t pid;

 while((pid = fork()) >= 0) {

  n++;

  if (pid > 0) {

   waitpid(pid, NULL, WEXITED);

   exit(EXIT_SUCCESS);

  }

 }

 if (pid == -1) {

  cout << "exit with process number: << n << " - " << flush;

  perror(NULL);

 }

}

Этот достаточно непривычный по внешнему виду код дает нам следующий результат:

# pn

exit with process number: 1743 - Not enough memory

Системному сообщению о недостатке памяти достаточно трудно верить: чуть меньше 4 Кбайт программного кода в своих 1743 «реинкарнациях» требуют не более 6,6 Мбайт для своего размещения при свободных более 230 Мбайт в системе, в которой мы испытывали это приложение. Оставим это на совести создателей ОС QNX.

В продолжение нашей основной темы любопытно рассмотреть результаты вывода команды pidin, а именно последнюю ее строку с информацией о последнем запущенном в системе процессе:

• до запуска обсуждаемого приложения:

4/366186 1 /photon/bin/phcalc 10r REPLY 241691

• и после его завершения:

54652947 1 bin/pidin 10r REPLY 1

Легко видеть, что разница PID, равная 54652947 – 47366186 = 7286761, никак не является числом активированных на этом временном промежутке процессов, которое равно 1743. Поэтому к численным значениям PID нужно относиться с заметной осторожностью: это не просто инкрементированное значение числа запущенных процессов, схема формирования PID заметно сложнее.

В любом случае мы можем принять, что в ОС QNX Neutrino 6.2.1, как и в других «канонических» UNIX, количество процессов (если, конечно, эта ОС не дает нам более вразумительных оценок) ограничено цифрой 4095. Видно, что общее количество независимых потоков исполнения в системе может достигать совершенно ошеломляющей цифры. Но как бы много потоков мы ни создавали, им все равно придется конкурировать за доступ к самому главному ресурсу — процессору. В настоящее время реализованные в QNX дисциплины диспетчеризации работают над суммарным полем всех потоков в системе (рис. 2.1): если в системе выполняется N процессов и i-й процесс реализует Mi потоков, то в очередях диспетчеризации одновременно задействовано   управляемых объектов (потоков).

Рис. 2.1. Диспетчеризация процессов

На рис. 2.1 изображены два процесса, выполняющиеся под управлением системы. Каждый процесс создал внутри себя различное количество потоков равного приоритета. Обратите внимание, что фактическая диспетчеризация производится не между процессами, а между потоками процессов, даже если иногда для простоты говорят «диспетчеризация процессов». Потоки объединены в циклическую очередь диспетчеризации, и пунктирная линия показывает порядок, в котором (в направлении стрелки) они будут поочередно получать квант времени.

Если ни один из потоков не будет выполнять блокирующих операций (read(), delay(), accept(), MsgSend() и множество других), что реально встречается крайне редко, то показанный порядок «следования» потоков при диспетчеризации будет сохраняться неограниченно долго. Как только поток выполнит блокирующий вызов, он будет удален из очереди готовых к выполнению потоков, а после завершения вызова возвращен в очередь, причем (что характерно!) в голову очереди. После этого топология «петли» (порядок чередования), показанной на рисунке пунктиром, может произвольным образом измениться.

Из рисунка хорошо видно, что при диспетчеризации «в рамках системы» (об этом мы будем говорить позже) два запущенных процесса будут выполняться в неравных условиях: на каждый полный цикл диспетчеризации программный код, выполняющийся в рамках процесса А, будет получать 1 квант времени, а код в процессе B — 3 кванта.

Примечание

Стандарт POSIX, определяя названную стратегию диспетчеризации константой PTHREAD_SCOPE_SYSTEM , предусматривает и другую стратегию, обозначаемую константой PTHREAD_SCOPE_PROCESS , когда потоки конкурируют за процессорный ресурс в пределах процесса, к которому они принадлежат (в Sun Solaris первой стратегии соответствуют «bound thread», а второй — «unbound thread»). Реализация стратегии PTHREAD_SCOPE_PROCESS связана с серьезными трудностями. Насколько нам известно, в настоящее время из числа широко распространенных ОС она реализована только в Sun Solaris. В QNX для совместимости с POSIX даже присутствуют системные вызовы относительно стратегии диспетчеризации:

int pthread_attr_setscope(pthread_attr_t* attr, int scope);

int pthread_attr_getscope(const pthread_attr_t* attr, int* scope);

но в качестве параметра scope они допускают... только значение PTHREAD_SCOPE_SYSTEM и на поведение потоков никакого влияния не оказывают.

PID (Process ID) — идентификатор процесса, присваиваемый процессу при его создании, например вызовом fork(). PID позволяет системе однозначно идентифицировать каждый процесс. При создании нового процесса ему присваивается первый свободный (то есть не ассоциированный ни с каким процессом) идентификатор. Присвоение происходит по возрастающей: идентификатор нового процесса больше идентификатора процесса, созданного перед ним. Когда последовательность идентификаторов достигает максимального значения (4095), следующий процесс получает минимальный свободный (за счет завершившихся процессов) PID, и весь цикл повторяется снова. Значения PID нумеруются, начиная с 0. Процесс, загружавший ОС, является родительским для всех процессов в системе и его PID = 0.

Из других важных атрибутов процесса отметим:

• PPID (Parent Process ID) — PID процесса, породившего данный процесс. Таким образом, все процессы в системе включены в единую древовидную иерархию.

• TTY — терминальная линия: терминал или псевдотерминал, ассоциированный с процессом. Если процесс становится процессом-демоном, то он отсоединяется от своей терминальной линии и не имеет ассоциированной терминальной линии. (Запуск процесса как фонового — знак «&» в конце командной строки — не является достаточным основанием для отсоединения процесса от терминальной линии.)

• RID и EUID — реальный и эффективный идентификаторы пользователя. Эффективный идентификатор служит для определения прав доступа процесса к системным ресурсам (в первую очередь к файловым системам). Обычно RID и EUID совпадают, но установка флага SUID для исполняемого файла процесса позволяет расширить полномочия процесса.

• RGID и EGID — реальный и эффективный идентификаторы группы пользователей. Как и в случае идентификаторов пользователя, EGID не совпадает с RGID, если установлен флаг SGID для исполняемого файла процесса.

Часто в качестве атрибутов процесса называют и приоритет выполнения. Однако приоритет является атрибутом не процесса (процесс — это статическая субстанция, контейнер), а потока, но если поток единственный (главный, порожденный функцией main()), его приоритет и есть то, что понимается под «приоритетом процесса».

 

Создание нового процесса

 

Созданию процессов (имеется в виду создание процесса из программного кода) посвящено столько описаний [1-9], что детальное рассмотрение этого вопроса было бы лишь пересказом. Поэтому мы ограничимся только беглым перечислением этих возможностей, тем более что в ходе обсуждения нас главным образом интересуют не сами процессы, а потоки, заключенные в адресных пространствах процессов.

 

Использование командного интерпретатора

Самый простой способ — запустить из программного кода дочернюю копию командного интерпретатора, которому затем передать команду запуска процесса. Для этого используется вызов:

int system(const char* command);

где command — текстовая строка, содержащая команду, которую предполагается выполнить ровно в том виде, в котором мы вводим ее командному интерпретатору с консоли.

Примечание

Функция имеет еще одну специфическую форму вызова, когда в качестве command задается NULL . По коду возврата это позволяет выяснить, присутствует ли (и доступен ли) командный интерпретатор в системе (возвращается 0, если интерпретатор доступен).

На время выполнения вызова system() вызывающий процесс приостанавливается. После завершения порожденного процесса функция возвращает код завершения вновь созданной копии интерпретатора (или -1, если сам интерпретатор не может быть выполнен), то есть младшие 8 бит возвращаемого значения содержат код завершения выполняемого процесса. Возврат вызова system() может анализироваться макросом WEXITSTATUS(), определенным в файле . Например:

#include

int main(void) {

 int rc = system("ls");

 if (rc == -1) cout << "shell could not be run" << endl;

 else

  cout << "result of running command is " << WEXITSTATUS(rc) << endl;

 return EXIT_SUCCESS;

}

Примечание

Эта функция использует вызов spawnlp() для загрузки новой копии командного интерпретатора, то есть «внутреннее устройство» должно быть в общем виде вам понятно. Особенностью QNX-реализации является то, что spawnlp() всегда использует вызов /bin/sh , независимо от конкретного вида интерпретатора, устанавливаемого переменной окружения SHELL (ksh, bash…). Это обеспечивает независимость поведения родительского приложения от конкретных установок системы, в которой это приложение выполняется.

Вызов system() является не только простым, но и очень наглядным, делающим код легко читаемым. Программисты часто относятся к нему с пренебрежением, отмечая множество его недостатков. Однако в относительно простых случаях это может быть оптимальным решением, а недостатки не так и существенны:

• Используя копию командного интерпретатора, вызов system() может инициировать процесс, исполняющий и бинарную программу, и скрипт на языке самого командного интерпретатора (shell), а также программный код на интерпретирующих языках, таких как Perl, Tcl/Tk и др. Многие из рассматриваемых ниже «чисто программных» способов могут загружать и исполнять только бинарный исполняемый код (по крайней мере, без использования ими весьма громоздких искусственных и альтернативных возможностей).

• Остановка родительского процесса в ожидании завершения порожденного также легко разрешается: просто запускайте дочерний процесс из отдельного потока:

#include

void* process(void* command) {

 system((char*)command);

 delete command;

 return NULL;

}

int main(int argc, char *argv[]) {

 ...

 char* comstr = "ls -l";

 pthread_create(NULL, NULL, strdup(comstr), &process);

 ...

}

• Часто в качестве недостатка этого способа отмечают «автономность» и невозможность взаимодействия родительского и порожденного процессов.

Но для расширения возможностей взаимосвязи процессов можно прежде всего воспользоваться вызовом popen() (POSIX 1003.1a), являющимся в некотором роде эквивалентом, расширяющим возможности system(). Возможности popen() часто упускаются из виду, так как в описаниях этот вызов относится не к области создания процессов, а к области программных каналов (pipe). Синтаксис этого вызова таков:

FILE* popen(const char* command, const char* mode);

где command — командная строка, как и у system(); mode — режим создаваемого программного канала со стороны порождающего процесса: ввод (mode = «r») или вывод (mode = «w»). Любые другие значения, указанные для mode, дают непредсказуемый результат.

В результате выполнения этой функции создается открытый файловый дескриптор канала (pipe), из которого породивший процесс может (mode = «r») читать (стандартный поток вывода дочернего процесса STDOUT_FILENO) или в который может (mode = «w») писать (стандартный поток ввода дочернего процесса STDIN_FILENO) стандартным образом, как это делается для типа FILE (в частности, с отработкой ситуации EOF).

Рассмотрим следующий пример. Конечно, посимвольный ввод/вывод — это не лучшее решение, и здесь мы используем его только для простоты:

int main(int argc, char** argv) {

 FILE* f = popen("ls -l", "r");

 if (f == NULL) perror("popen"), exit(EXIT_FAILURE);

 char c;

 while((с = fgetc(f)) != EOF )

  cout << (islower(с) ? toupper(с) : c);

 pclose(f);

 return EXIT_SUCCESS;

}

Примечание

Новый процесс выполняется с тем же окружением, что и родительский. Процесс, указанный в команде, запускается примерно следующим эквивалентом:

spawnlp(P_NOWAIT, shell_command, shell_command, "-с", command, (char*)NULL);

где shell_command — командный интерпретатор, специфицированный переменной окружения SHELL или утилита /bin/sh . В этом кроется причина возможного различия в выполнении вызовов system() и popen() .

Если popen() возвращает не NULL, то выполнение прошло успешно. В противном случае устанавливается errno: EINVAL — недопустимый аргумент mode, ENOSYS — в системе не выполняется программа менеджера каналов. После завершения работы с каналом, созданным popen(), он должен быть закрыт парной операцией pclose().

При использовании system() в более сложных случаях, например при запуске в качестве дочернего собственного процесса, являющегося составной частью комплекса (до сих пор мы рассматривали в качестве дочерних только стандартные программы UNIX), причем запуск производится из отдельного потока (то есть без ожидания завершения, как предлагалось выше), мы можем реализовать сколь угодно изощренные способы взаимодействия с помощью механизмов IPC, например, открывая в дочернем процессе двунаправленные каналы к родителю.

 

Клонирование процесса

Вызов fork() создает клон (полную копию) вызывающего процесса в точке вызова. Вызов fork() является одной из самых базовых конструкций всего UNIX-программирования. Его толкованию посвящено столько страниц в литературе, сколько не уделено никакому другому элементу API. Синтаксис этого вызова (проще по синтаксису не бывает, сложнее по семантике — тоже):

#include

pid_t fork(void);

Действие вызова fork() следующее:

• Порождается дочерний процесс, которому системой присваивается новое уникальное значение PID.

• Дочерний процесс получает собственные копии файловых дескрипторов, открытых в родительском процессе в точке выполнения fork(). Каждый дескриптор ссылается на тот же файл, который соответствует аналогичному дескриптору родителя. Блокировки файлов (locks), установленные в родительском процессе, наследуются дочерним процессом.

• Для дочернего процесса его значения tms_utime, tms_stime, tms_cutime и tms_cstime устанавливаются в значение ноль. Выдержки (alarms) для этих таймеров, установленные к этому времени в родительском процессе, в дочернем процессе очищаются.

Сигнальные маски (подробнее об этом будет рассказано ниже) для дочернего процесса инициализируются пустыми сигнальными наборами (независимо от сигнальных масок, установленных родительским процессом).

Если вызов функции завершился неудачно, функция возвращает -1 и устанавливает errno: EAGAIN — недостаточно системных ресурсов; ENOMEM — процессы требуют большее количество памяти, чем доступно в системе; ENOSYS — функция fork() не реализуется в этой модели памяти, например в физической модели адресации памяти (напомним, что QNX — многоплатформенная ОС и число поддерживаемых ею платформ все возрастает).

А вот с кодом возврата этой функции в случае удачи сложнее и гораздо интереснее. Дело в том, что для одного вызова fork() одновременно имеют место два возврата в двух различных копиях (но в текстуально едином коде!): в копии кода, соответствующей дочернему процессу, возвращается 0, а в копии кода родителя — PID успешно порожденного дочернего процесса. Это и позволяет разграничить в едином программном коде фрагменты, которые после точки ветвления надлежит выполнять в родительском процессе и которые относятся к дочернему процессу. Типичный шаблон кода, использующего fork(), выглядит примерно так:

pid_t pid = fork();

if (pid == -1) perror("fork"), exit(EXIT_FAILURE);

if (pid == 0) {

 // ... этот код выполняется в дочернем процессе ...

 exit(EXIT_SUCCESS);

}

if (pid > 0) {

 // ... этот код выполняется в родительском процессе ...

 do { // ожидание завершения порожденного процесса

  wpid = waitpid(pid, &status, 0);

 } while(WIFEXITED(status) == 0);

 exit(WEXITSTATUS(status));

}

Эта схема порождения процесса, его клонирование, настолько широко употребляется, особенно при построении самых разнообразных серверов, что для нее была создана специальная техника, построенная на вызове fork(). Заметьте, что во всех многозадачных ОС обязательно присутствует та или иная техника программного создания нового процесса, однако не во всех существует техника именно клонирования, то есть создания полного дубликата порождающего процесса.

Вот как выглядит простейший ретранслирующий TCP/IP-сервер, заимствованный из нашей более ранней публикации [4] (обработка ошибок полностью исключена, чтобы упростить пример):

Ретранслирующий TCP/IP-сервер [12]

int main(int argc, char* argv[]) {

 // создание и подготовка прослушивающего сокета:

 int rc, ls = socket(AF_INET, SOCK_STREAM, 0);

 setsockopt(ls, SOL_SOCKET, SO_REUSEADDR, &rc, sizeof(rc));

 struct sockaddr_in addr;

 memset(&addr, 0, sizeof(addr));

 addr.sin_len = sizeof(addr); // специфика QNX

 addr.sin_family = AF_INET;

 addr.sin_port = htons(PORT); // PORT - константа

 addr.sin_addr.s_addr = htonl(INADDR_ANY);

 bind(ls, (struct sockaddr*)&addr, sizeof(sockaddr));

 listen(ls, 25);

 while(true) {

  rc = accept(ls, NULL, NULL);

  pid_t pid = fork();

  if (pid < 0) ...; // что-то произошло!

  if (pid == 0) {

   close(ls);

   char data[MAXLINE];

   int nd = read(rc, &data, MAXLINE);

   if (nd > 0) write(rc, &data, nd);

   close(rs);

   exit(EXIT_SUCCESS);

  }

  else close(rs); // единственное действие родителя

 }

 exit(EXIT_SUCCESS);

}

Приведенный фрагмент может в процессе своей работы породить достаточно много идентичных процессов (один родительский, пассивно прослушивающий канал; остальные — порожденные, активно взаимодействующие с клиентами, по одному на каждого клиента). Все порождаемые процессы наследуют весь набор дескрипторов (в данном случае сокетов), доступных родительскому процессу. Лучшее, что могут сделать процессы (как родительский, так и дочерний), — немедленно после вызова fork() (и это хорошая практика в общем случае) тщательно закрыть все унаследованные дескрипторы, не имеющие отношения к их работе.

Примечание

Операция fork() должна создать не только структуру адресного пространства нового процесса, но и побайтную копию этой области. В операционных системах общего назначения (Win32, Linux, FreeBSD) для облегчения этого трудоемкого процесса используется виртуализация страниц по технологии COW (copy on write), детально описанная, например, применительно к Win32, Джеффри Рихтером. Накладные расходы процесса копирования здесь демпфированы тем, что копирование каждой физической страницы памяти фактически производится только при записи в эту страницу, то есть затраты на копирование «размазываются» достаточно случайным образом по ходу последующего выполнения дочернего процесса (здесь нет практически никакого итогового выигрыша а производительности, есть только сокрытие от пользователя одноразового размера требуемых затрат).

Системы реального времени не имеют права на такую роскошь: непредсказуемое рассредоточение копирующих операций по всему последующему выполнению, а поэтому и использование в них COW вообще, выглядит весьма сомнительно. В [4] мы описывали эксперименты в QNX, когда в код сервера, построенного на fork() , была внесена «пассивная» строка, никак не используемая в программе, но определяющая весьма протяженную инициализированную область данных:

static long MEM[2500000];

При этом время реакции (ответа) сервера (затраты времени на выполнение fork() ) возросло в 50 раз и составило 0,12 сек на процессоре 400 МГц. Еще раз, но в другом контексте эта особенность будет обсуждена ниже при сравнении затрат производительности на создание процессов и потоков.

Дополнительным вызовом этого класса (для полноты обзора) является использование функции:

pid_t vfork(void);

В отличие от fork(), этот вызов, впервые введенный в BSD UNIX, делает разделяемым для дочернего процесса адресное пространство родителя. Родительский процесс приостанавливается до тех пор, пока порожденный процесс не выполнит exec() (загружая новый программный код дочернего процесса) или не завершится с помощью exit() или аналогичных средств.

Функция vfork() может быть реализована на аппаратных платформах с физической моделью памяти (без виртуализации памяти), a fork() — не может (или реализуется с большими накладными расходами), так как требует создания абсолютной копии области адресного пространства, что в физической модели повлечет сложную динамическую модификацию адресных полей кода. Тем не менее (при некоторых кажущихся достоинствах) в BSD также подчеркивалось, что vfork() таит в себе серьезную потенциальную опасность, поскольку позволяет одному процессу использовать или даже модифицировать адресное пространство другого, то есть фактически нарушает парадигму защищенных адресных пространств.

 

Запуск нового программного кода

Наконец, рассмотрим запуск на выполнение нового, отличного от родительского процесса программного кода, образ которого содержится в отдельном исполняемом файле в качестве дочернего процесса. Для этой цели в QNX существуют две группы функций: exec() (их всего 8: execl(), execle(), execlp(), execlpe(), execv(), execve(), execvp(), execvpe()) и spawn() (их 10: spawn(), spawnl(), spawnle(), spawnlp(), spawnlpe(), spawnp(), spawnv(), spawnve(), spawnvp(), spawnvpe()).

Это множество форм записи отличается синтаксисом, который определяет формат списка аргументов командной строки, полученного нами в качестве параметров функции main(), передаваемых программе, а также некоторыми другими дополнительными деталями. Суффиксы в именах функций обозначают следующее:

• l — список аргументов определяется через список параметров, заданных непосредственно в самом вызове. Этот список завершается нулевым аргументом NULL;

• e — окружение для процесса указывается посредством определения массива переменных окружения;

• p — относительный путь поиска: если не указан полный путь к файлу программы (то есть имя файла не содержит разделителей «/»), для его поиска используется переменная окружения PATH;

• v — список аргументов определяется через указатель на массив аргументов.

В нашу задачу не входит описание всех возможностей вызовов, тем более что они обстоятельно описаны в [1, 2, 5, 6], и мы будем использовать по тексту любую, более удобную для нас форму без дополнительных объяснений.

Большинство форм функции exec() являются POSIX-совместимыми, а большая часть форм функции spawn() представляет собой специфическое расширение QNX. Более того, даже для тех функций группы spawn(), которые часто называют POSIX-совместимыми [1], техническая документация QNX определяет степень совместимости примерно в таких терминах: «…функция spawn() является функцией QNX Neutrino (основанной на POSIX 1003.1d черновом стандарте).»

Функции семейства exec(), напротив, подменяют исполняемый код текущего процесса (не изменяя его идентификатор PID, права доступа, внешние ресурсы процесса, а также находящийся в том же адресном пространстве) исполняемым кодом из другого файла. Поэтому используются эти вызовы непосредственно после fork() для замены копии вызывающего процесса новым (это классическая UNIX-технология использования).

Функции семейства spawn(), напротив, порождают новый процесс (с новым идентификатором PID и в новом адресном пространстве). Все формы вызовов spawn() после подготовительной работы (иногда очень значительной) в конечном итоге ретранслируются в вызов базовой формы spawn(), который последним действием своего выполнения и посылает сообщение procnto (менеджер процессов QNX, «территориально» объединенный с микроядром системы в одном файле).

Базовый вызов spawn() определяется следующим образом:

#include

pid_t spawn(const char* path, int fd_count, const int fd_map[],

 const struct inheritance* inherit, char* const argv[],

 char* const envp[]);

где path — полное имя исполняемого бинарного файла;

fd_count — размерность следующего за ним массива fd_map;

fd_map — массив файловых дескрипторов, которые вы хотели бы унаследовать в дочернем процессе от родительского. Если fd_count не равен 0 (то есть может иметь значения вплоть до константы OPEN_MAX), то fd_map должен содержать список из fd_count файловых дескрипторов. Если же fd_count равен 0, то дочерний процесс наследует все родительские дескрипторы, исключая те, которые созданы с флагом PD_CLOEXEC функции fcntl();

inherit — системная структура (см. системные определения) типа struct inheritance, содержащая как минимум:

 unsigned long flags — один или более установленных бит:

  SPAWN_CHECK_SCRIPT — позволить spawn() запускать требуемый командный интерпретатор, интерпретируя path как скрипт (интерпретатор указан в первой строке скрипта path);

  SPAWN_SEARCH_PATH — использовать переменную окружения PATH для поиска выполняемого файла path;

  SPAWN_SETGROUP — установить для дочернего процесса значение группы, специфицируемое членом (структуры) pgroup. Если этот флаг не установлен, дочерний процесс будет частью текущей группы родительского процесса;

  SPAWN_SETND — запустить дочерний процесс на удаленном сетевом узле QNET, сам же удаленный узел специфицируется членом (структуры) nd (см. команду удаленного запуска on);

  SPAWN_SETSIGDEF — использовать структуру sigdefault для определения процесса множества (набора) сигналов, для которых будет установлена реакция по умолчанию. Если этот флаг не установлен, дочерний процесс наследует все сигнальные реакции родителя;

  SPAWN_SETSIGMASK — использовать sigmask в качестве сигнальной маски дочернего процесса.

 pid_t pgroup — группа дочернего процесса; имеет смысл, только если установлен флаг SPAWN_SETGROUP. Если флаг SPAWN_SETGROUP установлен и inherit.pgroup установлен как SPAWN_NEWPGROUP, то дочерний процесс открывает новую группу процессов с идентификатором группы (GID), равным PID этого нового процесса.

 sigset_t sigmask — сигнальная маска дочернего процесса, если установлен флаг SPAWN_SETSIGMASK.

 sigset_t sigdefault — набор сигналов дочернего процесса, для которых определяется реакция по умолчанию, если установлен флаг SPAWN_SETSIGDEF.

 uint32_t nd — это совершенно уникальный (относительно других ОС, а значит, и всего POSIX) параметр QNX - дескриптор узла сети QNET, на котором должен быть запущен новый процесс. Это поле используется, только если установлен флаг SPAWN_SETND.

argv — указатель массива аргументов. Значение argv[0] должно быть строкой (char*), содержащей имя файла, загружаемого как процесс (но может быть NULL, если аргументы не передаются). Последний элемент массива argv обязан быть NULL. Само значение argv никогда не может быть NULL.

envp — указатель массива символьных строк переменных системного окружения (environment). Последний элемент массива envp обязан быть NULL. Каждый элемент массива является строкой (char*) вида: variable = value. Если само значение указателя envp равно NULL, то дочерний процесс полностью наследует копию окружения родителя. (Окружение процесса — всегда «копия», поэтому любые изменения, внесенные в окружение дочерним процессом, никак не отражаются на окружении его родителя.)

Примечание

Если дочерний процесс является скриптом интерпретатора (флаг SPAWN_CHECK_SCRIPT ), то первая строка текста скрипта должна начинаться с #! , за которыми должны следовать путь и аргументы того интерпретатора, который будет использоваться для интерпретации этого скрипта. К скрипту не применяется установленный в системе интерпретатор по умолчанию (как это происходит при вызове его по имени из командной строки).

Правила наследования (и ненаследования) параметров дочернего процесса от родителя (RID, RGID и других атрибутов) жестко регламентированы, достаточно сложны (в зависимости от флагов) и могут быть уточнены в технической документации QNX. Отметим, что безусловно наследуются такие параметры, как: а) приоритет и дисциплина диспетчеризации; б) рабочий и корневой каталоги файловой системы. Не наследуются: установки таймеров процесса tms_utime, tms_stime, tms_cutime и tms_cstime, значение взведенного сигнала SIGALRM (это значение сбрасывается в ноль), файловые блокировки, блокировки и отображения памяти (shared memory), установленные родителем.

При успешном завершении вызов функции возвращает PID порожденного процесса. При неудаче возвращается -1 и errno устанавливается:

• E2BIG — количество байт, заданное в списке аргументов или переменных окружения и превышающее ARG_MAX;

• EACCESS — нет права поиска в каталогах префикса имени файла, или для файла не установлены права на выполнение, или файловая система по указанному пути была смонтирована с флагом ST_NOEXEC;

• EAGAIN — недостаточно системных ресурсов для порождения процесса;

• ERADF — недопустим хотя бы один из файловых дескрипторов в массиве fd_map;

• EFAULT — недопустима одна из буферных областей, указанных в вызове;

• ELOOP — слишком глубокий уровень символических ссылок к файлу или глубина префиксов (каталогов) в полном пути к файлу;

• EMFILE — недостаточно ресурсов для отображения файловых дескрипторов в дочерний процесс;

• ENAMETOOLONG — длина полного пути превышает PATH_MAX или длина компонента имени файла и пути превышает NAME_MAX;

• ENOENT — файл нулевой длины или несуществующий префиксный компонент в полном пути;

• ENOEXEC — файл, указанный как программа, имеет ошибочный для исполняемого файла формат;

• ENOMEM — в системе недостаточно свободной памяти для порождения процесса;

• ENOSYS — файловая система, специфицированная полным путевым именем файла, не предназначена для выполнения spawn();

• ENOTDIR — префиксные компоненты пути исполняемого файла не являются каталогами;

Даже из этого очень краткого обзора вызова spawn() становятся очевидными некоторые вещи:

• Эта форма универсальна (самодостаточна), она позволяет обеспечить весь спектр разнообразных форм порождения нового процесса

• Она же и самая громоздкая форма, тяжеловесная для практического кодирования, поэтому в реальных текстах в большинстве случаев вы вместо нее встретите ее конкретизации: spawnl(), spawnle(), spawnlp(), spawnlpe(), spawnp(), spawnv(), spawnve(), spawnvp(), spawnvpe(). Все эти формы достаточно полно описаны в [1]. Функционально они эквивалентны spawn(), поэтому мы не станем на них детально останавливаться.

• Хотя вызов spawn() и упоминается в описаниях как POSIX-совместимый, в QNX он существенно расширен и модифицирован и поэтому в лучшем случае может квалифицироваться как «выполненный по мотивам» POSIX.

В качестве примера приведем использованную в [4] (глава Д. Алексеева «Утилита on») форму вызова для запуска программы (с именем, заданным в строке command) на удаленном узле node (например, /net/xxx) сети QNET (как вы понимаете, это совершенно уникальная возможность QNX, говорить о которой в рамках POSIX-совместимости просто бессмысленно):

int main() {

 char* command = "...", *node = "...";

 // параметры запуска не используются

 char* const argv[] = { NULL };

 struct inheritance inh;

 inh.flags = 0;

 // флаг удаленного запуска

 inh.flags |= SPAWN_SETND;

 // дескриптор хоста

 inh.nd = netmgr_strtond(node, NULL);

 pid_t pid = spawnp(command, 0, NULL, &inh, argv, NULL);

 ...

}

Использованная здесь форма spawnp() наиболее близка к базовой spawn() и отличается лишь тем, что для поиска файла программы используется переменная системного окружения PATH.

Приведем характерный пример вызова группы exec*():

int execl(const char* path, const char* arg0, const char* arg1, ...

 const char* argn, NULL);

где path — путевое имя исполняемого файла; arg0, …, argn — символьные строки, доступные процессу как список аргументов. Список аргументов должен завершаться значением NULL. Аргумент arg0 должен быть именем файла, ассоциированного с запускаемым процессом.

Примечание

Устоявшаяся терминология «запускаемый процесс» относительно exec*() явно неудачна и лишь вводит в заблуждение. Здесь гораздо уместнее говорить о замещении выполнявшегося до этой точки кода новым, выполнение которого начинается с точки входа главного потока замещающего процесса.

Примечание

Если вызов exec*() выполняется из многопоточного родительского процесса, то все выполняющиеся потоки этого процесса предварительно завершаются. Никакие функции деструкторов для них не выполняются.

Если вызов exec*() успешен, управление никогда уже не возвращается в точку вызова. В случае неудачи возвращается -1 и errno устанавливается так же, как описано выше для spawn().

В качестве примера работы вызова spawn*() (использование exec() аналогично) рассмотрим приложение (файлы p1.cc, p1ch.cc), в котором:

• Родительский процесс (p1) порождает дочерний (p1ch) и ожидает от него поступления сигнала SIGUSR1 (сигналы детально обсуждаются позже, но здесь попутно «вскроем» одну из их особенностей).

• Дочерний процесс периодически посылает родителю сигнал SIGUSR1.

• Родительский процесс может переустановить (с помощью параметров командной строки запуска) для дочернего: период посылки сигнала (1-й параметр задан в нашем приложении константой) и приоритет, с которым будет выполняться дочерний процесс (2-й параметр, в качестве которого ретранслируется единственный параметр команды запуска родителя).

Примечание

В данный момент нас интересует только то приложение, в котором дочерний процесс порождается вызовом spawnl() . Используемые приложением механизмы и понятия — сигналы UNIX приоритеты, наследование и инверсия приоритетов — будут рассмотрены позже, поэтому при первом чтении их можно опустить. Нам не хотелось перегружать текст дополнительными «пустыми» примерами, лишь иллюстрирующими применение одной функции. Это приложение, созданное «на будущее», позволит нам отследить крайне актуальный для систем реального времени вопрос о наличии (или отсутствии) наследования приоритетов при посылке сигналов (допустимо как одно, так и другое решение, но оно должно быть однозначно единственным для ОС).

Итак, родительское приложение (файл p1.cc):

Сигналы и наследование приоритетов

#include

#include

#include

#include

#include

#include

// обработчик сигнала

static void handler(int signo, siginfo_t* info, void* context) {

 int oldprio = getprio(0);

 setprio(0, info->si_value, sival_int);

 cout << "SIG = " << signo << " old priority = "

  << oldprio << " new priority = " << getprio(0) << endl;

 setprio(0, oldprio);

}

int main(int argc, char* argv[]) {

 // установить обработчик сигнала

 sigset_t sig;

 sigemptyset(&sig);

 //определение #define SIGUSR1 16

 sigaddset(&sig, SIGUSR1);

 sigprocmask(SIG_BLOCK, &sig, NULL);

 struct sigaction act;

 act.sa_mask = sig;

 act.sa_sigaction = handler;

 act.sa_flags = SA_SIGINFO;

 if (sigaction(SIGUSR1, &act, NULL) < 0)

  perror("set signal handler"), exit(EXIT_FAILURE);

 // создать новый (дочерний) процесс

 const char* prg = "./p1ch", *sdelay = "3";

 pid_t pid =

  ((argc > 1 ) && (atoi(argv[1]) >= sched_get_priority_min(SCHED_RR)) &&

  (atoi(argv[1]) <= sched_get_priority_max(SCHED_RR))) ?

  spawnl(P_NOWAIT, prg, prg, sdelay, argv[1], NULL) :

  spawnl(P_NOWAIT, prg, prg, sdelay, NULL);

 if (pid == -1)

  perror("spawn child process"), exit(EXIT_FAILURE);

 // размаскировать и ожидать сигнала.

 sigprocmask(SIG_UNBLOCK, &sig, NULL);

 while (true) {

  if (sleep(3) != 0) continue;

  cout << "parent main loop: priority = " << getprio(0) << endl;

 }

}

Дочернее приложение (файл p1ch.cc), которое и будет запускать показанный выше родительский процесс:

#include

#include

#include

#include

#include

int main(int argc, char *argv[]) {

 int val, del = 5;

 if ((argc > 1) &&

  (sscanf(argv[1], "%i", &val) == 1) && (val > 0)) del = val;

 if ((argc > 2) &&

  (sscanf(argv[2], "%i", &val) == 1 ) && (val > 0) &&

  (val <= sched_get_priority_max(SCHED_RR)))

  if (setprio(0, val) == -1) perror("set priority");

 // периодически уведомлять родителя SIGUSR1, используя

 // его как сигнал реального времени (с очередью):

 while(true) {

  sleep(del);

  union sigval val;

  val.sival_int = getprio(0);

  // #define SIGUSR1 16

  sigqueue(getppid(), SIGUSR1, val);

 }

}

Примечание

Для многих сигналов действием на их получение, предопределенным по умолчанию, является завершение процесса. (Реже встречается действие по умолчанию — игнорировать полученный сигнал при отсутствии явно установленной для него функции обработчика.) Достаточно странно, что завершение процесса предусмотрено как реакция по умолчанию на получение «пользовательских» сигналов SIGUSR1 и SIGUSR2. Если показанное выше приложение в процессе отладки запустить вызовом из командной строки (из командного интерпретатора или, например, файлового менеджера mqc), то результатом (на первый взгляд не столь ожидаемым) станет завершение интерпретатора командной строки (родительского процесса) и, как следствие, самого тестируемого приложения.

Вот как выглядит начальный участок совместной работы двух процессов:

# p1 15

parent main loop: priority = 10

SIG = 16: old priority = 10, new priority = 15

SIG = 16: old priority = 10, new priority = 15

parent main loop: priority = 10

SIG = 16: old priority = 10, new priority = 15

parent main loop: priority = 10

SIG = 16: old priority = 10, new priority = 15

parent main loop priority = 10

SIG = 16: old priority = 10, new priority = 15

parent main loop: priority = 10

SIG = 16: old priority = 10, new priority = 15

parent main loop: priority = 10

Отчетливо видно, что при посылке сигналов реального времени наследование приоритета посылающего процесса не происходит (дочернее приложение, посылающее сигнал, выполняется с приоритетом 15, а обработчик полученного сигнала в родительском процессе выполняется с приоритетом по умолчанию, равным 10).

Забегая вперед, сообщим, что в приведенном коде приложения сделано жалкое подобие имитации наследования приоритета: в качестве ассоциированного с сигналом реального времени значения передается значение приоритета отправителя, которое тут же устанавливается как приоритет для выполнения кода обработчика. Однако слабость в отношении истинного наследования состоит здесь в том, что два первых оператора (сохранение и установка приоритета) выполняются под приоритетом родителя, и в это время обработчик может быть вытеснен диспетчером системы.

 

Завершение процесса

С завершением процесса дело обстоит достаточно просто, по крайней мере, в сравнении с тем, что происходит при завершении потока, как это и будет показано очень скоро. Процесс завершается, если программа выполняет вызов exit() или выполнение просто доходит до точки завершения функции main(), будь то с явным указанием оператора return или без оного. Это естественный, внутренний (из программного кода самого процесса) путь завершения.

Другой путь — посылка процессу извне (из другого процесса) сигнала, реакцией на который (предопределенной или установленной) является завершение процесса (подробнее о сигналах и реакциях см. ниже). В противовес естественному завершению такое принудительное завершение извне в [12] (по крайней мере, в отношении потоков) названо отменой, и именно этим термином мы будем пользоваться далее, чтобы отчетливо отмечать, о каком варианте завершения идет речь. (Такая же терминология будет использоваться нами и относительно завершения потока.)

Здесь уместно сделать краткое отступление относительно «живучести», как это названо у У. Стивенса [2], или времени жизни объектов IPC, что в равной мере может быть отнесено не только к объектам IPC, но и ко всем прочим объектам операционной системы. У. Стивенс делит все объекты по времени жизни на:

• Объекты, время жизни которых определяется процессом (process-persistent). Такой объект существует до тех пор, пока не будет закрыт последним процессом, который его использует. Примерами такого объекта являются неименованные и именованные программные каналы (pipes, FIFO).

• Объекты, время жизни которых определяется ядром системы (kernel-persistent). Такой объект существует до перезагрузки ядра или явного удаления объекта. Примерами этого класса объектов являются семафоры (именованные) и разделяемая память.

• Объекты, время жизни которых определяется файловой системой (filesystem-persistent). Такой объект отображается на файловую систему и существует до тех пор, пока не будет явно удален. Примерами этого класса объектов в различных ОС в зависимости от реализации могут быть очереди сообщений POSIX, семафоры и разделяемая память.

Квалификация каждого из объектов по времени жизни отнюдь не тривиальная задача. Объекты, отнесенные к одному классу, мигрируют в другой при переходе от одной ОС к другой в зависимости от деталей их реализации.

Проблемы завершения и особенно отмены процесса могут возникать, если процесс оперирует с объектами, время жизни которых превышает process-persistent. Мы еще много раз коснемся этой проблемы при рассмотрении завершения потоков, так как там она может возникать и в отношении всех process-persistent-объектов, и для ее разрешения в технике потоков даже предложены специальные технологии, о которых мы детально поговорим далее, при рассмотрении потоков.

 

Соображения производительности

Интересны не только затраты на порождение нового процесса (мы еще будем к ним неоднократно возвращаться), но и то, насколько «эффективно» сосуществуют параллельные процессы в ОС, насколько быстро происходит переключение контекста с одного процесса на другой. Для самой грубой оценки этих затрат создадим простейшее приложение (файл p5.cc):

Затраты на взаимное переключение процессов

#include

#include

#include

#include

#include

#include

int main(int argc, char* argv[]) {

 unsigned long N = 1000;

 if (argc > 1 && atoi(argv[1]) > 0)

 N = atoi(argv[1]);

 pid_t pid = fork();

 if (pid == -1)

  cout << "fork error" << endl, exit(EXIT_FAILURE);

 uint64_t t = ClockCycles();

 for (unsigned long i = 0; i < N; i++) sched_yield();

 t = ClockCycles() - t;

 delay(200);

 cout << pid << "\t: cycles - " << t << "; on sched - " << (t/N) / 2 << endl;

 exit(EXIT_SUCCESS);

}

Два одновременно выполняющихся процесса настолько симметричны и идентичны, что они даже не анализируют PID после выполнения fork(), они только в максимальном темпе «перепасовывают» друг другу активность, как волейболисты делают это с мячом (рис. 2.2).

Рис. 2.2. Симметричное взаимодействие потоков

Рисунок 2.2 иллюстрирует взаимодействие двух идентичных процессов: вся их «работа» состоит лишь в том, чтобы как можно быстрее передать управление партнеру. Такую схему, когда два и более как можно более идентичных потоков или процессов в максимально высоком темпе (на порядок превосходящем последовательность «естественной» RR-диспетчеризации) обмениваются активностью, мы будем неоднократно использовать в дальнейшем для различных механизмов, называя ее для простоты «симметричной схемой».

Примечание

Чтобы максимально упростить код приложения, при его написании мы не трогали события «естественной» диспетчеризации, имеющие место при RR-диспетчеризации каждые 4 системных тика (по умолчанию это ~4 миллисекунды). Как сейчас покажут результаты, события принудительной диспетчеризации происходят с периодичностью порядка 1 микросекунды, т.e. в 4000 раз чаще, и возмущения, возможно вносимые RR-диспетчеризацией, можно считать не настолько существенными.

Вот результаты выполнения этой программы:

# nice -n-19 p5 1000000

1069102 : cycles - 1234175656; on sched — 617

0       : cycles - 1234176052; on sched - 617

# nice -n-19 p5 100000

1003566 : cycles - 123439225; on sched — 617

0       : cycles - 123440347; on sched - 617

# nice -n-19 p5 10000

1019950 : cycles - 12339084; on sched — 616

0       : cycles - 12341520; on sched - 617

# nice -n-19 p5 1000

1036334 : cycles - 1243117; on sched — 621

0       : cycles - 1245123; on sched - 622

# nice -n-19 p5 100

1052718 : cycles - 130740; on sched — 653

0       : cycles - 132615; on sched - 663

Видна на удивление устойчивая оценка, практически не зависящая от общего числа актов диспетчеризации, изменяющегося на 4 порядка.

Отбросив мелкие добавки, привносимые инкрементом и проверкой счетчика цикла, можно считать, что передача управления от процесса к процессу требует порядка 600 циклов процессора (это порядка 1,2 микросекунды на компьютере 533 МГц, на котором выполнялся этот тест).

 

Потоки

 

Последующие расширения POSIX специфицируют широкий спектр механизмов «легких процессов» — потоков (группа API pthread_*()). Техника потоков вводит новую парадигму программирования вместо уже ставших традиционными UNIX-методов. Это обстоятельство часто недооценивается. Например, использование pthread_create() вместо fork() может на порядки повысить скорость реакций, особенно в ОС с отсутствием механизмов COW (copy on write) при создании дубликатов физических страниц RAM сегментов данных (таких как QNX, хотя механизмы COW вряд ли вообще применимы в ОС реального времени) [4]. Другой пример: использование множественных потоков вместо ожиданий на множестве дескрипторов в операторе select().

Однако очень часто эти две парадигмы, традиционная и потоковая, не сочетаются в рамках единого кода из-за небезопасности (not thread safe) традиционных механизмов UNIX (fork(), select() и др.) в многопоточной среде. Тогда приходится использовать либо одну, либо другую парадигму как альтернативы, не смешивая их между собой. Или смешивать, но с большой осторожностью и с хорошим пониманием того, что при этом может произойти в каждом случае.

Поток можно понимать как любой автономный последовательный (линейный) набор команд процессора. Источником этого линейного кода для потока могут служить:

• бинарный исполняемый файл, на основе которого системой или вызовом группы spawn() запускается новый процесс и создается его главный поток;

• дубликат кода главного потока процесса родителя при клонировании процессов вызовом fork() (тоже относительно главного потока);

• участок кода, оформленный функцией специального типа (void*()(void*)); это общий случай при создании второго и всех последующих потоков процесса (при создании многопоточных процессов) вызовом pthread_create(). Такую функцию мы будем называть функцией потока. Это наиболее интересный для нас случай.

В первых двух вариантах мы имеем неявное создание (главного) потока и, как следствие, порождение нового процесса. В последнем случае - явное создание потока, которое в литературе, собственно, и именуется «созданием потока». Хотя сущность происходящего относительно исполняющегося потока во всех случаях все же остается неизменной.

Кроме последовательности команд к потоку нужно отнести и те локальные данные, с которыми работает функция потока, то есть собственный стек потока. Во время приостановки системой выполнения (диспетчеризации) кода текущего потока должна обеспечиваться возможность сохранения текущих значений регистров (включая регистры FPU, сегментные регистры) и, возможно, другой специфической информации. Текущее значение этого набора данных, относящихся к выполнению текущего потока, называется контекстом потока. Контекст потока, кроме того, обеспечивает связь потока с его экземпляром собственных данных, о чем мы детально поговорим чуть позже. Детальная структура и объем данных, составляющих контекст потока, определяются не только самой ОС, но и типом процессорной архитектуры, на которой она выполняется (для многоплатформенных ОС, к которым принадлежит и QNX).

В принципе считается, что время переключения контекстов потоков в пределах одного процесса и время переключения контекстов процессов могут заметно отличаться, особенно для платформ с управлением виртуальной памятью. Однако удобства реализации и стремление к однородности могут перевесить соблазн разработчиков ОС использовать это различие, что мы вскоре и увидим в отношении QNX.

Идентификатором потока (значимым только внутри одного процесса!) является TID (Thread ID), присваиваемый потоку при его создании вызовом pthread_create(). TID позволяет процессу (а также системе в рамках процесса) однозначно идентифицировать каждый поток. Нумерация TID в QNX начинается с 1 (это всегда главный поток процесса, порожденный main()) и последовательно возрастает по мере создания потоков (до 32767).

Еще одним важнейшим атрибутом потока является приоритет его выполнения. Для каждого из уровней приоритетов, обслуживаемых системой (в QNX 6.2.1 таких уровней 64, в QNX 6.3 — 256), поддерживается циклическая очередь потоков, готовых к исполнению (на деле большая часть из таких очередей оказывается пустой). Все политики диспетчеризации работают только с потоками из одной такой очереди: очереди потоков наивысшего из присутствующих в системе приоритетов. Если в системе выполняется поток высокого приоритета, то ни один поток более низкого приоритета не получит управление до тех пор, пока поток высокого приоритета не будет переведен в блокированное состояние в ожидании некоторого события (рис. 2.3).

Рис. 2.3. Диспетчеризация потоков с различными приоритетами

На рис. 2.3 представлены два процесса, каждый из которых создает внутри себя несколько потоков, но на этот раз различных приоритетов (10 и 12). Жирной пунктирной линией показан порядок, в котором потоки высокого приоритета (12) объединены в циклическую очередь диспетчеризации. Это активная очередь диспетчеризации (наивысшего приоритета). Тонкой линией показан порядок потоков в другой очереди (приоритета 10). До тех пор пока все потоки активной очереди не окажутся в силу каких-либо обстоятельств в блокированном состоянии, ни один из потоков очереди приоритета 10 не получит ни единого кванта времени.

 

Создание нового потока

 

Создание нового потока в программном коде осуществляет вызов:

int pthread_create(pthread_t* thread,

 const pthread_attr_t* attr, void*(*start_routine)(void*), void* arg);

где thread — NULL или указатель переменной типа pthread_t, значение которой будет загружено идентификатором созданного потока после успешного выполнения функции. Далее это значение (это и есть TID) может использоваться по тексту программы для идентификации созданного потока.

attr — NULL или указатель структуры типа pthread_attr_t. Если это значение NULL, то созданный поток будет иметь набор параметров, устанавливаемых по умолчанию. Если нет, то поток будет создан с параметрами, установленными в структуре attr. Модификация полей attr после создания потока (то есть после вызова функции) не оказывает никакого эффекта на параметры потока, и вообще говоря, структура attr может быть уничтожена сразу же после вызова pthread_create(). Документация предостерегает от прямой манипуляции значениями полей этой структуры, предлагая использовать для этого функции pthread_attr_init() и pthread_attr_set_*().

start_routine — функция типа void*()(void*), уже упоминавшаяся выше как функция потока; это тот код, который будет фактически выполняться в качестве отдельного потока. Если выполнение этой функции завершается по return, то происходит нормальное завершение потока с вызовом pthread_exit(), использующим значение, возвращаемое start_routine в качестве статуса завершения. (Исключением является поток, связанный с main(); он при завершении выполняет вызов exit().)

arg — указатель на блок данных, передаваемых start_routine в качестве входного параметра. Этот параметр подробно рассмотрен далее.

Чаще всего (однако совершенно необязательно) функция потока start_routine представляет собой бесконечный цикл, в котором выполняются некоторые действия с выходом из цикла в том случае, когда нужно завершить выполнение и уничтожить созданный поток. Выглядит это следующим образом:

// функция потока:

void* ThreadProc(void* data) {

 while (true) {

  // ... выполняется работа ...

  if (...) break;

  // после этого поток нам уже не нужен!

 }

 return NULL;

}

После успешного создания нового потока он начинает функционировать «параллельно» с породившим его потоком и другими потоками процесса (если быть совсем точными, то со всеми прочими потоками, существующими в системе, так как в QNX существует только одна стратегия диспетчеризации потоков PTHREAD_SCOPE_SYSTEM, и существует она глобально, на уровне всей системы). При этом после точки выполнения pthread_create() невозможно предсказать, какой поток получит управление: породивший, порожденный или вообще произвольный поток из другого процесса. Это важно учитывать при передаче новому потоку данных и других операциях начальной инициализации параметров внутри созданного потока.

В отличие от создаваемых параллельных процессов, рассмотренных ранее, все потоки, создаваемые в рамках одного процесса, разделяют единое адресное пространство процесса, и поэтому все переменные процесса, находящиеся в области видимости любого потока, доступны этому потоку.

 

Атрибуты потока

В коде реальных приложений очень часто можно видеть простейшую форму вызова, порождающего новый поток, в следующем виде:

pthread_create(NULL, NULL, &thread_func, NULL);

И для многих целей такого вызова достаточно, так как созданный поток будет обладать свойствами, предусмотренными по умолчанию (преимущественная часть поведенческих характеристик нового потока наследуется от его родителя). Если же нам нужно создать поток с некоторым специфическим поведением, отличающимся от поведения по умолчанию, нам следует обратиться к атрибутной записи создания потока — второму параметру вызова функции создания.

Атрибутная запись потока должна создаваться и обязательно инициализироваться вызовом pthread_attr_init() до точки порождения потока. Любые последующие изменения атрибутной записи создания потока не производят никаких изменений в поведении потока (хотя некоторые из параметров потока, определяемых атрибутной записью при его создании, могут быть изменены позже, уже в ходе выполнения потока, вызовом соответствующих функций). Таким образом, атрибутная запись потока является чисто инициализирующей структурой и может быть даже уничтожена следующим оператором после порождения этого потока.

Эффект повторной инициализации атрибутной записи не определен. Для ее повторного использования (если требуется переопределение значений параметров) должен быть предварительно выполнен вызов pthread_attr_destroy() с последующей повторной инициализацией структуры (он разрушает атрибутную запись, но без освобождения ее памяти):

pthread_attr_t* pattr = new pthread_attr_t;

for (int i = 0; i < N; i++) {

 pthread_attr_init(pattr);

 // ... разнообразные настройки для разных потоков ...

 pthread_create(NULL, pattr, &function, NULL);

 pthread_attr_destroy(pattr);

}

delete pattr;

Непосредственно манипулировать с полями атрибутной записи, адресуясь к ним по именам полей, крайне опасно. Для этого предусмотрен широкий спектр функций SET/GET:

pthread_attr_getdetachstate()

pthread_attr_setdetachstate()

pthread_attr_getguardsize()

pthread_attr_setguardsize()

pthread_attr_getinheritsched()

pthread_attr_setinheritsched()

pthread_attr_getschedparam()

pthread_attr_setschedparam()

pthread_attr_getschedpolicy()

pthread_attr_setschedpolicy()

pthread_attr_getscope()

pthread_attr_setscope()

pthread_attr_getstackaddr()

pthread_attr_setstackaddr()

pthread_attr_getstacklazy()

pthread_attr_setstacklazy()

pthread_attr_getstacksize()

pthread_attr_setstacksize()

Мы не станем подробно описывать все параметры потока, которые могут быть переопределены атрибутной записью, ведь для этого есть техническая документация QNX, а рассмотрим только наиболее интересные параметры.

 

Присоединенность

Это одно из самых интересных свойств потока, но одновременно и одно из самых сложных для понимания, поэтому есть смысл остановиться на нем более подробно. Поток может создаваться как ожидаемый (PTHREAD_CREATE_JOINABLE; таковым он и создается по умолчанию; используется также термин «присоединенный») или отсоединенный (PTHREAD_CREATE_DETACHED). Например:

pthread_attr_t attr;

pthread_attr_init(&attr);

pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

pthread_create(NULL, &attr, &function, NULL);

Присоединенный поток сохраняет некоторую связь с родителем (мы это рассмотрим, когда речь пойдет о завершения потока), в то время как отсоединенный поток после точки ветвления ведет себя как совершенно автономная сущность: после точки ветвления у родительского потока нет возможности синхронизироваться с его завершением, получить код его завершения или результат выполнения потока.

Можно ожидать завершения присоединенного потока в некотором другом потоке процесса (чаще всего именно в родительском, но это не обязательно) с помощью следующего вызова:

int pthread_join(pthread_t thread, void** value_ptr);

где thread — идентификатор TID ожидаемого потока, который возвращался как первый параметр вызова pthread_create(pthread_t* thread, ...) при его создании или был им же получен после своего создания вызовом pthread_self();

value_ptr — NULL или указатель на область данных (результата выполнения), которую завершающийся поток, возможно, захочет сделать доступной для внешнего мира после своего завершения. Этот указатель функция потока возвращает оператором return или вызовом pthread_exit().

Примечание

В API QNX присутствует родственная функция (не POSIX) pthread_timedjoin() , отличающаяся тем, что она возвратит ошибку, если синхронизация по завершению не будет достигнута в указанный интервал времени:

int pthread_timedjoin(pthread_t thread, void** value_ptr,

 const struct timespec* abstime);

Таким образом, вызов pthread_join(): а) блокирует вызывающий поток, б) является средством синхронизации потоков без использования специальных примитивов синхронизации и в) позволяет потоковой функции завершающегося потока возвратить результат своей работы в точку ожидания его завершения.

Примечание

Значение value_ptr (если оно не было указано как NULL ) указывает на возвращенный результат только при нормальном завершении потока. В случае его завершения «извне» (отмены) значение value_ptr устанавливается в PTHREAD_CANCELED (константа).

Если поток предназначен для выполнения автономной работы, не требует синхронизации и не предполагает возвращать значение, он может создаваться как отсоединенный. Поскольку таких случаев достаточно много, даже большинство (например, все множество параллельных сетевых серверов), то такое поведение потока вполне могло бы быть умалчиваемым при создании. Причина несколько ограниченного использования отсоединенных потоков относительно тех случаев, когда это может быть оправданным, состоит, скорее всего, в интуитивной боязни программистов «потерять контроль» над параллельно выполняемой ветвью, хотя зачастую этот контроль бывает чисто иллюзорным (принудительное завершение потока мы подробно рассмотрим позже).

По умолчанию потоки создаются именно как присоединенные, и это аргументируется тем обстоятельством, что такой поток всегда может сделать себя (или другой поток) отсоединенным, вызвав из своей функции потока:

int pthread_detach(pthread_t thread);

Превратить же поток, созданный как отсоединенный, в присоединенный (ожидаемый) нет никакой возможности. Таким образом, это одностороннее преобразование!

Для отсоединенного потока все задействованные им системные ресурсы освобождаются в момент его завершения, а для ожидаемого — в момент выполнения pthread_join() для этого потока из какого-либо другого активного потока.

Пример синхронизации порожденных потоков:

const int THR_NUM = 5; // число дочерних потоков

pthread_t thrarray[THR_NUM];

for (int i = 0; i < THR_NUM, i++)

 pthread_create(&thrarray[i], NULL, &thrfunc, NULL);

...

// синхронизация всех дочерних потоков:

for (int i = 0, i < THR_NUM; i++)

 pthread_join(&thrarray[i], NULL);

Здесь показана стандартная техника использования pthread_join(), вызывающая при первом знакомстве вопрос: «А что произойдет, если потоки завершатся в другом порядке, а не в той последовательности, в которой они запускались?» (порядок слежения во 2-м цикле). Но в том-то и состоит приятная особенность этой техники, что ничего не произойдет, — второй цикл является точкой синхронизации всех потоков THR_NUM, независимо от взаимного порядка их завершения.

 

Дисциплина диспетчеризации

Для дочернего потока может потребоваться установить иную по отношению к родителю дисциплину (политику) диспетчеризации (SCHED_FIFO, SCHED_RR, SCHED_SPORADIC):

pthread_attr_t attr;

pthread_attr_init(&attr);

pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED);

pthread_attr_setschedpolicy(&attr, SCHED_RR);

Особенностью здесь является то, что после инициализации атрибутной записи значениями по умолчанию в параметре типа наследования атрибутной записи будет стоять PTHREAD_EXPLICIT_SCHED («наследовать от родителя»). Изменение параметров, таких как политика диспетчеризации, приоритет и др., будет иметь силу, только если мы посредством вызова pthread_attr_setinheritsched() принудительно переустановим значение типа наследования в PTHREAD_EXPLICIT_SCHED.

 

Приоритет

Пожалуй, наиболее часто приходится переопределять именно приоритет, с которым будет выполняться создаваемый поток. При запуске потока с параметрами по умолчанию его приоритет устанавливается равным приоритету порождающего потока.

Примечание

При запуске приложений из командной строки для главного потока приложения (функция main() ) значение приоритета устанавливается равным приоритету его родителя, в данном случае командного интерпретатора shell (в какой-то его конкретной реализации: ksh, bash и проч.). Приоритет командного интерпретатора, запускаемого из стартовых скриптов системы, для QNX 6.2.1, например, принимает значение 10, которое и можно квалифицировать как значение «по умолчанию». Важно только отчетливо восстановить «цепочку» возникновения этого «значения по умолчанию» (от стартовой программы, последовательно от одного родительского процесса к дочернему и так далее) и помнить, что она всегда может быть изменена. Таким образом, вся цепочка порождаемых потоков, если они порождаются без вмешательства в атрибутную запись потока, будет иметь тот же приоритет по умолчанию. Как управлять приоритетами создаваемых потоков «персонифицированно», рассказывается в этой главе. Но можно управлять приоритетами всей совокупности потоков приложения (относительно приоритетов всех прочих потоков в системе), изменяя приоритет запуска приложения и используя стандартную UNIX-команду nice . В простейшем виде это выглядит так:

# nice -nINC prog

где INC — численное значение инкремента приоритета относительно умалчиваемого, с которым требуется выполнять приложение, причем положительным инкрементам соответствует понижение приоритета, а отрицательным — повышение;

prog — имя приложения со всеми последующими его параметрами. Особенностью реализации команды nice в QNX является то, что она позволяет варьировать приоритет запускаемого приложения только в ограниченных пределах: +9 в сторону уменьшения и -19 в сторону увеличения. Это не позволяет таким простым способом запустить, например, приложение с приоритетом 0 фонового потока procnto (idle-поток) и ограничивает возможность повышения приоритета верхней границей 29 при максимально возможном значении приоритета в системе 63 (все численные значения относятся к редакции QNX 6.2.1; для QNX 6.3 диапазон допустимых значений приоритетов: 0...255). В итоге, чтобы выполнить программу myprog под приоритетом 20, фиксируя при этом время ее выполнения, необходима команда:

# nice -n-10 time myprog

Значение приоритета создаваемого потока хранится в поле param атрибутной записи (типа sched_param; подробнее эта структура будет рассмотрена при обсуждении диспетчеризации). Для переустановки значений, входящих в структуру sched_param, предоставлена функция:

int pthread_attr_setschedparam(pthread_attr_t* attr,

 const struct sched_param *param);

где attr — как и ранее, атрибутная запись потока;

param — указатель структуры sched_param, из которой параметры будут перенесены в атрибутную запись потока.

Теперь посмотрим, как запустить на выполнение поток с приоритетом на 2 единицы ниже, чем у его родителя:

int policy;

struct sched_param param;

pthread_getschedparam(pthread_self(), &policy, ¶m);

param sched_priority -= 2;

pthread_attr_t attr;

pthread_attr_init(&attr);

pthread_attr_setschedparam(&attr, ¶m);

pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED);

pthread_create(NULL, &attr, &func, NULL);

Или даже так (хотя это немного грубее):

int policy;

struct sched_param param;

pthread_getschedparam(pthread_self(), &policy, ¶m);

pthread_attr_t attr;

pthread_attr_init(&attr);

attr.param.sched_priority = param.sched_priority - 2;

pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED);

pthread_create(NULL, &attr, &func, NULL);

Примечание

Как и при установке политики диспетчеризации, параметры диспетчеризации потока (и приоритет в их составе) будут установлены, только если параметр типа наследования от родителя установлен в PTHREAD_EXPLICIT_SCHED посредством вызова pthread_attr_setinheritsched() .

Заметим здесь вскользь (в дальнейшем нам представится возможность использовать эти знания), что помимо «продуктивных» потоков (компонент системы и пользовательских приложений) в системе всегда существует один «паразитный» поток, запущенный с приоритетом 0 (idle-поток). Он «выбирает» весь остаток процессорного времени в те периоды, когда все имеющиеся в системе продуктивные потоки перейдут в блокированные состояния (ожидания). Подобная практика хорошо известна и реализуется также в большинстве других операционных систем.

 

Отличия от POSIX

Если следовать POSIX-стандарту, то некоторые из атрибутов невозможно переопределить до фактического создания этого стандарта (их можно изменить позже в самом коде потока, но иногда это не совсем правильное решение). Все эти возможности относятся к асинхронному завершению потока; детали функционирования этого механизма рассматриваются позже. К подобного рода атрибутам относятся:

• запретить асинхронное завершение (отмену) потока;

• установить тип завершаемости потока;

• определить, что должно происходить при доставке потоку сигналов.

QNX расширяет возможности POSIX, позволяя по условию OR установить соответствующие биты-флаги в поле flags атрибутной записи, прежде чем будет произведен вызов, создающий поток. Не существует функций вида pthread_attr_set_*(), эквивалентных этим установкам. К этим флагам относятся:

• PTHREAD_CANCEL_ENABLE — запрос на завершение будет обрабатываться в соответствии с типом завершаемости, установленным для потока (значение по умолчанию);

• PTHREAD_CANCEL_DISABLE — запросы на завершение будут отложены;

• PTHREAD_CANCEL_ASYNCHRONOUS — если завершение разрешено, отложенные или текущие запросы будут выполнены немедленно;

• PTHREAD_CANCEL_DEFERRED — если завершение разрешено, запросы на завершение будут отложены до достижения точки завершаемости (значение по умолчанию);

• PTHREAD_MULTISIG_ALLOW — завершать по сигналу все потоки в процессе (POSIX-умолчание);

• PTHREAD_MULTISIG_DISALLOW — завершать по сигналу только тот поток, который принял сигнал.

После запуска потока все атрибуты, связанные с завершаемостью потока, могут быть изменены вызовами pthread_setcancelstate() и pthread_setcanceltype().

 

Передача параметров потоку

Зачастую каждый поток из группы последовательно создаваемых потоков, выполняющих одну и ту же функцию, нужно запускать со своим индивидуальным блоком данных (параметром потока). Для этого предназначен 4-й параметр вызова pthread_create() — указатель на блок данных типа void*. Характерно, что это может быть произвольная структура данных сколь угодно сложного типа, структуризацию которой вызывающий pthread_create() код и функция потока должны понимать единообразно; никакого контроля соответствия типов на этапе вызова не производится.

Достаточно часто встречающийся на практике образец многопоточного кода — это циклическая процедура ожидания наступления некоторого условия (события), после которого порождается новый экземпляр потока, призванный обслужить наступившее событие (типичная схема всего разнообразия многопоточных сетевых серверов). В таких случаях код, порождающий потоки, выглядит подобно следующему фрагменту:

// функция потока:

void* ThreadProc(void* data) {

 // ... выполняется обработка, используя структуру *(DataParam*)data

 return NULL;

}

// порождающий потоки код:

while (true) {

 // инициализация области параметров

 struct DataParam data(...);

 if ( /* ожидаем нечто */ )

  pthread_create(NULL, &attr, &ThreadProc, &data);

}

Этот простейший код крайне опасен: при быстрых циклах и, что намного важнее, непредсказуемых моментах повторных созданий экземпляров потоков из вызывающего цикла необходимо обеспечить, чтобы используемое в функции потока ThreadProc() значение данных было адекватным. Оно может быть изменено в вызывающем коде или даже, более того, просто разрушено при выходе из локальной области видимости, как в следующем коде:

// порождающий потоки код:

while(true) {

 if ( /* ожидаем нечто */ ) {

  struct DataParam data(...);

  pthread_create(NULL, &attr, &ThreadProc, &data);

 }

 // здесь может идти достаточно длительная обработка

}

Здесь блок данных, выделяемый в стеке порождающего потока, к началу его использования в дочернем потоке может быть просто уничтожен.

Единственно надежный способ обеспечить требование актуальности передаваемых данных - это создание копии блока параметров непосредственно при входе в функцию потока, например так (если определена операция копирования):

// функция потока:

void* ThreadProc(void* data) {

 DataParam copy = *(DataParam*)data;

 // выполняется обработка, используя структуру copy

 return NULL;

}

или так (если определен инициализирующий конструктор структуры данных):

// функция потока:

void* ThreadProc(void* data) {

 DataParam copy(*(DataParam*)data);

 // ... выполняется обработка, используя структуру copy

 return NULL;

}

Но и этот код оказывается некорректен. При порождении потока нам нужно, чтобы инициализация копии переданных данных в теле функции потока произошла до того, как на очередном цикле оригинал этих данных будет разрушен или изменен. Но дисциплины диспетчеризации равнозначных потоков (в данном случае родительского и порожденного) в операционной системе никак не регламентируют (и не имеют права этого делать!) порядок их выполнения после точки ветвления — pthread_create().

Обеспечить актуальность копии переданных данных можно несколькими искусственными способами:

1. Передачей в качестве аргумента pthread_create() специально сделанной ранее временной копии экземпляра данных, например:

if ( /* нечто */ ) {

 // static обеспечивает неразрушаемость

 static struct DataParam copy;

 copy = data;

 pthread_create(NULL, &attr, &ThreadProc, ©);

}

Этот способ иногда хорошо «срабатывает» для данных типа символьных строк, представленных в стандарте языка С (однако используется он не часто):

void* ThreadProc(void *data) {

 ...

 // можно даже не делать копию - это уже копия:

 printf("%s", (char*)data);

}

...

while (true) {

 char *data = ... /* инициализация данных */;

 if ( /* нечто */ )

  pthread_create(NULL, &attr, &ThreadProc, strdup(data));

}

2. Для передачи параметра скалярного типа (char, short, int), не превышающего размер указателя, очень часто в самых разнообразных источниках [1, 3] можно увидеть такой трюк, когда указателю присваивается непосредственное значение скалярной величины:

// функция потока:

void* ThreadProc(void* data) {

 // ... выполняется обработка, используя значение параметра (char)data

 return NULL;

}

// порождающий потоки код:

while (true) {

 char data = /* инициализация параметра */;

 if ( /* ожидаем нечто */ )

  pthread_create(NULL, &attr, &ThreadProc, (void*)data);

}

Или даже так:

pthread_create(NULL, &attr, &ThreadProc, (void*)5);

pthread_create(NULL, &attr, &ThreadProc, (void*)(x + y));

Положительной стороной этого решения (которое тем не менее остается трюкачеством) является то, что параметр в ThreadProc() передается по значению, то есть неявным копированием, и любые последующие манипуляции с ним не приведут к порче переданного значения. Таким образом, в ThreadProc() нет необходимости создавать локальную копию полученного параметра.

3. Создание экземпляра данных в родительском потоке для каждого нового экземпляра создаваемого потока с гарантированным уничтожением экземпляра данных при завершении порожденного потока:

void* ThreadProc(void *data) {

 // используем экземпляр data без копирования ...

 ...

 delete data;

 return NULL;

}

...

if ( /* нечто */ ) {

 // создание экземпляра вместе с инициализацией

 // (предполагаем, что для DataParam ранее определен

 // копирующий конструктор):

 pthread_create(NULL, &attr, &ThreadProc, new DataParam(data));

}

Это один из самых безошибочно срабатывающих способов, и единственным его недостатком является то, что объекты создаются в одной структурной единице (родителе), а уничтожаться должны в другой (потомке), которые иногда даже размещаются в различных файлах программного кода, а ошибки с парностью операций над динамической памятью обходятся очень дорого.

4. «Ручной» вызов диспетчеризации в порождающем потоке, по крайней мере при дисциплине по умолчанию для QNX — round-robin:

if ( /* нечто */ ) {

 pthread_create(NULL, &attr, &ThreadProc, &data);

 sched_yield();

}

Мы не можем произвольно изменять последовательность выполнения потоков (чем нарушили бы принципы диспетчеризации) и не можем утверждать, что при наличии многих потоков именно только что порожденный поток получит управление. Но после выполнения sched_yield() мы можем гарантировать, что родительский поток будет помещен именно в хвост очереди потоков равных приоритетов, готовых к исполнению, и его активизация произойдет позже всех наличных в системе потоков, в том числе и последнего порожденного.

Примечание

В этом месте внимательный читатель вправе оживиться: «Обманывают, обвешивают…». Да, описываемое здесь экзотическое решение не совсем корректно с позиции уже упоминавшегося определения Э. Дейкстры «слабосвязанных процессов» и независимости результата от относительных скоростей: в SMP-системе при количестве процессоров, большем, чем количество параллельных потоков, это решение не будет работать так, как мы ему предписываем. Но к настоящему времени такое «стечение обстоятельств» может быть либо чисто теоретически умозрительным, либо возникать на экспериментальных единичных образцах SMP, содержащих десятки и сотни процессоров…, но где QNX, насколько нам известно, не используется.

В этом варианте и в порожденном потоке можно симметрично сделать так:

void* ThreadProc(void *data) {

 struct DataParam copy(*data);

 sched_yield();

 ...

}

Примечание

Иногда для выражения этой техники используется и такая, в общем несколько небрежная, форма записи:

pthread_create(NULL, &attr, &ThreadProc, &data);

delay(1); // вместо sched_yield()

Фокус здесь состоит не в том, что 1 миллисекунда — это время, заведомо достаточное для копирования экземпляра данных, а в том, что POSIX определяет, что операция delay() (а также все родственные ей функции: sleep() , nanosleep() и другие функции пассивной задержки) является операцией пассивного ожидания и должна сопровождаться принудительной диспетчеризацией.

5. Создание потока с приоритетом выше, чем родительский, с последующим возвратом его приоритета на прежний уровень после выполнения требуемой инициализации копии:

void* ThreadProc(void* data) {

 struct sched_param param;

 int policy;

 pthread_getschedparam(pthread_self(), &policy, ¶m);

 param.sched_priority -= 2;

 // инициализация копии блока данных

 ...

 pthread_setschedparam(pthread_self(), policy, ¶m);

 ...

 return NULL;

}

...

if ( /* нечто */ ) {

 pthread_attr_t attr;

 pthread_attr_init(&attr);

 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

 pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED);

 pthread_attr_setschedpolicy(&attr, SCHED_RR);

 int policy;

 struct sched_param param;

 pthread_getschedparam(pthread_self(), &policy, ¶m);

 attr.param.sched_priority = param.sched_priority + 2;

 pthread_create(NULL, &attr, &ThreadProc, &data);

}

Здесь в точке создания порожденный поток сразу же вытесняет своего родителя и выполняет инициализацию копии области параметров, после чего возвращается к нормальной (с равными приоритетами) диспетчеризации. Этот вариант может показаться искусственно усложненным, но отлично вписывается своими побочными достоинствами в создание многопоточных GUI-приложений для графической подсистемы Photon.

 

Данные потока

 

В реальном коде часто возникает ситуация, когда одновременно исполняются несколько экземпляров потоков, использующих один и тот же код (при создании потоков указывается одна и та же функция потока). При этом некоторые данные (например, статические объекты, глобальные объекты программного файла, объявленные вне функции потока) будут представлены для различных экземпляров потока в виде единого экземпляра данных, а другие (блок параметров функции потока, локальные данные функции потока) будут представлять собой индивидуальные экземпляры для каждого потока:

class DataBlock {

 DataBlock(void);

 DataBlock(DataBlock&);

}

DataBlock A;

void* ThreadProc(void *data) {

 static DataBlock B;

 DataBlock C, D(*(DataBlock*)data);

 ...

 delete data;

 return NULL;

}

...

for(int i = 0; i < N; i++ ) {

 DataBlock E;

 // ... обработка и заполнение E ...

 pthread_create(NULL, NULL, &ThreadProc, new DataBlock(E));

}

В этом простейшем фрагменте кода N потоков разделяют единые экземпляры данных А и В: любые изменения, сделанные в данных потоком i, будут видимы потоку j, если, конечно, корректно выполнена синхронизация доступа к данным и потоки «совместными усилиями» не разрушат целостность блока данных. Другие блоки данных, С и D, представлены одним изолированным экземпляром на каждый поток, и никакие изменения, производимые потоком в своем экземпляре данных, не будут видны другим потокам.

Подобные эффекты не возникают в однопотоковых программах, а если они не учитываются и возникают спонтанно, то порождают крайне трудно выявляемые ошибки. Очень часто такие ошибки возникают после преобразования корректных последовательных программ в потоковые. Рассмотрим простейший фрагмент кода:

int M = 0;

void Func_2(void) {

 static int С = 0;

 M += 2;

 C++;

 M -= 2;

}

void Func_1(void) { Func_2(); }

void* ThreadProc(void *data) {

 Func_1();

 return NULL;

}

...

for (int i = 0; i < N; i++)

 pthread_create(NULL, NULL, &ThreadProc, NULL);

Можно ли здесь утверждать, что переменная M сохранит нулевое значение, а переменная С действительно является счетчиком вызовов и ее результирующее значение станет N? Ни в коей мере: после выполнения такого фрагмента в переменных может быть все что угодно. Но цепочка вызовов Func_1()->Func_2() может быть сколь угодно длинной, описание Func_2() может находиться совершенно в другом файле кода (вместе с объявлением переменной M!) и, наконец, Func_2() в нашей транскрипции может быть любой функцией из библиотек C/C++, писавшейся лет 15 назад и содержащей в своем теле статические переменные!

POSIX.1 требует, чтобы определенные в нем функции были максимально безопасными в многопоточной среде. Но переработка всех библиотек - трудоемкий и длительный процесс. API QNX (и так поступили производители многих POSIX-совместимых ОС) для потенциально небезопасных в многопоточной среде функций ввели их эквиваленты, отличающиеся суффиксом «_r», например: localtime() — localtime_r(), rand() — rand_r() и т.д. Принципиально небезопасна в многопоточной среде одна из самых «любимых» в UNIX функция — select().

 

Собственные данные потока

Описанной выше схеме общих данных приложения и локальных данных потока, достаточных для большинства «ординарных» приложений, все-таки определенно не хватает гибкости, покрывающей все потребности. Поэтому в расширениях POSIX реального времени вводится третий специфичный механизм создания и манипулирования с данными в потоке — собственные данные потока (thread-specific data). Использование собственных данных потока — самый простой и эффективный способ манипулирования данными, представленными индивидуальными экземплярами данных для каждого потока.

Согласно POSIX операционная система должна поддерживать ограниченное количество объектов собственных данных (POSIX.1 требует, чтобы этот предел не превышал 128 объектов на каждый процесс). Ядром системы поддерживается массив из этого количества ключей (тип pthread_key_t; это абстрактный тип, и стандарт предписывает не ассоциировать его с некоторым значением, но реально это небольшие целые значения, и в таком виде вся схема гораздо проще для понимания). Каждому ключу сопоставлен флаг, отмечающий, занят этот ключ или свободен, но это внутренние детали реализации, не доступные программисту. Кроме того, при создании ключа с ним может быть связан адрес функции деструктора, которая будет вызываться при завершении потока и уничтожении его экземпляра данных (рис. 2.4).

Рис. 2.4. Ключи экземпляров данных

Когда поток вызывает pthread_key_create() для создания нового типа собственных данных, система разыскивает первое незанятое значение ключа и возвращает его значение (0...127). Для каждого потока процесса (в составе описателя потока) хранится массив из 128 указателей (void*) блоков собственных данных, и по полученному ключу поток, индексируя этот массив, получает доступ к своему экземпляру данных, ассоциированных со значением ключа. Начальные значения всех указателей блоков данных - NULL, а фактическое размещение и освобождение блоков данных выполняет пользовательская программа (рис. 2.5).

Рис. 2.5. Поток и его собственные данные

На рис. 2.5 представлен массив структур, создаваемый в единичном экземпляре для каждого процесса библиотекой потоков. Каждый элемент ключа должен быть предварительно инициализирован вызовом pthread_key_create() (однократно для всего процесса). Каждый инициализированный элемент массива определяет объекты единого класса во всех использующих их потоках, поэтому для них здесь же определяется деструктор (это в терминологии языка С!). Деструктор — единый для экземпляров данных в каждом потоке. Даже для инициализированного и используемого ключа в качестве деструктора может быть указан NULL, при этом никакие деструктивные действия при завершении потока не выполняются.

После размещения блока программа использует вызов pthread_setspecific(). Для связывания адреса своего экземпляра данных с элементом массива указателей, индексируемого ключом. В дальнейшем каждый поток использует pthread_getspecific() для доступа именно к своему экземпляру данных. Это схема, а теперь посмотрим, как она работает.

Положим, что нам требуется создать N параллельно исполняющихся идентичных потоков (использующих единую функцию потока), каждый из которых предполагает работать со своей копией экземпляра данных типа DataBlock:

class DataBlock {

 ~DataBlock() { ... }

 ...

};

void* ThreadProc(void *data) {

 // ... здесь будет код, который мы рассмотрим

 return NULL;

}

...

for (int i = 0; i < N; i++)

 pthread_create(NULL, NULL, &ThreadProc, NULL);

Последовательность действий потока выглядит следующим образом:

1. Поток запрашивает pthread_key_create() — создание ключа для доступа к блоку данных DataBlock. Если потоку необходимо иметь несколько (m) блоков собственных данных различной типизации (и различного функционального назначения): DataBlock_1, DataBlock_2 … DataBlock_m, то он запрашивает значения ключей соответствующее число раз для каждого типа (m).

2. Неприятность здесь состоит в том, что запросить значение ключа для DataBlock должен только первый пришедший к этому месту поток (когда ключ еще не распределен). Последующие потоки, достигшие этого места, должны только воспользоваться ранее распределенным значением ключа для типа DataBlock. Для разрешения этой сложности в систему функций собственных данных введена функция pthread_once().

3. После этого каждый поток (как создавший ключ, так и использующий его) должен запросить по pthread_getspecific() адрес блока данных и, убедившись, что это NULL, динамически распределить область памяти для своего экземпляра данных, а также зафиксировать по pthread_setspecific() этот адрес в массиве экземпляров для дальнейшего использования.

4. Дальше поток может работать с собственным экземпляром данных (отдельный экземпляр на каждый поток), используя для доступа к нему pthread_getspecific().

5. При завершении любого потока система уничтожит и его экземпляр данных, вызвав для него деструктор, который был установлен вызовом pthread_key_create(), единым для всех экземпляров данных, ассоциированных с этим значением ключа.

Теперь запишем это в коде, заодно трансформировав в новую функцию ThreadProc() код ранее созданной версии этой же функции SingleProc() для исполнения в одном потоке, не являющийся реентерабельным и безопасным в многопоточной среде. (О вопросах реентерабельности мы обязательно поговорим позже.)

void* SingleProc(void *data) {

 static DataBlock db( ... );

 // ... операции с полями DataBlock

 return NULL;

}

Примечание

To, что типы параметров и возвращаемое значение SingleProc() «подогнаны» под синтаксис ее более позднего эквивалента ThreadProc() , не является принципиальным ограничением - входную и выходную трансформации форматов данных реально осуществляют именно в многопоточном эквиваленте. Нам здесь важно принципиально рассмотреть общую формальную технику трансформации нереентерабельного кода в реентерабельный.

Далее следует код SingleProc(), преобразованный в многопоточный вид:

static pthread_key_t key;

static pthread_once_t once = PTHREAD_ONCE_INIT;

static void destructor(void* db) {

 delete (DataBlock*)db;

}

static void once_creator(void) {

 // создается единый на процесс ключ для данных DataBlock:

 pthread_key_create(&key, destructor);

}

void* ThreadProc(void *data) {

 // гарантия того, что ключ инициализируется только 1 раз на процесс!

 pthread_once(&once, once_creator);

 if (pthread_getspecific(key) == NULL)

  pthread_setspecific(key, new DataBlock(...));

 // Теперь каждый раз в теле этой функции или функций, вызываемых

 // из нее, мы всегда можем получить доступ к экземпляру данных

 DataBlock* pdb = pthread_getspecific(key);

 // ... все те же операции с полями pdb->(DataBlock)

 return NULL;

}

Примечание

Обратите внимание, что вся описанная техника преобразования потоковых функций в реентерабельные (как и все программные интерфейсы POSIX) отчетливо ориентирована на семантику классического С, в то время как все свое изложение мы ориентируем и иллюстрируем на С++. При создании экземпляра собственных данных полностью разрушается контроль типизации: разные экземпляры потоков вполне могли бы присвоить своим указателям данные (типа v oid* ), ассоциированные с одним значением key . Это совершенно различные типы данных, скажем DataBlock_1* и DataBlock_2* . Но проявилось бы это несоответствие только при завершении функции потока и уничтожении экземпляров данных, когда к объектам совершенно разного типа был бы применен один деструктор, определенный при выделении ключа. Ошибки такого рода крайне сложны в локализации.

Особая область, в которой собственные данные потока могут найти применение и где локальные (стековые) переменные потока не могут быть использованы, — это асинхронное выполнение фрагмента кода в контексте потока, например при получении потоком сигнала.

Еще одно совсем не очевидное применение собственных данных потока (мы не встречали в литературе упоминаний о нем), которое особо органично вписывается в использование именно С++, — это еще один способ возврата в родительский поток результатов работы дочерних. При этом неважно, как были определены дочерние потоки - как присоединенные или как отсоединенные (мы обсуждали это ранее); такое использование в заметной мере нивелирует их разницу. Эта техника состоит в том, что:

• Если при создании ключа не определять деструктор экземпляра данных потока pthread_key_create(..., NULL), то при завершении потока над экземпляром его данных не будут выполняться никакие деструктивные действия и созданные потоками экземпляры данных будут существовать и после завершения потоков.

• Если к этим экземплярам данных созданы альтернативные пути доступа (а они должны быть в любом случае созданы, так как области этих данных в конечном итоге нужно освободить), то благодаря этому доступу порождающий потоки код может использовать данные, «оставшиеся» как результат выполнения потоков.

В коде (что гораздо нагляднее) это может выглядеть так (код с заметными упрощениями взят из реального завершенного проекта):

// описание экземпляра данных потока

struct throwndata {

 ...

};

static pthread_once_t once = PTHREAD_ONCE_INIT;

static pthread_key_t key;

void createkey(void) { pthread_key_create(&key, NULL); }

// STL-очередь, например указателей на экземпляры данных

queue result;

// функция потока

void* GetBlock(void*) {

 pthread_once(&once, createkey);

 throwndata *td;

 if ((td = (throwndata*)pthread_getspecific(key)) == NULL) {

  td = new throwndata();

  pthread_setspecific(key, (void*)td);

  // вот он - альтернативный путь доступа:

  result.push(td);

 }

 // далее идет плодотворная работа над блоком данных *td

 // . . . . . . . . .

}

int main(int argc, char **argv) {

 // . . . . . .

 for (int i = 0; i < N; i++)

  pthread_create(NULL, NULL, GetBlock, NULL);

 // . . . . . . к этому времени потоки завершились;

 // ни в коем случае нельзя помещать result.size()

 // непосредственно в параметр цикла!

 int n = result.size();

 for (int i = 0; i < n; i++) {

  throwndata *d = result.front();

  // обработка очередного блока *d ...

  result pop();

  delete d;

 }

 return EXIT_SUCCESS;

}

Примечание

В предыдущих примерах кода мы указывали третий параметр pthread_create() в виде &GetBlock (адреса функции потока), но в текущем примере мы сознательно записали GetBlock . И то и другое верно, ибо компилятор достаточно умен, чтобы при указании имени функции взять ее адрес.

Собственные данные потоков — это настолько гибкий механизм, что он может таить в себе и другие, еще не используемые техники применения.

 

Безопасность вызовов в потоковой среде

Рассмотрев «в первом приближении» технику собственных данных потоков, мы теперь готовы ответить на вопрос: «В чем же главное предназначение такой в общем-то достаточно громоздкой техники? И зачем для ее введения потребовалось специально расширять стандарты POSIX?» Самое прямое ее предназначение, помимо других «попутных» применений, которые были обсуждены ранее, — это общий механизм превращения существующей функции для однопотокового исполнения в функцию, безопасную (thread safe) в многопоточном окружении. Этот механизм предлагает единую (в смысле «единообразную», а не «единственно возможную») технологию для разработчиков библиотечных модулей.

Примечание

ОС QNX, заимствующая инструментарий GNU-технологии (gcc, make, …), предусматривает возможность построения как статически связываемых библиотек (имена файлов вида xxx.a ), так и разделяемых или динамически связываемых (имена файлов вида xxx.so ). Целесообразность последних при построении автономных и встраиваемых систем (на что главным образом и нацелена ОС QNX) достаточно сомнительна. Однако высказанное выше положение о построении реентерабельных программных единиц относится не только к библиотечным модулям (как статическим, так и динамическим) в традиционном понимании термина «библиотека», но и охватывает куда более широкий спектр возможных объектов и в той же мере относится и просто к любым наборам утилитных объектных модулей (вида xxx.о ), разрабатываемых в ходе реализации под целевой программный проект.

Если мы обратимся к технической документации API QNX (аналогичная картина будет и в API любого UNIX), то заметим, что только небольшая часть функций отмечена как thread safe. К «небезопасным» отнесены такие общеизвестные вызовы, как select(), rand() и readln(), а многим «небезопасным» в потоковой среде вызовам сопутствуют их безопасные дубликаты с суффиксом *_r в написании имени функции, например MsgSend() — MsgSend_r().

В чем же состоит небезопасность в потоковой среде? В нереентерабельности функций, подготовленных для выполнения в однопоточной среде, в первую очередь связанной с потребностью в статических данных, хранящих значение от одного вызова к другому. Рассмотрим классическую функцию rand(), традиционно реализуемую в самых разнообразных ОС примерно так (при «удачном» выборе констант А, В, С):

int rand(void) {

 static int x = rand_init();

 return x = (A*x + B)%C;

}

Такая реализация, совершенно корректная в последовательной (однопотоковой) модели, становится небезопасной в многопоточной: а) вычисление x может быть прервано событием диспетчеризации, и не исключено, что вновь получивший управление поток в свою очередь обратится к rand() и исказит ход текущего вычисления; б) каждый поток «хотел бы» иметь свою автономную последовательность вычислений x, не зависящую от поведения параллельных потоков. Желаемый результат будет достигнут, если каждый поток будет иметь свой автономный экземпляр переменной x, что может быть получено двумя путями:

1. Изменить прототип объявления функции:

int rand_r(int *x) {

 return x = (А * (*x) + В) % С;

};

При этом проблема «клонирования» переменной x в каждом из потоков (да и начальной ее инициализации) не снимается, она только переносится на плечи пользователя, что, однако, достаточно просто решается при создании потоковой функции за счет ее стека локальных переменных:

void* thrfunc(void*) {

 int x = rand_init();

 ... = rand_r(&x);

};

Именно такова форма и многопоточного эквивалента в API QNX — rand_r().

2. В этом варианте мы сохраняем прототип описания функции без изменений за счет использования различных экземпляров собственных данных потока. (Весь приведенный ниже код размещен в отдельной единице компиляции; все имена, за исключением rand(), невидимы и недоступны из точки вызова, что подчеркнуто явным использованием квалификатора static.)

static pthread_key_t key;

static pthread_once_t once = PTHREAD_ONCE_INIT;

static void destr(void* db) { delete x; }

static void once_creator(void) { pthread_key_create(&key, destr); }

int rand(void) {

 pthread_once(&once, once_creator);

 int *x = pthread_getspecific(key);

 if (x == NULL) {

  pthread_setspecific(key, x = new int);

  *x = rand_init();

 }

 return x = (A * (*x) + B) % C;

}

В этом варианте, в отличие от предыдущего, весь код вызывающего фрагмента при переходе к многопоточной реализации остается текстуально неизменным:

void* thrfunc(void*) {

 // ...

 while (true) {

  ... = rand(x);

 }

}

Перевод всего программного проекта на использование потоковой среды состоит в замене объектной единицы (объектного файла, библиотеки), содержащей реализацию rand(), и новой сборке приложения с этой объектной единицей.

При таком способе изменяются под потоковую безопасность и стандартные общеизвестные библиотечные функции API, написанные в своем первозданном виде 25 лет назад… (по крайней мере, так предлагает это делать стандарт POSIX, вводящий в обиход собственные данные потоков).

 

Диспетчеризация потоков

 

Каждому потоку, участвующему в процессе диспетчеризации, соответствует экземпляр структуры, определенной в файле , в котором находятся все фундаментальные для ОС QNX определения:

struct sched_param {

 _INT32 sched_priority;

 _INT32 sched_curpriority;

 union {

  _INT32 reserved[8];

  struct {

   _INT32 __ss_low_priority;

   _INT32 __ss_max_repl;

   struct timespec __ss_repl_period;

   struct timespec __ss_init_budget;

  } __ss;

 } __ss_un;

};

#define sched_ss_low_priority __ss_un.__ss.__ss_low_priority

#define sched_ss_max_repl     __ss_un.__ss.__ss_max_repl

#define sched_ss_repl_period  __ss_un.__ss.__ss_repl_period

#define sched_ss_init_budget  __ss_un.__ss.__ss_init_budget

Все, что определяется внутри union __ss_un, имеет отношение только к спорадической диспетчеризации (спорадическая диспетчеризация была введена значительно позже других, и ей будет уделено достаточно много внимания). Для всех остальных типов диспетчеризации потока это поле заполняется фиктивным полем reserved, и именно так в укороченном виде) определялась структура диспетчеризации в версии QNX 6.1.

Сейчас нас интересуют начальные поля этой структуры, не зависящие от типа диспетчеризации потока:

sched_priority — статический приоритет, который присваивается потоку при его создании и который может быть программно изменен по ходу выполнения потока;

sched_curpriority — текущий приоритет, с которым выполняется (и согласно которому диспетчеризируется) данный поток в текущий момент времени. Это значение приоритета образуется системой на основе заданного статического приоритета, но оно может динамически изменяться системой, например при отработке дисциплин наследования приоритетов или граничных приоритетов для потока. Программа не имеет средств воздействия на это значение, но может его считывать.

Еще раз подчеркнем достаточно очевидную вещь: дисциплина диспетчеризации определяется относительно потока и на уровне потока (но не процесса). Проследить за дисциплиной диспетчеризации (и убедиться в справедливости утверждения предыдущей фразы) можно командой pidin. Вот несколько строк ее вывода, относящиеся к составным частям самой системы:

pid tid name              prio STATE     Blocked

 1   1  6/boot/sys/procnto 0f  READY

 1   2  6/boot/sys/procnto 10r RUNNING

...

 1   5  6/boot/sys/procnto 63r RECEIVE   1

...

 1   9  6/boot/sys/procnto 6r  NANOSLEEP

...

 6   1  roc/boot/devb-eide 10o SIGWAITINFO

В поле prio указывается приоритет (текущий; возможно, последнее из унаследованных значений!) каждого потока с установленной для него дисциплиной диспетчеризации: f — FIFO, r — RR, o — OTHER, s — SPORADIC.

В системе на сегодняшний день реализованы три дисциплины диспетчеризации: очередь потоков равных приоритетов (FIFO — first in first out; еще в ходу термин «невытесняющая»), карусельная (RR — round-robin) и спорадическая. Рассмотрим фрагмент их определения в файле :

#if defined(__EXT_QNX)

#define SCHED_NOCHANGE  0

#endif

#define SCHED_FIFO      1

#define SCHED_RR        2

#define SCHED_OTHER     3

#if defined(__EXT_QNX)

#define SCHED_SPORADIC  4 /* Approved 1003.1d D14 */

#define SCHED_ADJTOHEAD 5 /* Move to head of ready queue */

#define SCHED_ADJTOTAIL 6 /* Move to tail of ready queue */

#define SCHED_MAXPOLICY 6 /* Maximum valid policy entry */

#endif

Все дисциплины диспетчеризации, кроме спорадической, достаточно полно описаны в литературе [1], поэтому мы лишь перечислим их отличительные особенности:

1. FIFO — это та дисциплина диспетчеризации, которая в литературе по Windows 3.1/3.11 называлась «невытесняющей многозадачностью» (или «кооперативной»). Здесь выполнение потока не прерывается потоками равного приоритета до тех пор, пока сам поток «добровольно» не передаст управление, например вызовом sched_yield() (часто для этой цели используется косвенный эффект вызовов delay(), sleep() и им подобных). В других источниках такой способ диспетчеризации называют очередями потоков равных приоритетов.

2. RR — это та дисциплина диспетчеризации, которая в Windows 98/NT/XP именуется «вытесняющей многозадачностью»; еще в литературе для нее используется термин «режим квантования времени».

Поток работает непрерывно только в течение предопределенного кванта времени. (В нескольких местах документации утверждается, что значение этого кванта времени составляет 4 системных тика (time-slice), что в QNX 6.2.1 по умолчанию составляет 4 миллисекунды, и только в одном месте документации говорится, что квант диспетчеризации составляет 50 миллисекунд; это определенное разночтение. Справедливым является именно первое утверждение.)

После истечения отведенного ему кванта времени поток вытесняется потоком равного приоритета (при отсутствии других потоков этим новым потоком может быть и только что вытесненный, то есть его выполнение будет продолжено, но передиспетчеризация тем не менее происходит). Установленный квант времени диспетчеризации может быть получен вызовом (стандарт POSIX 1003.1):

#include

int sched_rr_get_interval(pid_t pid, struct timespec* interval);

где pid — это PID процесса, для которого определяется квант времени, как и для многих других подобных функций. Если PID = 0, вызов относится к текущему процессу;

interval — указатель на структуру timespec (стандарт POSIX 1003.1):

#include

struct timespec {

 time_t tv_sec;  // значение секунд

 long   tv_nsec; // значение наносекунд

}

При успешном выполнении функция sched_rr_get_interval() возвращает 0, в противном случае -1.

Примечание

Две другие функции, часто удобные для работы со структурой timespec :

#include <time.h>

void nsec2timespec(struct timespec *timespec_p, _uint64 nsec);

— это преобразование интервала, выраженного в наносекундах (nsec), в структуру timespec («выходной» параметр вызова timespec_p );

#include <time.h>

_uint64 timespec2nsec(const struct timespec* ts);

— это преобразование структуры timespec в значение, выраженное в наносекундах (это функция из native API QNX).

3. Спорадическая диспетчеризация — это гораздо более развитая форма «вытесняющей многозадачности», численные характеристики которой (время кванта, численные значения приоритетов и др.) могут детально параметризироваться и даже динамически изменяться по ходу выполнения. Подробнее спорадическая диспетчеризация рассмотрена далее.

Часто задают вопрос: «А как много потоков целесообразно делать? Не сколько снижается эффективность многопоточной программы за счет диспетчеризации потоков?» С другой стороны, в литературе часто встречаются (достаточно голословные, на качественном уровне) утверждения, что многопоточная программа будет заметно уступать в фиктивности своему последовательному (в одном потоке) эквиваленту. Проверим это на реальной задаче:

Множественные потоки в едином приложении

#include

#include

#include

#include

#include

#include

#include

#include

#include

#include

#include

// преобразование процессорных циклов в миллисекунды:

static double cycle2milisec(uint64_t ccl) {

 const static double s2m = 1.E+3;

 // это скорость процессора

 const static uint64_t

 cps = SYSPAGE_ENTRY(qtime)->cycles_per_sec;

 return (double)ccl * s2m / (double)cps;

}

static int nsingl = 1;

// рабочая функция, которая имитирует вычисления:

void workproc(int how) {

 const int msingl = 30000;

 for (int j = 0; j < how; j++)

  for (uint64_t i=0; i < msingl * nsingl; i++)

   i = (i + 1) - 1;

}

static pthread_barrier_t bstart, bfinish;

struct interv { uint64_t s, f; };

interv *trtime;

void* threadfunc(void* data) {

 // все потоки после создания должны "застрять" на входном

 // барьере, чтобы потом одновременно "сорваться" в исполнение

 pthread_barrier_wait(&bstart);

 int id = pthread_self() - 2;

 trtime[id].s = ClockCycles();

 workproc((int)data);

 trtime[id].f = ClockCycles();

 pthread_barrier_wait(&bfinish);

 return NULL;

}

int main(int argc, char *argv[]) {

 // здесь только обработка многочисленных ключей...

 int opt, val, nthr = 1, nall = SHRT_MAX;

 while ((opt = getopt(argc, argv, "t:n:p:a:")) != -1) {

  switch(opt) {

  case 't':

   if (sscanf(optarg, "%i", &val) != 1)

    perror("parse command line failed"), exit(EXIT_FAILURE);

   if (val > 0 && val <= SHRT_MAX) nthr = val;

   break;

  case 'p':

   if (sscanf(optarg, "%i", &val) != 1)

    perror("parse command line failed"), exit(EXIT_FAILURE);

   if (val != getprio(0))

    if (setprio(0, val) == -1)

     perror("priority isn't a valid"), exit(EXIT_FAILURE);

   break;

  case 'n':

   if (sscanf(optarg, "%i", &val) != 1)

    perror("parse command line failed"), exit(EXIT_FAILURE);

   if (val > 0) nsingl *= val;

   break;

  case 'a':

   if (sscanf(optarg, "%i", &val) != 1)

    perror("parse command line failed"), exit(EXIT_FAILURE);

   if (val > 0) nall = val;

   break;

  default:

   exit(EXIT_FAILURE);

  }

 }

 // ... вот здесь начинается собственно сама программа.

 if (nthr > 1)

  cout << "Multi-thread evaluation, thread number = " << nthr;

 else cout << "Single-thread evaluation";

 cout << " , priority level: " << getprio(0) << endl;

 __clockperiod clcout;

 ClockPeriod(CLOCK_REALTIME, NULL, &clcout, 0);

 // интервал диспетчеризации - 4 периода tickslice

 // (системного тика):

 cout << "rescheduling = \t"

  << clcout.nsec * 4 / 1000000. << endl;

 // калибровка времени выполнения в одном потоке

 const int NCALIBR = 512;

 uint64_t tmin = 0, tmax = 0;

 tmin = ClockCycles();

 workproc(NCALIBR);

 tmax = ClockCycles();

 cout << "calculating = \t"

  << cycle2milisec(tmax - tmin) / NCALIBR << endl;

 // а теперь контроль времени многих потоков

 if (pthread_barrier_init(&bstart, NULL, nthr) != EOK)

  perror("barrier init"), exit(EXIT_FAILURE);

 if (pthread_barrier_init(&bfinish, NULL, nthr + 1) != EOK)

  perror("barrier init"), exit(EXIT_FAILURE);

 trtime = new interv[nthr];

 int cur = 0, prev = 0;

 for (int i = 0; i < nthr; i++) {

  // границы участков работы для каждого потока.

  cur = (int)floor((double)nall / (double)nthr * (i + 1) + .5);

  prev = (int)floor((double)nall / (double)nthr * i + 5);

  if (pthread_create(NULL, NULL, threadfunc, (void*)(cur - prev)) != EOK)

   perror("thread create"), exit(EXIT_FAILURE);

 }

 pthread_barrier_wait(&bfinish);

 for (int i=0; i < nthr; i++ ) {

  tmin = (i == 0) ? trtime[0].s : __min(tmin, trtime[i].s);

  tmax = ( i == 0 ) ? trtime[0].f : __max(tmax, trtime[i].f);

 }

 cout << "evaluation = \t"

  << cycle2milisec(tmax - tmin) / nall << endl;

 pthread_barrier_destroy(&bstart);

 pthread_barrier_destroy(&bfinish);

 delete trtime;

 exit(EXIT_SUCCESS);

}

Логика этого приложения крайне проста:

• Есть некоторая продолжительная по времени рабочая функция (workproc), выполняющая массированные вычисления.

• Многократно (это число определяется ключом запуска а) выполняется рабочая функция. Хорошо (то есть корректнее), если время ее единичного выполнения, которое задается ключом n, больше интервала диспетчеризации системы (в системе установлена диспетчеризация по умолчанию - круговая, или карусельная).

• Весь объем этой работы делится поровну (или почти поровну) между несколькими (ключ t) потоками.

• Сравниваем усредненное время единичного выполнения рабочей функции для разного числа выполняющих потоков (в выводе "calculating" — это время эталонного вычисления в одном главном потоке, a "evaluation" — время того же вычисления, но во многих потоках).

• Для того чтобы иметь еще большую гибкость, предоставляется возможность переопределять приоритет, под которым в системе все это происходит (ключ p).

Вот самая краткая сводка результатов (1-я строка вывода переносится для удобства чтения):

# t1 -n1 -t1000 -a2000

Multi-thread evaluation, thread number = 1000, priority level: 10

rescheduling = 3.99939

calculating =  1.04144

evaluation =   1.08001

# t1 -n1 -t10000 -a20000

Multi-thread evaluation, thread number = 10000, priority level: 10

rescheduling = 3.99939

calculating =  1.04378

evaluation =   1.61946

# t1 -n5 -a2000 -t1

Single-thread evaluation, priority level: 10

rescheduling = 3.99939

calculating =  5.07326

evaluation =   5.04726

# t1 -n5 -a2000 -t2

Multi-thread evaluation, thread number = 2, priority level: 10

rescheduling = 3.99939

calculating =  5.06309

evaluation =   5.04649

# t1 -n5 -a2000 -t20

Multi-thread evaluation, thread number = 20, priority level: 10

rescheduling = 3.99939

calculating =  5.06343

evaluation =   4.96956

# t1 -n5 -p51 -a512 -t1

Single-thread evaluation, priority level: 51

rescheduling = 3.99939

calculating =  4.94502

evaluation =   4.94511

# t1 -n5 -р51 -a512 -t11

Multi-thread evaluation, thread number = 11, priority level: 51

rescheduling = 3.99939

calculating =  4.94554

evaluation =   4.94549

# t1 -n5 -p51 -a512 -t111

Multi-thread evaluation, thread number = 111, priority level: 51

rescheduling = 3.99939

calculating =  5.02755

evaluation =   4.94487

# t1 -n5 -p51 -a30000 -t10000

Multi-thread evaluation, thread number = 10000, priority level: 51

rescheduling = 3.99939

calculating =  4.94575

evaluation =   5.31224

Краткий и, возможно, несколько парадоксальный итог этого теста может звучать так: при достаточно высоком уровне приоритета (выше 12–13, когда на его выполнение не влияют процессы обслуживания клавиатуры, мыши и др.) время выполнения в «классическом» последовательном коде и в многопоточном коде (где несколько тысяч потоков!) практически не различаются. Различия не более 8%, причем в обе стороны, что мы склонны считать «статистикой эксперимента». К обсуждению этого якобы противоречащего здравому смыслу феномена мы еще вернемся.

А пока посмотрим на текст примера, что и является нашей главной дачей. Обсуждаемое приложение вполне работоспособно в QNX с большой вероятностью в большинстве других UNIX-систем, но в Linux оно завершится аварийно. Причина этого кроется в операторах

int id = pthread_self() - 2;

trtime[id].s = ...

Это дает повод лишний раз обратиться к вопросу «POSIX-совместимости». POSIX описывает, что TID потока присваивается: а) в рамках процесса, которому принадлежит поток; б) начиная со значения 1, соответствующего главному потоку приложения. В Linux, выполняющем и pthread_create(), и fork() через единый системный вызов _clone() сделано небольшое «упрощение», навязанное в том числе и гонкой за повышением производительности: TID присваиваются из единого ряда PID. И сразу же «вылезает» несовместимость, ведущая к аварийному завершению показанного выше приложения. В последних редакциях ядра Linux делаются изменения по приведению механизмов параллельности к общей POSIX-модели.

Этот момент сам по себе достаточно интересен, поэтому остановимся на нем подробнее, для чего создадим простейший программный тест:

#define TCNT 10

void * test(void *in) {

 printf("pid %ld, tid %ld\n", getpid(), pthread_self());

 return NULL;

}

int main(int argc, char **argv, char **envp) {

 pthread_t tld[TCNT];

 int i, status;

 for (i=0; i < TCNT; i++) {

  status = pthread_create(&tid[i], NULL, test, NULL);

  if (status != 0)

  err(EXIT_FAILURE, "pthread_create()");

 }

 return(EXIT_SUCCESS);

}

Результаты выполнения этого теста в нескольких POSIX-совместимых ОС различны и весьма красноречивы:

$ uname -sr Linux 2.4.21-0.13mdk

$ ./test_pthread

pid 2008, tid 16386

pid 2009, tid 32771

pid 2010, tid 49156

pid 2011, tid 65541

pid 2012, tid 81926

pid 2013, tid 98311

pid 2014, tid 114696

pid 2015, tid 131081

pid 2016, tid 147466

pid 2017, tid 163851

А вот результат эволюции в направлении POSIX при переходе от ядра Linux 2.4.x к 2.6.x (алгоритм формирования TID все еще остается загадочным, но уже выполняются требования POSIX о выделении TID в рамках единого PID):

$ uname -sr Linux 2.6.3-7mdk

$ ./test_pthread

pid 13929, tid 1083759536

pid 13929, tid 1092156336

pid 13929, tid 1100549040

pid 13929, tid 1108941744

pid 13929, tid 1117334448

pid 13929, tid 1125727152

pid 13929, tid 1134119856

pid 13929, tid 1142512560

pid 13929, tid 1150905264

pid 13929, tid 1159297968

И наконец, тот же тест, выполненный в QNX 6.2.1:

# uname -a

QNX home 6.2.1 2003/01/08-14.50:46est х86рс x86

# ptid

pid 671779, tid 2

pid 671779, tid 3

pid 671779, tid 4

pid 671779, tid 5

pid 671779, tid 6

pid 671779, tid 7

pid 671779, tid 8

pid 671779, tid 9

pid 671779, tid 10

pid 671779, tid 11

 

Спорадическая диспетчеризация

Системы реального времени принципиально отличаются от систем общего назначения тем, что для таких систем важна не только корректность выполнения возложенных на них функций, но и время, за которое эти функции реализуются. Можно даже сказать, что для задач реального времени опоздание с выполнением практически эквивалентно невыполнению задачи: требуемая реакция или управляющее воздействие не поступили в срок. Предельный срок, в который задача реального времени должна быть выполнена, называют критическим сроком обслуживания (deadline).

Если система реального времени реализуется как многопоточная система (а в настоящее время такой вариант рассматривается фактически как стандартный), то при ее разработке зачастую возникает проблема определения того, действительно ли все задачи реального времени, конкурирующие в системе за вычислительный ресурс, успевают исполниться в их критический срок обслуживания.

Примечание

Здесь мы следуем «классической» модели обсуждения из области систем реального времени, хотя уместнее было бы акцентировать внимание не на абсолютной минимизации времени приложения, а именно на том, что приложение обязано «уложиться» в некоторый критический интервал времени (см. выше). Величина же того, насколько быстро приложение выполнит свои критические функции (если оно укладывается в критический интервал) по принципу «меньше — больше», практически уже не имеет никакого значения. Из этого не совсем четкого толкования сложился общий стереотип, состоящий в том, что системы реального времени (в частности, операционные системы реального времени) принято считать «быстрыми» (в том смысле, что они потенциально могут исполнять аналогичные функции быстрее, чем системы общего назначения). Этот взгляд в корне ошибочен: системы реального времени в общем случае, скорее, будут даже «медленнее», чем системы общего назначения, за счет более тщательной отработки операций, например диспетчеризации и переключений контекстов. Во многих случаях можно ожидать, что при многократном выполнении участка кода средняя величина времени его выполнения в ОС общего назначения будет ниже, но вот дисперсия этой средней величины будет намного ниже в системах реального времени.

На сегодняшний день существует несколько систем математического анализа временных характеристик систем реального времени, призванных помочь разработчику в построении системы, распределении приоритетов между задачами и, в конечном счете, определении диспетчеризуемости системы. Систему называют диспетчеризуемой, если все ее задачи укладываются в свои сроки критического обслуживания.

Одна из наиболее известных систем математического анализа временных характеристик систем реального времени с периодическим поступлением запросов на выполнение задач называется «Частотно-монотонный анализ» (ЧМА — Rate Monotonic Analyzing) [13]. Свое название эта система получила от ее основного принципа: «Чем короче период поступления (выше частота) задачи, тем выше ее приоритет». Как уже говорилось, ЧМА предназначен для анализа систем реального времени, в которых каждая задача реального времени обрабатывается со своим периодом, причем еще одним ограничением ЧМА является условие, что период поступления задачи является также и ее критическим сроком обслуживания. В настоящее время появился ряд новых методов анализа характеристик систем реального времени для случаев критических сроков обслуживания, больших или меньших периода поступления, но здесь мы не будем на них останавливаться.

К сожалению, практически невозможно создать эффективную методику анализа систем с полностью случайными сроками поступления задач реального времени. Однако на практике такие ситуации в чистом виде встречаются не особо часто. В отличие от задач с полностью случайным сроком поступления, в математическом анализе систем реального времени рассматриваются так называемые спорадические задачи, то есть задачи, последующий срок поступления которых может наступить не ранее некоторого времени после их предыдущего поступления.

Планирование обслуживания таких задач можно свести к планированию периодических задач и, таким образом, провести для них анализ диспетчеризуемости. Для этого теория ЧМА предлагает введение дополнительной периодической задачи (называемой спорадический сервер), которая проводит обслуживание непериодических (спорадических) задач.

Алгоритм работы такого сервера [13] следующий:

• Шаг 1. Если спорадический запрос прибывает и сервер не может его обработать, потому что уже занят или не имеет свободного ресурса вычислений, запрос будет поставлен в очередь обработки.

• Шаг 2. Если получен спорадический запрос и сервер может его обработать, он делает следующее:

• Шаг 2а. Выполняется до служебного завершения или истощения ресурса вычисления.

• Шаг 2с. Уменьшает текущий ресурс вычисления на используемое количество и на столько же увеличивает его ресурс вычисления в точке пополнения.

Для реализации теоретически обобщенной модели спорадического сервера в качестве механизма, реализующего эту модель, в QNX 6.2.1 была введена специализированная дисциплина диспетчеризации — спорадическая.

Сутью спорадической диспетчеризации в QNX является установка для соответствующего потока двух значений приоритета: основного (normal) и фонового (foreground). В момент запуска потока, подчиняющегося спорадической диспетчеризации (момент времени 0), поток имеет запас времени (С), называемый начальным бюджетом (initial budget) потока, в течение которого поток выполняется со своим основным приоритетом (N). Когда же запас времени исчерпывается, его приоритет понижается до уровня фонового (L). Через некоторый период времени T происходит пополнение (replenishment) запаса времени потока до значения начального бюджета, и он снова может выполняться с основным приоритетом.

Рассмотрим порядок выполнения такого потока подробнее. В начальный момент времени после запуска поток имеет приоритет N и время С для выполнения с этим приоритетом. Если поток блокируется на время R, то запас времени все равно расходуется и пополнение этого запаса может произойти только через период T после начала выполнения потока. Если же поток вытесняется более приоритетным, то расход его запаса времени прекращается. Когда управление возвращается к потоку, он вновь начинает тратить оставшееся количество времени на основном приоритете. Однако с момента повторного начала выполнения потока начинается отсчет нового периода до момента пополнения.

На рис. 2.6 проиллюстрирована работа спорадического потока. После запуска (момент времени 0) поток переходит в блокированное состояние на время R (10 мс), но его бюджет все равно расходуется. Поток становится активным, но через 3 мс (13 мс от начала выполнения) вытесняется более приоритетным потоком. Факт вытеснения означает, что через период пополнения T (40 мс) бюджет потока будет пополнен на израсходованную величину (13 мс). Еще через 3 мс более приоритетный поток заканчивает свою работу и управление возвращается назад. От начального бюджета потока С (20 мс) осталось еще 7 мс, и поток выполняется это время с основным приоритетом. При этом от повторного начала его выполнения (16 мс) отсчитывается новый период пополнения, то есть через 56 мс бюджет потока будет пополнен на 7 мс. После полного исчерпания бюджета приоритет потока понижается до фонового (L) и поток может вытесняться или нет в зависимости от приоритетов остальных потоков в системе. После наступления очередного времени пополнения бюджет потока восстанавливается на израсходованную в этом периоде величину и т.д.

Рис. 2.6. Периодическое выполнение спорадической задачи

Если поток много раз вытесняется в период своей работы с основным приоритетом, то его выполнение может превратиться в многократное колебание с высокой частотой между основным и фоновым приоритетами. Поэтому в QNX 6.2.1 в параметрах для спорадической диспетчеризации можно установить (ограничить) максимальное количество пополнений бюджета за период.

Как уже описывалось выше, структура shed_param содержит в своем составе, в частности, еще и структуру параметров для спорадической диспетчеризации (при других типах диспетчеризации эта часть не используется):

struct {

 _INT32 __ss_low_priority;

 _INT32 __ss_max_repl;

 struct timespec __ss_repl_period;

 struct timespec __ss_init_budget;

} __ss;

где low_priority — фоновый приоритет; max_repl — максимальное количество пополнений бюджета за период; repl_period — период пополнения бюджета и init_budget — начальный бюджет.

 

Соображения производительности

Выполним «симметричный» тест аналогично тому, как это делалось для переключения контекстов процессов (стр. 44), но теперь применительно к потокам (файл p5t.cc). При этом мы постараемся максимально сохранить принципы функционирования, имевшие место в приложении «Затраты на взаимное переключение процессов» (файл p5.сс) (естественно, из-за принципиального различия механизмов тексты кодов будут существенно отличаться).

Затраты на взаимное переключение потоков

#include

#include

#include

#include

#include

#include

#include

#include

unsigned long N = 1000;

// потоковая функция:

void* threadfunc(void* data) {

 uint64_t t = ClockCycles();

 for (unsigned long i = 0; i < N; i++) sched_yield();

 t = ClockCycles() - t;

 // дать спокойно завершиться 2-му потоку до начала вывода

 delay(100);

 cout << pthread_self() << "\t: cycles - " << t

  << ", on sched - " << (t / N) / 2 << endl;

  return NULL;

}

int main(int argc, char* argv[]) {

 int opt, val;

 while ((opt = getopt(argc, argv, "n:")) != -1) {

  switch(opt) {

  case 'n': // переопределения числа переключений

   if (sscanf(optarg, "%i", &val) != 1)

    cout << "parse command line error" << endl, exit(EXIT_FAILURE);

   if (val > 0) N = val;

   break;

  default:

   exit(EXIT_FAILURE);

  }

 }

 const int T = 2;

 pthread_t tid[T];

 // создать взаимодействующие потоки

 for (int i = 0; i < T; i++)

  if (pthread_create(tid + i, NULL, threadfunc, NULL) != EOK)

   cout << "thread create error", exit(EXIT_FAILURE);

 // и дожидаться их завершения ...

 for (int i = 0; i < T; i++)

  pthread_join(tid[i], NULL);

 exit(EXIT_SUCCESS);

}

Результаты выполнения программы:

# nice -n-19 p5t -n100

2       : cycles - 79490; on sched - 397

3       : cycles - 78350; on sched — 391

# nice -n-19 p5t -n1000

2       : cycles - 753269; on sched - 376

3       : cycles - 752069; on sched - 376

# nice -n-19 p5t -n10000

2       : cycles - 7494255; on sched - 374

3       : cycles - 7493225; on sched - 374

# nice -n-19 p5t -n100000

2       : cycles - 74897795; on sched - 374

3       : cycles - 74895800; on sched — 374

# nice -n-19 p5t -n1000000

2       : cycles - 748850811, on sched - 374

3       : cycles - 748850432; on sched - 374

Как и в случае с процессами, результаты отличаются очень высокой устойчивостью при изменении «объема вычислений» на 4 порядка, однако по своим величинам значения для потоков почти в 2 раза меньше, чем для процессов (стр. 45).

 

Завершение потока

 

Как и в случае обсуждавшегося ранее завершения процесса, для потоков мы будем отчетливо различать случаи:

• «естественного» завершения выполнения потока из кода самого потока;

• завершения потока извне, из кода другого потока или по сигналу. Для этого действия, в отличие от «естественного» завершения, будем использовать другой термин — отмена.

Завершение потока происходит при достижении функцией потока своего естественного конца и выполнения оператора return (явно или неявно) или выполнения потоком вызова:

void pthread_exit(void* value_ptr)

где value_ptr — указатель на результат выполнения потока.

При выполнении pthread_exit() поток завершается. Если этот поток принадлежит к категории ожидаемых, он может возвратить результат своей работы другому потоку, ожидающему его завершения на вызове pthread_join() (только один поток может получить результат завершения). Если же этот поток отсоединенный, то по его завершении все системные ресурсы, задействованные потоком, освобождаются немедленно.

Перед завершением потока будут выполнены все завершающие процедуры, помещенные в стек завершения, а также деструкторы собственных данных потока, о которых мы говорили ранее. Для последнего потока процесса вызов pthread_exit() эквивалентен exit().

 

Возврат результата потока

Выше отмечено, что вызов pthread_exit(), завершающий ожидаемый поток, может передать результат выполнения потока. То же действие может быть выполнено и оператором return потоковой функции, которая из прототипа ее определения должна возвращать значение типа void*.

В обоих случаях результат может иметь сколь угодно сложный структурированный тип; никакая типизация результата не предусматривается (тип void*). Важно, чтобы код, ожидающий результата на вызове pthread_join(), понимал его так же, как и функция потока, возвращающая этот результат.

Другим условием является то, что переменная «результат» должна существовать к моменту вызова pthread_join(), то есть вполне возможно, что уже далеко после завершения самой функции ожидаемого потока. Этому условию не удовлетворяют, например, любые локальные для функции потока объекты, размещаемые в стеке. Приведем пример часто допускаемой ошибки. Следующая функция потока практически обречена на ошибку защиты памяти:

void* threadfunc(void* data) {

 int res; // результат некоторых вычислений

 res = ...

 pthread_exit(&res);

}

А вот один из многих допустимых вариантов:

void* threadfunc(void* data) {

 struct data *res = new struct; // результат некоторых вычислений

 ...

 *res = ...

 pthread_exit(res);

}

...

pthread_t tid;

pthread_create(&tid, NULL, threadfunc, NULL);

struct data *res;

pthread_join(tid, &res);

...

delete res;

Недостатком этого варианта является то, что память под блок данных результата выделяется в одной программной единице (в функции потока), а освобождаться должна в другой (в коде, ожидающем результата), при этом сами программные единицы могут размещаться даже в различных файлах исходного кода. (Здесь ситуация зеркально подобна ранее рассмотренному случаю передачи параметров в функцию создаваемого потока.)

 

Уничтожение (отмена) потока

Корректное завершение выполняющегося потока «извне», из другого потока (то есть асинхронно относительно прерываемого потока), — задача отнюдь не тривиальная; она намного сложнее аналогичной задачи прерывания процесса. Это связано с обсуждавшимся ранее при рассмотрении завершения потоков временем жизни объектов, которые могут быть использованы потоком к моменту его отмены (блоки динамической памяти, файловые дескрипторы, примитивы синхронизации и другие объекты системы).

Если для процесса в перечень «опасных» (с точки зрения завершения) объектов включаются только объекты со временем жизни выше уровня процесса (их число достаточно ограничено), то для потока в число таких объектов включаются уже все объекты со временем жизни процесса (process-persistent). Завершающийся (покидающий процесс) поток обязан оставить все объекты процесса в состоянии, пригодном для их дальнейшего использования другими потоками процесса.

Далее мы подробно рассмотрим то множество предосторожностей, которыми «обложена» отмена потока. Однако именно по причине их «множества» стоит сформулировать краткое правило: не пытайтесь завершать поток извне его функции потока, если для этого нет в высшей степени обоснованной необходимости (а такая необходимость действительно бывает, но крайне редко). Даже в крайнем случае следует рассмотреть возможность вместо отмены потока послать ему сигнал (даже не только «сигнал UNIX», а в более широком смысле — «некоторое сообщение»), который, обрабатываясь в контексте потока, после корректных завершающих действий вызовет его завершение. (Как обращаться с сигналами в потоке, будет детально рассмотрено позже.)

Для отмены (принудительного завершения) потока используется вызов:

int pthread_cancel(pthread_t thread);

где в качестве параметра thread указывается TID отменяемого потока. Однако этот вызов не отменяет поток, а только запрашивает завершение потока. В зависимости от статуса отмены, который мы сейчас рассмотрим, поток может перейти (или нет) к действию завершения, которое состоит в том, что:

• выполняются все процедуры завершения, занесенные ранее в стек завершения вызовами pthread_cleanup_push();

• выполняются деструкторы собственных данных потока;

• отменяемый поток завершается;

• процесс отмены — асинхронный с точки зрения вызывающего pthread_cancel() кода, поэтому вызывающий отмену поток должен дождаться завершения потока на вызове pthread_join().

Прежде всего, поток может вообще отказаться выполнять любые отмены, вызвав из своей функции потока:

int pthread_setcancelstate(int state, int* oldstate);

где state и oldstate — устанавливаемое и установленное ранее (возвращаемое вызовом) состояния отмены потока, которые могут принимать значения PTHREAD_CANCEL_DISABLE либо PTHREAD_CANCEL_ENABLE. (Естественно, как и во многих функциях с подобным прототипом, значением oldstate может быть NULL, и тогда нам не нужно возвращать ранее установленное состояние.)

Далее, даже если для потока установлено состояние завершаемости (также называемое «состоянием отмены») PTHREAD_CANCEL_ENABLE (это значение по умолчанию при создании потока), поток может переопределить еще и тип отмены, вызвав:

int pthread_setcanceltype(int type, int* oldtype);

где type и oldtype — как и в предыдущем случае, новое и ранее установленное значения типа отмены потока, которые могут принимать значения PTHREAD_CANCEL_ASYNCHRONOUS (асинхронный по отмене поток) либо PTHREAD_CANCEL_DEFERRED (синхронный по отмене поток). Значением по умолчанию, устанавливаемым при создании потока, является PTHREAD_CANCEL_DEFERRED, хотя предписываемым POSIX умолчанием является PTHREAD_CANCEL_ASYNCHRONOUS.

Обе рассмотренные функции установок параметров отмены при успешном выполнении возвращают значение EOK.

Итак, действия потока на запрос его завершения будут определяться текущей комбинацией двух установленных для него параметров: состоянием и типом отмены.

Теперь о том, чем же отличается отмена асинхронно и синхронно завершаемых потоков. Поток с асинхронным типом отмены (установленный с PTHREAD_CANCEL_ASYNCHRONOUS) может быть отменен в любой произвольный момент времени, то есть он всегда «свободен» для отмены и отмена производится немедленно. Поток с синхронным типом отмены (установленный с PTHREAD_CANCEL_DEFERRED) может быть остановлен только в тех точках выполнения потока, когда ему «удобно», и соответствующие места в программе называются точками отмены. При поступлении запроса на отмену такого потока (после выполнения извне pthread_cancel()) запрос помещается в очередь, а процесс отмены активизируется только после того, как отменяемый поток в ходе своего выполнения достигнет очередной точки отмены. Как определяются (создаются) точки отмены в коде потока? Для этого служит функция:

void pthread_testcancel(void);

Каждый вызов pthread_testcancel() тестирует очередь поступивших запросов на отмену на предмет наличия запросов, и если таковой запрос есть, процесс отмены активизируется. Если в коде отсутствуют вызовы pthread_testcancel(), то в нем практически отсутствуют точки отмены и поток становится неотменяемым (подобно установке его состояния отмены в PTHREAD_CANCEL_DISABLE). Поэтому при выполнении длительных вычислений функцию pthread_testcancel() следует периодически вызывать в потоковой функции в тех точках, где потенциальная отмена потока не опасна.

Примечание

( Очень важно! ) Достаточно много библиотечных функций могут сами устанавливать точки отмены. Более того, такие функции могут косвенно вызываться из других функций в программе и тем самым неявно устанавливать точки отмены. Информацию о таких функциях следует искать в справочной man-странице по функции pthread_testcancel() . В результате этого эффекта можно получить отмену потока не в той точке, которую вы считаете безопасной и которую явно отмечаете вызовом pthread_testcancel() , а ранее этой точки — когда будет вызвана одна из таких функций. А это, очевидно, вовсе не то, на что вы рассчитывали!

Если состояние отмены потока, как это описывалось ранее, установлено в PTHREAD_CANCEL_DISABLE, то никакая расстановка точек отмены не имеет эффекта и поток остается неотменяемым.

Покажем, как могут быть использованы все эти предосторожности в коде функции потока, чтобы сделать код безопасным с позиции возможной асинхронной отмены потока извне:

void* function(void* data) {

 int state;

 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &state);

 // ... здесь выполняется инициализация ...

 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);

 pthread_setcancelstate(&state, NULL);

 while (true) {

  struct blockdata *blk = new blockdata;

  // ... обработка блока данных blk ...

  delete blk;

  pthread_testcancel();

 }

}

...

pthread_t tid;

...

pthread_create(&tid, NULL, function, NULL);

...

pthread_cancel(tid); // отмена потока

void* res;

pthread_join(tid, &res); // ожидание отмены

if (res != PTHREAD_CANCELED)

cout << "Что-то не так!" << endl;

Наконец, в QNX (но не в POSIX) существует вызов, подобный pthread_cancel(), принудительно отменяющий поток независимо от его установок («желания»):

int pthread_abort(pthread_t thread);

В отличие от pthread_cancel(), этот вызов принудительно и немедленно отменяет поток. Кроме того, никакие процедуры завершения и деструкторы собственных данных потока не выполняются. Очевидно, что в результате такого «завершения» состояния объектов процесса будут просто неопределенными, поэтому такой вызов крайне опасен. При таком способе отмены в программный код, ожидающий завершения на pthread_join(), в качестве результата завершения возвращается константа (тип void*) PTHREAD_ABORTED (аналогично возвращается константа PTHREAD_CANCELED при выполнении pthread_cancel()).

Но и этих мер безопасности недостаточно на все случаи жизни, поэтому механизм потоков предусматривает еще один уровень (механизм) страховки.

 

Стек процедур завершения

Для поддержания корректности состояния объектов процесса каждый поток может помещать (добавлять) в стек процедур завершения (thread's cancellation-cleanup stack) функции, которые при завершении (pthread_exit() или return) или отмене (по pthread_cancel()) выполняются в порядке, обратном помещению. Для манипуляции со стеком процедур завершения предоставляются вызовы (оба вызова реализуются макроопределениями, но это не суть важно):

void pthread_cleanup_push(void (routine)(void*), void* arg);

где routine — адрес функции завершения, помещаемой в стек; arg — указатель блока данных, который будет передан routine при ее вызове.

Функции завершения (начиная с вершины стека) вызываются со своими блоками данных в случаях, когда:

• поток завершается, выполняя pthread_exit();

• активизируется действие отмены потока, ранее запрошенное по вызову pthread_cancel();

• выполняется второй (комплементарный к pthread_cleanup_push()) вызов с ненулевым значением аргумента:

void pthread_cleanup_pop(int execute);

Этот вызов выталкивает из стека последнюю помещенную туда pthread_cleanup_push() функцию завершения и, если значение execute ненулевое, выполняет ее.

Вот как может выглядеть в этой технике безопасный (с позиции возможной асинхронной отмены потока) захват мьютекса:

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void cleanup(void* arg) { pthread_mutex_unlock(&mutex); }

void* thread_function(void* arg) {

 while (true) {

  pthread_mutex_lock(&mutex);

  pthread_cleanup_push(&cleanup, NULL);

  {

   // все точки отмены должны быть расставлены в этом блоке!

  }

  pthread_testcancel();

  pthread_cleanup_pop(1);

 }

}

 

«Легковесность» потока

Вот теперь, завершив краткий экскурс использования процессов и потоков, можно вернуться к вопросу, который вскользь уже звучал по ходу рассмотрения: почему и в каком смысле потоки часто называют «легкими процессами» (LWP — lightweight process)?

Выполним ряд тестов по сравнительной оценке временных затрат на создание процесса и потока. Начнем с процесса (файл p2-1.cc):

Затраты на порождение нового процесса

struct mbyte { // мегабайтный блок данных

 #pragma pack(1)

 uint8_t data[1024 * 1024];

 #pragma pack(4)

};

int main(int argc, char *argv[]) {

 mbyte *blk = NULL;

 if (argc > 1 && atoi(argv[1]) > 0) {

  blk = new mbyte[atoi(argv[1])];

 }

 uint64_t t = ClockCycles();

 pid_t pid = fork();

 if (pid == -1) perror("fork"), exit(EXIT_FAILURE);

 if (pid == 0) exit(EXIT_SUCCESS);

 if (pid > 0) {

  waitpid(pid, NULL, WEXITED);

  t = ClockCycles() - t;

 }

 if (blk != NULL) delete blk;

 cout << "Fork time " << cycle2milisec(t)

  << " msec. [" << t << " cycles]" << endl; exit(EXIT_SUCCESS);

}

Эта программа сделана так, что может иметь один численный параметр: размер (в мегабайтах) блока условных данных (в нашем случае даже неинициализированных), принадлежащего адресному пространству процесса. (Функцию преобразования процессорных циклов в соответствующий миллисекундный интервал cycle2milisec() мы видели раньше, и поэтому в листинг она не включена.)

А теперь оценим временные затраты на создание клона процесса в зависимости от объема программы (мы сознательно использовали клонирование процесса вызовом fork(), а не загрузку spawn*() или exec*(), чтобы исключить из результата время загрузки образа процесса из файла):

# p2-1

fork time: 3.4333 msec. [1835593 cycles]

# p2-1 1

Fork time: 17.0706 msec [9126696 cycles]

# p2-1 2

Fork time: 31.5257 msec. [16855024 cycles]

# p2-1 5

Fork time: 70.7234 msec. [37811848 cycles]

# p2-1 20

Fork time: 264.042 msec. [141168680 cycles]

# p2-1 50

Fork time: 661.312 msec. [353566688 cycles]

# p2-1 100

Fork time: 1169.45 msec. [625241336 cycles]

Наблюдаются, во-первых, достаточно большие временные затраты на создание процесса (к этому мы еще вернемся), а во-вторых, близкая к линейной зависимость времени создания процесса от размера его образа в памяти и вариации этого времени на несколько порядков. Об этом уже говорилось при рассмотрении функции fork(): это следствие необходимости полного копирования образа адресного пространства родительского процесса во вновь создаваемое для дочернего процесса адресное пространство. При этом линейный рост времени копирования от размера образа процесса становится естественным (вот почему для образов таких задач при их построении посредством программы make в высшей степени целесообразно выполнить завершающую команду strip для уменьшения размера итогового образа задачи). Более того, это «высоко затратная» операция копирования, не в пример привычной функции memcpy(). Копирование производится между различными адресными пространствами обращением к средствам системы по принципу: скопировать N байт, начиная с адреса А адресного пространства Р, по адресу, начиная с А (тот же адрес!) адресного пространства С. В большинстве других ОС некоторое смягчение вносит использование техники COW (copy on write), но и этот эффект кажущийся (см. выше подробное обсуждение при описании функции fork()).

На результаты наших оценок очень существенное влияние оказывают процессы кэширования памяти, что можно легко увидеть, экспериментируя с приложением, но затраты (число процессорных тактов) на выполнение fork() будут оценены очень грубо:

T = 3000000 + Р * 6000

где Р — размер (в килобайтах) файла образа программы, в которой выполняется fork().

Теперь проведем столь же элементарный альтернативный тест (файл p2-2.cc) по созданию потока. (В случае потока время гораздо проще измерять и с более высокой точностью, но мы для сравнимости результатов почти текстуально сохраним предыдущий пример с включением в результат операторов завершения дочернего объекта, ожидания результата и т.д.)

Затраты на создание потока

void* threadfunc(void* data) { pthread_exit(NULL); }

int main(int argc, char *argv[]) {

 uint64_t t = ClockCycles();

 pthread_t tid;

 pthread_create(&tid, NULL, threadfunc, NULL);

 pthread_join(tid, NULL);

 t = ClockCycles() - t;

 cout << "Thread time, " << cycle2milisec(t) << " msec. [" << t <<

  " cycles]" << endl;

 exit(EXIT_SUCCESS);

}

На результаты этого теста (в отличие от предыдущего) уже достаточно существенно влияет приоритет, под которым выполняется задача, поэтому проделаем его с достаточно высоким приоритетом (29):

# nice -n-19 p2-2

Thread time: 0.147139 msec. [78667 cycles]

# nice -n-19 p2-1

Fork time: 2.5366 msec. [1356179 cycles]

Вот так… время порождения нового «пустого» процесса, даже минимального размера (размер исполняемого файла этого процесса чуть больше 4 Кбайт), почти в 20 раз больше затрат на создание потока! А для процессов большого объема эта разница может доходить до 3–4 порядков (см. результаты первого теста).

Далее рассмотрим сравнительную эффективность с другой стороны: будет ли диспетчеризация многочисленных потоков, принадлежащих одному процессу, эффективнее диспетчеризации такого же количества отдельных процессов? Для процессов задача текстуально выглядит так (файл p4-1.cc):

void workproc(int how = 1) {

 const int nsingl = 1000, msingl = 30;

 for (int j = 0; j < how; j++) // ... имитация вычислений

  for (uint64_t i = 0; i < msingl; i++)

   for (uint64_t k = 0; k < nsingl; k++)

    k = (k + 1) - 1;

}

int main(int argc, char *argv[]) {

 int numpar = 1;

 if (argc > 1 && atoi(argv[1]) > 0)

  numpar = atoi(argv[1]);

 _clockperiod clcold;

 ClockPeriod(CLOCK_REALTIME, NULL, &clcold, 0);

 if (argc > 2 && atoi(argv[2]) > 0) {

  _clockperiod clcnew = { atoi(argv[2]) * 1000, 0 };

   ClockPeriod(CLOCK_REALTIME, &clcnew, &clcold, 0);

 }

 timespec interval;

 sched_rr_get_interval(0, &interval);

 cout << "Rescheduling interval = "

  << (double)interval.tv_nsec / 1000000 << " msec." << endl;

 uint64_t t = ClockCycles();

 for (int i = 0, i < numpar; i++) {

  pid_t pid = fork();

  if (pid == -1) perror("fork"), exit(EXIT_FAILURE);

  if (pid == 0) {

   workproc(1000);

   exit(EXIT_SUCCESS);

  }

 }

 for (int i = 0; i < numpar; i++) wait3(NULL, WEXITE0, NULL);

 t = ClockCycles() - t;

 cout << "Forks scheduling time" << cycle2milisec(t)

  << " msec [" << t << " cycles]" << endl;

 ClockPeriod(CLOCK_REALTIME, &clcold, NULL, 0);

 exit(EXIT_SUCCESS);

}

Имитатором активной вычислительной нагрузки программы является функция workproc(), отличительной особенностью которой является то, что она при активной (хоть и бессмысленной) загрузке процессора не делает на всем интервале своего выполнения никаких системных вызовов, которые могли бы привести к вытеснению выполняющего ее потока.

Первым параметром программы является количество процессов, на которые распределяется общий объем вычислений. Но самое главное: начнем управлять размером периода временного системного тика.

Примечание

По умолчанию системный тик (для QNX 6.2.1) равен 1 мсек., но в принципе его значение можно уменьшать функцией ClockPeriod() вплоть до 10 мксек. Кстати, в описании именно этой функции присутствует замечание о том, что « …период решедулирования равен 4 тикам, и это соотношение в системе нельзя изменить ».

Второй параметр запуска программы (при его наличии) и определяет размер периода системного тика, выраженный в микросекундах. (В конце выполнения задач подобного рода, изменяющих размер системного тика, нужно обязательно принять меры к восстановлению его прежнего значения даже в случаях экстремального и аварийного завершения задачи!) Для повышения достоверности тестов величина размера интервала диспетчеризации контролируется независимым образом (вызовом sched_rr_get_interval()).

При распараллеливании вычислительного объема между потоками эквивалентный код (файл p4-2.cc) будет иметь вид (используется та же функция workproc()), которую мы повторно не показываем):

void* threadfunc(void* data) {

 workproc(100);

 pthread_exit(NULL);

}

int main(int argc, char *argv[]) {

 int numpar = 1;

 if (argc > 1 && atoi(argv[1]) > 0)

  numpar = atoi(argv[1]);

 pthread_t *tids = new pthread_t [numpar];

 _clockperiod clcold;

 ClockPeriod(CLOCK_REALTIME, NULL, &clcold, 0);

 if (argc > 2 && atoi(argv[2]) > 0) {

  _clockperiod clcnew = { atoi(argv[2]) * 1000, 0 };

  ClockPeriod(CLOCK_REALTIME, &clcnew, &clcold, 0);

 }

 timespec interval;

 sched_rr_get_interval(0, &interval);

 cout << "Rescheduling interval = "

  << (double)interval.tv_nsec / 1000000 << " msec. " << endl;

 uint64_t t = ClockCycles();

 for (int i = 0; i < numpar; i++)

  pthread_create(&tids[i], NULL, threadfunc, NULL);

 for (int i = 0; i < numpar; i++)

  pthread_join(tids[i], NULL);

 t = ClockCycles() - t;

 cout << "Threads scheduling time " << cycle2milisec(t)

  << " msec. [" << t << " cycles]" << endl;

 ClockPeriod(CLOCK_REALTIME, &clcold, NULL, 0);

 exit(EXIT_SUCCESS);

}

Наконец, для сравнительного анализа выполним тот же объем вычислительной работы в одиночном потоке, то есть в последовательной «классической» программе (файл p4-3.cc):

int main(int argc, char *argv[]) {

 int numpar = 1;

 if (argc > 1 && atoi(argv[1]) > 0)

  numpar = atoi(argv[1]);

 _clockperiod clcold;

 ClockPeriod(CLOCK_REALTIME, NULL, &clcold, 0);

 if (argc > 2 && atoi(argv[2]) > 0) {

  _clockperiod clcnew = { atoi(argv[2]) * 1000, 0 };

  ClockPeriod(CLOCK_REALTIME, &clcnew, &clcold, 0);

 }

 timespec interval;

 sched_rr_get_interval(0, &interval);

 cout << "Rescheduling interval = "

  << (double)interval.tv_nsec / 1000000. << " msec." << endl;

 uint64_t t = ClockCycles();

 workproc(1000 * numpar);

 t = ClockCycles() - t;

 cout << "Single scheduling time. " << cycle2milisec(t)

  << " msec. [" << t << " cycles]" << endl;

 ClockPeriod(CLOCK_REALTIME, &clcold, NULL, 0);

 exit(EXIT_SUCCESS);

}

Выполняем 3 полученных теста для различных значений периода системного тика (показано группами по 3 запуска) в таком порядке: одиночный процесс, параллельные потоки, параллельные процессы:

# nice -n-19 p4-3 10

Rescheduling interval = 3.99939 msec

Single scheduling time: 5928.8 msec [3169850746 cycles]

# nice -n-19 p4-2 10

Rescheduling interval = 3.99939 msec.

Threads scheduling time: 5919.82 msec. [3165049513 cycles]

# nice -n-19 p4-1 10

Rescheduling interval = 3.99939 msec.

Forks scheduling time: 5962.21 msec. [3187713371 cycles]

# nice -n-19 p4-3 10 50

Rescheduling interval = 0.197788 msec

Single scheduling time: 6427.33 msec. [3436394566 cycles]

# nice -n-19 p4-2 10 50

Rescheduling interval = 0.197788 msec.

Threads scheduling time: 6207.96 msec. [3319104030 cycles]

# nice -n-19 p4-1 10 50

Rescheduling interval = 0.197788 msec

Forks scheduling time 6029.23 msec. [3223548073 cycles]

# nice -n-19 p4-3 10 20

Rescheduling interval = 0.077104 msec.

Single scheduling time: 6745.37 msec. [3606433666 cycles]

# nice -n-19 p4-2 10 20

Rescheduling interval = 0.077104 msec

Threads scheduling time: 6762.69 msec [3615692975 cycles]

# nice -n-19 p4-1 10 20

Rescheduling interval = 0.077104 msec

Forks scheduling time: 6647.42 msec [3554062343 cycles]

# nice -n-19 p4-3 10 11

Rescheduling interval = 0.04358 msec

Single scheduling time. 7517.74 msec [4019381476 cycles]

# nice -n-19 p4-2 10 11

Rescheduling interval = 0.04358 msec

Threads scheduling time: 7638.89 msec. [4084155676 cycles]

# nice -n-19 p4-1 10 11

Rescheduling interval = 0.04358 msec.

Forks scheduling time: 7679 29 msec. [4105758575 cycles]

# nice -n-19 p4-3 10 10

Rescheduling interval = 0.036876 msec.

Single scheduling time: 7937.35 msec. [4243731124 cycles]

# nice -n-19 p4-2 10 10

Rescheduling interval = 0.036876 msec.

Threads scheduling time. 8136.42 msec. [4350159949 cycles]

# nice -n-19 p4-1 10 10

Rescheduling interval = 0.036876 msec

Forks scheduling time: 8172.35 msec [4369372230 cycles]

Результаты могут показаться достаточно неожиданными: во всех 3-х вариантах (в группах) это практически одни и те же цифры — различия затрат на выполнение и в едином последовательном потоке, и во многих параллельных процессах (как предельные случаи) не превышают 0,5–2%! Но результат есть результат, и его нужно как-то интерпретировать, ведь, как известно, «из песни слова не выкинешь».

Эти результаты позволяют (пусть грубо и оценочно) «разложить» затраты производительности между обслуживанием системного таймера (службы времени ОС) и диспетчеризацией. Еще раз обратимся к отдельным выборочным результатам:

# nice -n-19 p4-3 10

Rescheduling interval = 3.99939 msec.

Single scheduling time: 5928.8 msec. [3169850746 cycles]

To есть на протяжении «работы» было 5928,8/0,9998475 = 5929 прерываний от службы времени.

# nice -n-19 p4-3 10 10

Rescheduling interval = 0.036876 msec

Single scheduling time: 7937.35 msec. [4243731124 cycles]

На этот раз за счет уменьшения периода системного тика на 2 порядка на протяжении «работы» (того же объема полезной работы!) было уже 7937,35/0,009219 = 860977 событий диспетчеризации.

Поскольку объем работы программы, выполняемый в этих двух случаях, остается неизменным, то на обслуживание дополнительных 860 977 – 5929 = 855 048 системных тиков (совместно с 855 048/4 = 213 762 точками диспетчеризации) и потребовались те 4 243 731 124 – 3 169 850 746 = 1 073 880 378 дополнительных тактов процессора, или около 1256 тактов на один системный тик. Ранее мы уже получали оценки затрат собственно на переключение контекстов между процессами (617) и потоками (374), которые происходят каждый четвертый системный тик, то есть непосредственно переключение контекста «отъедает» в среднем 90–150 (¼ часть затрат переключения контекста) на каждый системный тик или, другими словами, не более 10% затрат на обслуживание службы системных часов.

Попытаемся осмыслить полученные результаты:

• Время переключения адресных пространств процессов, управляемых MMU аппаратно, в принципе должно быть продолжительнее времени переключения контекстов потоков и тем более восстановления контекста единого последовательного потока, но…

• …но объем работы по обслуживанию каждого системного тика (прерывания таймера) настолько превышает объем операций переключения контекстов (рис. 2.7), что это практически полностью нивелирует разницу, будь то приложение в виде многих автономных процессов, многопоточное приложение или приложение в виде единого последовательного потока.

Рис. 2.7. Эффекты, возникающие при принудительном изменении частоты системных часов

На рис. 2.7 показана последовательность тиков системных часов и связанная с нею последовательность актов диспетчеризации. При уменьшении периода наступления системных тиков (частоты аппаратных прерываний от системных часов) в силу фиксированных объемов операций, требуемых как для одних, так и для других действий, относительная доля времени, остающаяся для выполнения полезной работы, падает.

• И это будет выполняться не только для потоков, диспетчеризуемых с дисциплиной RR (вытесняемых по истечении бюджета времени выделенного им кванта), но и для потоков с любой дисциплиной диспетчеризации, в том числе и FIFO, когда выполняющийся поток (а значит, поток наивысшего приоритета в системе) вообще «не собирается» никому передавать управление.

• Для программиста-разработчика результаты этого теста позволяют сформулировать правило, возможно абсурдное с позиций элементарной (но поверхностной) логики: Распараллеливание задачи (если это возможно) на N ветвей (будь то использование потоков или процессов) практически не изменяет итоговое время ее выполнения.

Еще одним побочным результатом рассмотрения можно назвать следующее: эффективность диспетчеризации потоков (сохранения и переключения контекстов), принадлежащих одному процессу, ни в чем не превосходит эффективность диспетчеризации группы потоков, принадлежащих различным процессам. И в этом своем качестве — эффективности периода выполнения — потоки в своей «легковесности» ничем не превосходят автономные параллельные процессы.

В завершение воспользуемся все теми же тестовыми приложениями для ответа на часто задаваемый вопрос: «Насколько эффективно ОС QNX поддерживает приложения, содержащие большое («слишком большое») количество потоков? Посмотрим, как это выглядит. Все выполнения мы делаем при минимально возможном значении системного тика, когда ОС существенно более «озабочена» своими внутренними процессами, нежели процессом вычислений:

# nice -n-19 p4-2 2 10

Rescheduling interval = 0.036876 msec.

Threads scheduling time: 1555.43 msec [831574415 cycles]

# nice -n-19 p4-2 20 10

Rescheduling interval = 0.036876 msec.

Threads scheduling time: 15642 msec. [8362674590 cycles]

# nice -n-19 p4-2 200 10

Rescheduling interval = 0.036876 msec

Threads scheduling time: 161112 msec. [86134950020 cycles]

Наблюдается очень хорошая линейная зависимость итогового времени от числа потоков (от 2 до 200). Таким образом, время выполнения работы в каждом из потоков практически не зависит от общего числа параллельно выполняющихся с ним потоков.

Повторим то же самое, но уже для случая параллельных процессов:

# nice -n-19 p4-1 2 10

Rescheduling interval = 0.036876 msec.

Forks scheduling time: 1622.87 msec [867633362 cycles]

# nice -n-19 p4-1 20 10

Rescheduling interval = 0.036876 msec.

Forks scheduling time: 16682.1 msec [8918698991 cycles]

# nice -n-19 p4-1 200 10

Rescheduling interval = 0.036876 msec

Forks scheduling time: 173398 msec. [92703484992 cycles]

Здесь наблюдается лишь незначительное увеличение крутизны линейной зависимости, что можно отнести к некоторым накладным расходам на поддержание достаточно большого числа записей о процессах в таблицах менеджера процессов, но величина этого эффекта также весьма малосущественна.

В итоге, в отношении «легковесности» потоков можно сказать следующее:

• При необходимости динамического создания параллельных ветвей в ходе выполнения программы (а это достаточно классический случай, например в разнообразных сетевых серверах, создающих ветвь обслуживания для каждого нового клиента) производительность приложения, функционирующего на основе потоков, может быть значительно выше (до нескольких порядков), а время реакции соответственно ниже.

• При статическом выполнении (фиксированном количестве параллельных ветвей в приложении) эффективность приложений, построенных на параллельных потоках или параллельных процессах, практически не отличается. Более того, эффективности таких приложений не отличаются и от классической последовательной организации приложения, работающего в одном потоке.

• Существует дополнительный фактор, обеспечивающий «легковесность» потоков в противовес процессам, — это легкость и эффективность их взаимодействия в едином адресном пространстве. В случае процессов для обеспечения таких взаимодействий возникает необходимость привлечения «тяжеловесных» механизмов IPC разнообразной природы (именованные и неименованные каналы, разделяемая память, обмен UNIX-сообщениями и другие). При рассмотрении обмена сообщениями QNX мы еще раз убедимся в том, что обмены и взаимодействия между процессами могут требовать весьма существенных процессорных ресурсов, а при обменах с интенсивным трафиком могут стать доминирующей компонентой, определяющей пределы реальной производительности системы.

 

Пример: синхронное выполнение кода

Выше приводилось достаточно много подобных примеров, но это были примеры, так сказать, «локальные», фрагментарные, иллюстрирующие использование какой-то одной возможности применительно к потокам. Сейчас мы приведем пример, реализующий часто возникающую на практике возможность. Некоторые программные действия (функции) мы хотели бы запускать периодически с фиксированным временным интервалом T, что весьма напоминает действия и аппаратной реализации, которые должны быть выполнены по каждому импульсу «синхронизирующей последовательности».

Простейшая реализация могла бы выглядеть так:

...

while(true) {

 delay(T);

 func();

}

Но это очень «слабое» решение:

• Задержка, обеспечиваемая функцией пассивной задержки delay(), согласно требованиям POSIX не может быть меньше указанного параметра T, но... может быть сколь угодно больше! (В [4] мы писали, что при T = 1 реальная величина задержки будет составлять не 1 мсек., как можно было бы ожидать, а с большой степенью вероятности 3 мсек., и там же мы подробно показывали, как это происходит.)

• Если в системе одновременно с этим приложением работает процесс (поток) более высокого приоритета, то наше приложение может вообще никогда «не проснуться», по крайней мере, пока это не «соизволит» санкционировать параллельное приложение.

• Здесь мы обеспечиваем только одну синхронизированную последовательность вызовов функции func(). А если бы нам потребовалось несколько (много) синхросерий, в каждой из которых выполняется своя функция, а периоды серий не кратны друг другу?

• Наконец, время выполнения целевой функции func() включается в период одного «кругового пробега» цикла, то есть период T отсчитывается от конца предыдущего выполнения функции до начала текущего, а это не совсем то, что мы подразумевали при использовании термина «синхронное».

• Более того, если время выполнения функции func() достаточно флуктуирует от одного вызова до другого (например, из-за изменений данных, с которыми работает функция), то периоды вызовов начинают «гулять», а дисперсия периода результирующей последовательности вызовов func() становится просто непомерно большой.

Ниже показано решение, свободное от многих из этих недостатков (файл t3.cc). Приложение представляет собой тестовую программу, осуществляющую 3 цепочки выполнения различных целевых функций (mon1, mon2, mon3) с разными периодами для каждой цепочки (массив period[]):

Синхронизация выполнения участка кода

#include

#include

#include

#include

#include

#include

#include

#include

#include

#include

#include

#include

static void out(char s) {

 int policy;

 sched_param param;

 pthread_getschedparam(pthread_self(), &policy, ¶m);

 cout << s << param.sched_curpriority << flush;

}

// целевые функции каждой из последовательностей только

// выводят свой символ-идентификатор и следующий за ним

// приоритет, на котором выполняется целевая функция

static void mon1(void) { out('.'); }

static void mon2(void) { out('*'); }

static void mon3(void) { out('+'); }

// это всего лишь перерасчет временных интервалов,

// измеренных в тактах процессора (в наносекундах)

inline uint64_t cycles2nsec(uint64_t с) {

 const static uint64_t cps =

  // частота процессора

  SYSPAGE_ENTRY(qtime)->cycles_per_sec;

 return (с * 1000000000) / cps;

}

// структура, необходимая только для накопления статистики параметров

// ряда временных отметок: среднего, среднеквадратичного отклонения,

// минимального и максимального значений

struct timestat {

private:

 uint64_t prev;

public:

 uint64_t num;

 double mean, disp, tmin, tmax;

 timestat(void) {

  mean = disp = tmin = tmax = 0.0;

  num = 0;

 }

 // новая временная отметка в ряду:

 void operator++(void) {

  uint64_t next = ClockCycles(), delta;

  if (num i= 0) {

   double delta = cycles2nsec(next — prev);

   if (num == 1) tmin = tmax = delta;

   else tmin = min(tmin, delta), tmax = max(tmax, delta);

   mean += delta;

   disp += delta * delta;

  }

  prev = next;

  num++;

 }

 // подвести итог ряда;

 void operator !(void) {

  mean /= (num - 1);

  disp = sqrt(disp / (num - 1) - mean * mean);

 }

}

// предварительное описание функции потока объекта

void* syncthread(void*);

class thrblock {

private:

 static int code;

 bool ok, st;

public:

 pthread_t tid;

 struct sigevent event;

 timer_t timer;

 int chid;

 void* (*func)(void*);

 sched_param param;

 // структура только для статистики:

 timestat sync;

 // конструктор класса - он не только инициализирует структуру данных

 // создаваемого объекта, но и запускает отдельный поток для его исполнения

 thrblock(

  // параметры конструктора

  // - целевая функция последовательности

  void (*dofunc)(void);

  // - период ее синхронизации

  unsigned long millisec;

  // - приоритет возбуждения синхросерии

  unsigned short priority;

  // - копить ли статистику временных интервалов?

  bool statist = false

 ) {

  // создание канала для получения уведомлений от таймера

  if (!(ok = ((chid = ChannelCreate(0)) >= 0))) return;

  // создать соединение по каналу, которое будет использовать таймер

  event.sigev_coid =

   ConnectAttach(ND_LOCAL_NODE, 0, chid, NTO_SIDE_CHANNEL, 0);

  if (!(ok = (event.sigev_coid >= 0))) return;

  // занести целевую функцию, заодно выполнив

  // трюк преобразования над ее типом

  func = (void*(*)(void*))dofunc;

  int policy;

  // запомнить приоритет вызывающей программы

  // под этим приоритетом и вызывать целевую функцию

  pthread_getschedparam(pthread_self(), &policy, ¶m);

  st = statist;

  event.sigev_code = code++;

  event.sigev_notify = SIGEV_PULSE;

  // а вот это приоритет, с которым нужно будет пробуждаться от таймера!

  event.sigev_priority = priority;

  // создание таймера

  if (!(ok = (timer_create(CLOCK_REALTIME, &event, &timer) == 0))) return;

  // запуск отдельного потока, который по сигналу

  // таймера будет выполнять целевую функцию

  if (!(ok = (pthread_create(&tid, NULL, &syncthread, (void*)this) == EOK)))

   return;

  // и только после этого можно установить период срабатывания

  // таймера, после чего он фактически и запускается

  struct itimerspec itime;

  nsec2timespec(&itime.it_value, millisec * 1000000ull);

  itime it_interval = itime.it_value;

  if (!(ok = (timer_settime(timer, 0, &itime, NULL) == 0))) return;

 }

 // признак того, что объект создан успешно и его поток запущен:

 bool OK(void) { return ok; }

 bool statistic(void) { return st; }

};

int thrblock.code = _PULSE_CODE_MINAVAIL;

// функция потока объекта

void* syncthread(void *block) {

 thrblock *p = (thrblock*)block;

 struct _pulse buf;

 pthread_attr_t attr;

 while(true) {

  // ожидание пульса от периодического таймера объекта

  MsgReceivePulse(p->chid, &buf, sizeof(struct _pulse), NULL);

  pthread_attr_init(&attr);

  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

  // восстановить приоритет целевой функции до уровня того,

  // кто ее устанавливал, вызывая конструктор

  pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED);

  pthread_attr_setschedparam(&attr, &p->param);

  // запуск целевой функции в отдельном "отсоединенном" потоке

  pthread_create(NULL, &attr, p->func, NULL);

  if (p->statistic()) ++p->sync;

 }

}

// 'пустой' обработчик сигнала SIGINT (реакция на ^С)

inline static void empty(int signo) {}

int main(int argc, char **argv) {

 // с этой точки стандартная реакция на ^С отменяется...

 signal(SIGINT, empty);

 // массив целевых функций

 void(*funcs[])(void) = { &mon1, &mon2, &mon3 };

 // периоды их синхросерий запуска

 int period[] = { 317, 171, 77 };

 // приоритеты, на которых отрабатывается реакция

 // синхросерий на каждый из таймеров синхросерий

 int priority[] = { 15, 5, 25 };

 int num = sizeof(funcs) / sizeof(*funcs);

 // запуск 3-х синхронизированных последовательностей

 // выполнения (созданием объектов)

 thrblock** tb = new (thrblock*)[num];

 for (int i = 0; i < num; i++) {

  tb[i] = new thrblock(funcs[i], period[i],

  priority[i], true);

  if (!tb[i]->OK())

  perror("synchro thread create"), exit(EXIT_FAILURE);

 }

 // ... а теперь ожидаем ^С.

 pause();

 // подсчет статистики и завершение программы

 cout << endl << "Monitoring finalisation!" << endl;

 // вывод временных интервалов будем делать в миллисекундах:

 const double n2m = 1000000.;

 for (int i = 0; i < num, i++) {

  timestat *p = &tb[i]->sync;

  !(*p); // подсчет статистики по объекту

  cout << i << '\t' << p->num << "\t=> " << p->mean / n2m << " [" <<

   p->tmin / n2m << "..." << p->tmax / n2m << "]\t~" << p->disp / n2m <<

   " (" << p->disp / p->mean * 100 << "%)" << endl;

 }

 return EXIT_SUCCESS;

}

Вся функциональность программы сосредоточена в одном классе — thrblock, который может в неизменном виде использоваться для разных приложений. Необычной особенностью объекта этого класса является то, что он выполнен в технике «активных объектов», навеянной поверхностным знакомством с языками программирования школы Н. Вирта — ActiveOberon и Zormon. В ней говорится, что конструктор такого объекта не только создает объект данных, но и запускает (как вариант) отдельный поток выполнения для каждого создаваемого объекта. В нашем случае задача потоковой функции состоит в вызове целевой функции, адрес которой был передан конструктору объекта в качестве одного из параметров.

Ниже представлены отличия нашей реализации от простого цикла с задержкой, обсуждавшейся выше (помимо исправлений очевидных недостатков):

• Для каждого синхронизирующего таймера установлен свой приоритет «пробуждения», и он может быть достаточно высоким, для того чтобы предотвратить вытеснение этого синхронизирующего потока.

• После «пробуждения» по таймеру запускается целевая функция, но выполняется это отдельным потоком, причем потоком «отсоединенным». Другими словами, процесс выполнения целевой функции никак не влияет на общую схему синхронизации.

• Перед запуском целевой функции выполняющему ее потоку восстанавливается приоритет породившего потока (но не потока обслуживания таймера!), ведь нам не нужно, чтобы целевая функция, тем более, возможно и не очень значимая, как в нашем примере, могла влиять вытеснением на процессы синхронизации.

Запустим наше тестовое приложение:

# t3

+10+10*10+10+10.10*10+10+10*10+10+10.10*10+10+10+10*10+10.10+10*10+10+10*10+10.10+10*10+10+10*10+10.10+10+10*10+10+10+10.10+10+10*10+10+10.10*10+10+10+10*10+10.10+10*10+10+10*10+10+10.10*10+10+10*10+10+10.10+10*10+10+10*10+10.10+10*10+10+10*10+10.10+10+10*10+10+10*10^C

Monitoring finalisation!

0 32  => 316.919 [316.867...317.895] ~0.178511 (0.056327%)

1 59  => 170.955 [168.583...173.296] ~0.92472 (0.540914%)

2 132 => 76.9796 [76.942...77.9524]  ~0.085977 (0.111688%)

Первое, что мы должны отметить, — это очень приличную точность выдержки периода синхронизации (последняя колонка вывода). Для того чтобы убедиться в том, что целевая функция при этом выполняется под приоритетом породившего ее потока, закомментируем строки, выделенные жирным шрифтом в коде программы:

# t3

+25+25*5+25+25.15*5+25+25*5+25+25.15*5+25+25+25*5+25.15+25*5+25+25*5+25.15+25*5+25+25*5*5+25.15+25+25*5+25+25*5.15+25+25*5+25+25.15*5+25+25+25*5+25.15+25*5+25+25*5+25+25.15*5+25+25*5+25+25^C

Monitoring finalisation!

0 32 => 316.919 [316.797...317.915] ~0.185331 (0.0584792%)

1 60 => 170.955 [168.964...173.925] ~0.47915 (0.280279%)

2 34 => 76.9796 [76.8895...77.9694] ~0.0937379 (0.12177%)

В этом варианте (и диагностический вывод это подтверждает) мы искусственно ликвидировали наследование приоритета по цепочке порождения: сработавший таймер — функция потока — целевая функция объекта. Это не совсем соответствует цели, намеченной в начале этого раздела, но все же этот вариант иллюстрирует, что именно наш предыдущий вариант удовлетворял всем поставленным целям.