QNX/UNIX: Анатомия параллелизма

Цилюрик Олег Иванович

Горошко Егор

Зайцев Владимир

5. Специфические механизмы QNX

 

 

Операционная система QNX изнутри вся построена на клиент-серверных принципах, которые вытекают из микроядерной архитектуры и обмена сообщениями микроядра. Мы не могли обойти вниманием эти механизмы, поскольку они предоставляют огромный арсенал возможностей, однако их обстоятельное описание потребовало бы отдельной книги (полное описание см. в технической документации QNX по системной архитектуре). Более того, лучшая книга по обмену сообщениями микроядра уже, пожалуй, написана и переведена на русский язык [1]. В дополнение ко всему приложение «Организация обмена сообщениями», написанное В. Зайцевым и ранее не публиковавшееся, содержит обстоятельный анализ этого механизма.

Поэтому в главе мы лишь кратко рассмотрим вопросы параллелизма и синхронизации, присущие самой микроядерной архитектуре системы.

 

Обмен сообщениями микроядра

Модель обмена сообщениями — это тот фундамент, на котором стоит архитектура любой микроядерной ОС, как на трех китах: SEND — RECEIVE — REPLY. Обмен сообщениями микроядра построен на трех группах вызовов native API QNX (рис. 5.1):

1. Принять сообщение. Процесс, являющийся сервером некоторой услуги, выполняет вызов группы MsgReceive*(), фактически сообщая этим о готовности обслуживать запрос клиента, и переходит при этом в блокированное состояние со статусом RECEIVE, ожидая прихода клиентского запроса.

2. Послать сообщение. Клиентский процесс запрашивает эту услугу, посылая сообщение вызовом MsgSend*(), и переходит в блокированное состояние со статусом SEND. Переход осуществляется обычно на очень короткое время, пока сервер не примет его сообщение и не начнет обработку. Как только сервер принимает посланное сообщение, он разблокируется и меняет статус с RECEIVE на READY. Сервер начинает обработку полученного сообщения, а статус блокировки клиентского процесса меняется на REPLY.

3. Ответить на полученное сообщение. Завершив обработку полученного на предыдущем шаге сообщения, сервер выполняет вызов группы MsgReply*() для передачи запрошенного результата ожидающему клиенту. После этого вызова клиент, блокированный на вызове MsgSend*() со статусом REPLY, разблокируется (переходит в состояние READY). После выполнения MsgReply*() сервер также переходит в состояние READY. Однако чаще всего сервер снова входит в блокированное состояние на вызове MsgReceive*(), поскольку его работа организована как бесконечный цикл.

Рис. 5.1. Обмен сообщениями микроядра и менеджер ресурсов

Уже из этого поверхностного описания понятно, что передача сообщений микроядра — это не только средство взаимодействия процессов с обменом данными, но и крайне гибкая система синхронизации всех участников взаимодействия.

Могут возникнуть вопросы: Это один из низкоуровневых механизмов (существуют ли другие нативные механизмы?), на которых базируется ОС QNX? Какое это может иметь отношение к взаимодействиям на уровне POSIX API? Самое прямое! Все традиционные вызовы POSIX (open(), read(), … и все другие) реализованы в ОС QNX как обмен сообщениями, который только «камуфлируется» под стандарты техникой использования менеджеров ресурсов, о которых разговор еще впереди.

Технология обмена сообщениями микроядра хорошо описана [1] и требует для своего понимания и освоения тщательного изучения. В этой же главе, посвященной совершенно другим предметам, мы не будем детально описывать эту технологию.

Остановимся только на одном обстоятельстве: адресат получателя, которому направляется каждое сообщение, определяется при начальном установлении идентификатора соединения (coid — connect ID) вызовом:

#include

int ConnectAttach(int nd, pid_t Did, int chid,

 unsigned index, int flags);

Адрес назначения (сервера) в этом вызове определяется триадой {ND/PID/CHID}, где:

nd — идентификатор сетевого узла. Мы не станем углубляться в идентификацию сетевых узлов сети QNET. Возьмем на заметку лишь тот факт, что обмен сообщениями с одинаковой легкостью осуществляется как с процессом на локальном узле (nd = 0), так и на любом другом сетевом узле.

pid — PID процесса-сервера, с которым производится соединение.

chid — идентификатор канала, который открыл процесс с указанным PID, выполнив предварительно ChannelCreate(), и к которому устанавливается соединение вызовом ConnectAttach().

Выше мы неоднократно отмечали, что с процессом как с пассивной субстанцией, вообще говоря, невозможно обмениваться сообщениями. Хотя в адресной триаде обмена фигурирует именно PID процесса! Это обстоятельство не меняет положения вещей: именно адресная компонента CHID и определяет тот поток (часто это может быть главный поток приложения), с которым будет осуществляться обмен сообщениями, a PID определяет то адресное пространство процесса, в которое направляется сообщение, адресованное CHID.

Детальнее это выглядит так: в коде сервера именно тот поток, который выполнит MsgReceive*(chid, ...), и будет заблокирован в ожидании запроса от клиента MsgSend*(). Аналогично и в коде клиента вся последовательность выполнения блокировок, обозначенная выше, будет относиться именно к потоку, выполняющему последовательные операции:

coid = ConnectAttach(... , chid, ...);

MsgSend*(coid, ...);

Содержимое двух предыдущих абзацев ни одной буквой не противоречит и не отменяет положения традиционного изложения [1] технологии обмена сообщениями микроядра. Тогда зачем же мы даем именно такую формулировку? Для того чтобы акцентировать внимание на том, что все блокированные состояния и их освобождение имеют смысл относительно потоков (и только потоков!), которые выполняют последовательность операций MsgSend*() — MsgReceive*() — MsgReply*() (даже если это единственный поток — главный поток приложения, и тогда мы говорим о блокировании процессов). Проиллюстрируем сказанное следующим приложением (файл n1.cc):

Обмен сообщениями и взаимные блокировки

#include

#include

#include

#include

#include

#include

#include

#include

#include

#include

static const int TEMP = 500; // темп выполнения приложения

static int numclient = 1;    // число потоков клиентов

// многопотоковая версия вывода диагностики в поток:

iostream& operator <<(iostream& с, char* s) {

 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

 pthread_mutex_lock(&mutex);

 c << s << flush;

 pthread_mutex_unlock(&mutex);

 return c;

}

static uint64_t tb; // временная отметка начала приложения

// временная отметка точки вызова:

inline uint64_t mark(void) {

 // частота процессора:

 const static uint64_t cps =

  SYSPAGE_ENTRY(qtime)->cycles_per_sec;

 return (ClockCycles() - tb) * 1000 / cps;

}

const int MSGLEN = 80;

// потоковая функция сервера:

void* server(void* chid) {

 int rcvid;

 char message[MSGLEN];

 while (true) {

  rcvid = MsgReceive((int)chid, message, MSGLEN, NULL);

  sprintf(message + strlen(message), "[%07llu] ... ", mark());

  delay(TEMP); // имитация обслуживания

  sprintf(message + strlen(message), [%07llu]->", mark());

  MsgReply(rcvid, EOK, message, strlen(message) + 1);

 }

 return NULL;

}

// потоковая функция клиента:

void* client(void* data) {

 while (true) {

  char message[MSGLEN];

  sprintf(message, "%d:\t[%07llu]->", pthread_self(), mark());

  MsgSend((int)data, message, strlen(message) + 1, message, MSGLEN);

  sprintf(message + strlen(message), "[%07llu]", mark());

  cout << message << endl;

  static unsigned int seed = 0;

  delay(numclient*(((long)rand_r(&seed ) * TEMP / RAND_MAX) + TEMP));

  // имитация вычислений...

 }

 return NULL;

}

int main(int argc, char** argv) {

 // 1-й параметр - число потоков клиентов:

 if (argc > 1 && atoi(argv[1]) > 0)

  numclient = atoi(argv[1]);

 tb = ClockCycles();

 int chid = ChannelCreate(0);

 if (pthread_create(NULL, NULL, server, (void*)chid) != EOK)

  perror("server create"), exit(EXIT_FAILURE);

 for (int i = 0; i < numclient; i++)

  if (pthread_create(NULL, NULL, client,

   (void*)ConnectAttach(0, 0, chid, _NTO_SIDE_CHANNEL, 0)) != EOK)

   perror("client create"), exit(EXIT_FAILURE);

 sigpause(SIGINT);

 return EXIT_SUCCESS;

}

Все происходит в рамках единого процесса:

• Создается единый поток сервера, ожидающий сообщений от клиентов и отвечающий на них.

• Создается N потоков клиентов (задается параметром командной строки запуска приложения), которые будут обращаться к серверу.

• К одному каналу сервера устанавливается N соединений от клиентов.

• Канал прослушивания для сервера и идентификаторы соединений для клиентов сознательно создаются в главном потоке (т.e. вне потоков, которые их будут использовать); их значения поступают в потоки (сервера и клиентов) как параметры потоковых функций (трюк с подменой целочисленных значений на указатели мы рассматривали ранее).

• Сообщение продвигается от клиента к серверу и обратно к клиенту; в ходе пересылки объем сообщения нарастает: оно образуется конкатенацией полей, добавляемых последовательно клиентом, сервером и снова клиентом.

• В результате полного цикла обмена сообщением в теле самого сообщения формируется текст, содержащий 5 последовательных полей — идентификатор потока клиента (обращающегося с сообщением) и 4 абсолютные временные метки (в миллисекундах): передачи сообщения клиентом, приема сообщения сервером (начало обработки), ответа на сообщение сервером (конец обработки), приема ответа клиентом.

Запустим полученное приложение, например, так:

# n1 5

И прежде чем обсуждать результаты его работы, понаблюдаем состояния (блокировки) его потоков командой pidin (с другого терминала, естественно). Вот несколько «снимков» состояний, здесь можно наблюдать весь спектр блокированных состояний, о которых говорилось выше:

5546027 1 ./n1  10r SIGSUSPEND

5546027 2 ./n1  10r NANOSLEEP

5546027 3 ./n1  10r NANOSLEEP

5546027 4 ./n1  10r SEND       5546027

5546027 5 ./n1  10r REPLY      5546027

5546027 6 ./n1  10r NANOSLEEP

5546027 7 ./n1  10r NANOSLEEP

5730347 1 ./n1  10r SIGSUSPEND

5730347 2 ./n1  10r RECEIVE 1

5730347 3 ./n1  10r NANOSLEEP

5730347 4 ./n1  10r NANOSLEEP

5730347 5 ./n1  10r NANOSLEEP

5730347 6 ./n1  10r NANOSLEEP

5730347 7 ./n1  10r NANOSLEEP

А теперь рассмотрим результаты выполнения (на меньшем числе потоков клиентов, которое легче анализировать):

# n1 3

3: [0000000]->[0000000] ... [0000501]->[0000501]

4: [0000000]->[0000501] ... [0001003]->[0001003]

5: [0000000]->[0001003] ... [0001505]->[0001505]

3: [0002003]->[0002003] ... [0002504]->[0002505]

5: [0003462]->[0003462] ... [0003964]->[0003964]

4: [0003485]->[0003964] ... [0004466]->[0004466]

3: [0005017]->[0005017] ... [0005518]->[0005518]

5: [0005624]->[0005624] ... [0006126]->[0006126]

4: [0006741]->[0006741] ... [0007243]->[0007243]

...

Видно, как 3 клиента отправляют сообщения одновременно ([0000000]), поток сервера (TID = 2) немедленно получает сообщение ([0000000], 1-я строка), отправленное клиентом с TID = 3, два других сообщения (от клиентов с TID = 4 и 5) помещаются системой в очередь обслуживания (строки 2 и 3). После завершения обслуживания запроса от TID = 3 и ответа ([0000501]) поток сервера получает (извлекается из очереди ранее отправленное сообщение) сообщение от TID = 4 и так далее.

Еще содержательнее для интерпретации становится картина для большего числа потоков клиентов (здесь очередь ожидающих запросов становится гораздо длиннее, а ее поведение трудно предсказуемым - почти каждый запрос ожидает обслуживания), но эти результаты требуют намного более тщательного разбора для их осмысления:

# n1 10

3:  [0000000]->[0000000] ... [0000501]->[0000501]

4:  [0000000]->[0000501] ... [0001003]->[0001003]

5:  [0000000]->[0001003] ... [0001505]->[0001505]

6:  [0000000]->[0001505] ... [0002007]->[0002007]

7:  [0000000]->[0002007] ... [0002508]->[0002508]

8:  [0000000]->[0002508] ... [0003010]->[0003010]

9:  [0000000]->[0003010] ... [0003512]->[0003512]

10: [0000000]->[0003512] ... [0004014]->[0004014]

11: [0000000]->[0004014] ... [0004516]->[0004516]

12: [0000000]->[0004516] ... [0005017]->[0005018]

3:  [0005501]->[0005501] ... [0006003]->[0006003]

5:  [0008024]->[0008024] ... [0008526]->[0008526]

7:  [0008038]->[0008526] ... [0009028]->[0009028]

4:  [0009273]->[0009273] ... [0009775]->[0009775]

6:  [0010377]->[0010377] ... [0010878]->[0010878]

8:  [0010590]->[0010878] ... [0011380]->[0011380]

9:  [0010952]->[0011380] ... [0011882]->[0011882]

12: [0011297]->[0011882] ... [0012384]->[0012384]

11: [0011356]->[0012384] ... [0012886]->[0012886]

10: [0012024]->[0012886] ... [0013387]->[0013388]

3:  [0012874]->[0013388] ... [0013889]->[0013889]

7:  [0014888]->[0014888] ... [0015390]->[0015390]

4:  [0016254]->[0016254] ... [0016756]->[0016756]

5:  [0017646]->[0017646] ... [0018148]->[0018148]

6:  [0019088]->[0019088] ... [0019590]->[0019590]

11: [0020206]->[0020206] ... [0020708]->[0020708]

8:  [0020320]->[0020708] ... [0021210]->[0021210]

10: [0021078]->[0021210] ... [0021712]->[0021712]

12: [0021384]->[0021712] ... [0022213]->[0022213]

7:  [0021630]->[0022213] ... [0022715]->[0022715]

9:  [0021811]->[0022715] ... [0023217]->[0023217]

3:  [0022009]->[0023217] ... [0023719]->[0023719]

 

Динамический пул потоков

Динамический пул потоков не является каким-то специфическим механизмом, продиктованным именно микроядерной архитектурой QNX. Это удачная искусственная конструкция, все определения которой размещены в файле . Удивительно не то, что в составе API QNX имеется такой механизм, а то, что подобные инструменты отсутствуют в других ОС.

В предыдущих примерах кода мы неоднократно создавали наборы потоков для тех или иных целей, но всем им было присуще одно: общее количество потоков в них было фиксированным на момент создания. Это и были статические пулы потоков, разделяющих между собой работу приложения. Архитекторы QNX идут чуть дальше: они предоставляют инструментарий для создания пулов однотипных (с общей функцией потока) потоков, в которых конкретное число потоков может увеличиваться или уменьшаться синхронно с изменением нагрузки на приложение. Именно своим динамическим составом эта конструкция и отличается.

Динамический пул потоков нужен разработчикам QNX в первую очередь как инструмент построения многопоточных менеджеров ресурсов - основы построения сервисов ОС QNX. Но и помимо этой цели динамический пул потоков представляет собой мощнейшее средство для конструирования параллельных механизмов обработки.

Проиллюстрируем применение динамического пула потоков примером программного кода, который был нами описан в книге [4] в главе «Сервер TCP/IP... много серверов хороших и разных». По сути, это ретранслирующий TCP/IP-сервер, но сейчас это для нас неважно:

Сервер на базе динамического пула потоков

#include

#include

static int ls; // прослушивающий TCP-сокет

THREAD_POOL_PARAM_T* alloc(THREAD_POOL_HANDLE_T* h) {

 return (THREAD_POOL_PARAM_T*)h;

}

// функция блокирования пула потоков

THREAD_POOL_PARAM_T* block(THREAD_POOL_PARAM_T* p) {

 int rs = accept(ls, NULL, NULL);

 if (rs < 0) errx("accept error");

 return(THREAD_POOL_PARAM_T*)rs;

}

int handler(THREAD_POOL_PARAM_T* p) {

 retrans((int)p);

 close((int)p);

 delay(250);

 cout << pthread_self() << flush;

 return 0;

}

int main(int argc, char* argv[]) {

 // создать TCP-сокет на порт

 ls = getsocket(THREAD_POOL_PORT);

 // создание атрибутной записи пула потоков:

 thread_pool_attr_t attr;

 memset(&attr, 0, sizeof(thread_pool_attr_t));

 // заполнение блока атрибутов пула

 /* - mm число блокированных потоков в пуле */

 attr.lo_water = 3;

 /* - max число блокированных потоков в пуле */

 attr.hi_water = 7;

 /* - инкремент шага создания потоков */

 attr.increment = 2;

 attr.maximum = 9;

 /* - общий предел числа потоков в пуле */

 attr.handle = dispatch_create();

 attr.context_alloc = alloc;

 attr.block_func = block;

 attr.handler_func = handler;

 // фактическое создание пула потоков:

 void* tpp = thread_pool_create(&attr, POOL_FLAG_USE_SELF);

 if (tpp == NULL) errx("create pool");

 // начало функционирования пула потоков:

 thread_pool_start(tpp);

 // ... выполнение никогда не дойдет до этой точки!

 exit(EXIT_SUCCESS);

}

Примечание

В примере используются, но не определены две функции, которые не столь существенны для понимания примера сточки зрения функционирования пула:

•  errx() — реакция на ошибку выполнения с выводом сообщения и последующим аварийным завершением;

•  retrans() — прием сообщения с присоединенного TCP-сокета с последующей ретрансляцией полученного содержимого в него же.

Итак, первая особенность пула потоков в том, что мы построили многопоточный сервер, почти не прописывая собственного кода, — большую часть рутинной работы за нас сделала библиотека пула.

Приведем описание логики работы пула потоков и показанного примера на самом качественном, простейшем уровне:

• Первоначально (при запуске пула потоков в работу вызовом thread_pool_start()) создается attr.lo_water потоков («нижняя ватерлиния» числа блокированных потоков).

• При создании любого потока (как в процессе начального, так и в процессе последующего создания) вызывается функция attr.соntext_alloc() (в контексте созданного потока).

• По завершении функция вызывает блокирующую функцию потока attr.block_func(), на которой созданный поток ожидает события активизации (в показанном примере событие активизации — это установление соединения новым клиентом по возврату из accept()).

• Блокирующая функция после наступления события активизации переведет поток в состояние READY и вызовет в контексте этого потока функцию обработчика attr.handler_func().

• Если после предыдущего шага число оставшихся заблокированных потоков станет ниже attr.lo_water, механизм пула создаст дополнительно attr.increment потоков и «доведет» их до блокирующей функции.

• Активизированный поток производит всю обработку, предписанную функцией потока, и после выполнения потоковой функции будет опять переведен в блокированное состояние в функции блокирования…

• …но перед переводом потока вновь в блокированное состояние проверяется, не будет ли при этом превышено число блокированных потоков attr.hi_water («верхняя ватерлиния»), и если это имеет место, то поток вместо перевода в блокированное состояние самоуничтожается.

• Все проверки числа потоков производятся для того, чтобы общее число потоков пула (т. e. число активизированных потоков вместе с блокированными) не превышало общее ограничение attr.maximum.

Разобрав общую логику функционирования пула потоков, можно теперь детальнее рассмотреть отдельные шаги всего процесса:

1. Прежде чем создавать пул потоков, мы должны создать атрибутную запись, определяющую все поведение пула. Атрибутная запись описана так ():

typedef struct _thread_pool_attr {

 THREAD_POOL_HANDLE_T* handle;

 THREAD_POOL_PARAM_T*

 (*block_func)(THREAD_POOL_PARAM_T* ctp);

 void (*unblock_func)(THREAD_POOL_PARAM_T* ctp);

 int (*handler_func)(THREAD_POOL_PARAM_T* ctp);

 THREAD_POOL_PARAM_T*

  (*context_alloc)(THREAD_POOL_HANDLE_T* handle);

 void (*context_free)(THREAD_POOL_PARAM_T* ctp);

 pthread_attr_t* attr;

 unsigned short lo_water;

 unsigned short increment;

 unsigned short hi_water;

 unsigned short maximum;

 unsigned reserved[8];

} thread_pool_attr_t;

Дескриптор создаваемого пула потоков handle, посредством которого мы будем ссылаться на пул, является просто синонимом типа dispatch_t:

#ifndef THREAD_POOL_HANDLE_T

 #define THREAD_POOL_HANDLE_T dispatch_t

#endif

Атрибуты потоков, которые будут работать в составе пула, определяются полем attr типа pthread_attr_t (эту структуру мы детально рассматривали ранее при обсуждении создания единичных потоков).

Численные параметры пула определяют:

lo_water — «нижняя ватерлиния», минимальное число потоков пула, находящихся в блокированном состоянии (в ожидании активизации). Если в результате некоторого события один из ожидающих потоков переходит в состояние активной обработки и число оставшихся блокированных потоков становится меньше lo_water, создается дополнительно increment потоков, которые переводятся в блокированное состояние.

hi_water — максимальное число потоков, которые допустимо иметь в блокированном состоянии. Если после завершения обработки некоторым потоком число заблокированных потоков становится больше hi_water, то этот поток уничтожается.

maximum — общая верхняя граница числа потоков пула (активизированных и заблокированных). Даже если число заблокированных потоков (в пике активности) станет ниже lo_water, но общее число потоков уже достигнет maximum, то новые потоки для пула создаваться не будут.

Функциональные параметры пула определяют:

context_alloc() и context_free() — функции создания и уничтожения контекста потока, которые вызываются при создании и уничтожении каждого потока пула. Функция создания контекста потока ответственна за индивидуальные настройки создаваемого потока. Она возвращает «указатель на контекст» типа THREAD_POOL_PARAM_T. Однако системе такой тип неизвестен:

#ifndef THREAD_POOL_PARAM_T

 #define THREAD_POOL_PARAM_T void

#endif

В качестве контекста может использоваться любой пользовательский тип, и он будет передаваться последовательно в качестве параметра (ctp) во все последующие функции обслуживания потока.

block_func() — функция блокирования, которая вызывается в потоке сразу же после context_alloc() или после очередного этапа выполнения потоком функции обработчика handler_func(). Функция блокирования получает и возвращает далее обработчику (возможно, после модификации) структуру контекста (в приведенном выше примере контекстом является int — значение присоединенного TCP-сокета).

handler_func() — это, собственно, и есть аналог потоковой функции, в которой выполняется вся полезная работа потока. Функция вызывается библиотекой после выхода потока из блокирующей функции block_func(), при этом функция-обработчик handler_func() получит параметр контекста, возвращенный block_func().

Примечание

В текущей реализации handler_func() должна возвращать 0; все другие значения зарезервированы для дальнейших расширений. Аналогично определенная в атрибутной записи функция unblock_func() зарезервирована для дальнейших расширений, и вместо ее адреса следует устанавливать NULL .

2. После создания атрибутной записи пула, определяющей всю функциональность его дальнейшего поведения, можно приступать к непосредственному созданию пула потоков:

thread_pool_t* thread_pool_create(

 thread_pool_attr_t* attr, unsigned flags);

где attr — подробно рассмотренная (и созданная) ранее атрибутная запись пула;

flags — флаг, определяющий поведение вызывающего потока после последующего вызова thread_pool_start(). В документации описано два возможных значения флага:

 • POOL_FLAG_EXIT_SELF — после старта пула поток, вызвавший thread_pool_start() (часто это главный поток приложения), завершается;

 • POOL_FLAG_USE_SELF — после старта пула поток, вызвавший thread_pool_start(), включается в пул в качестве одного из его потоков.

И в том и в другом случае в типовом фрагменте (как и в показанном выше примере):

thread_pool_start(tpp);

exit(EXIT_SUCCESS);

управление никогда не дойдет до выполнения exit(). Но существует еще третье допустимое значение, прямо не указанное в документации, но мельком упоминаемое в других местах документации:

 • 0 — после старта пула поток, вызвавший thread_pool_start(), продолжает свое естественное выполнение.

Например, некоторый фрагмент кода мог бы выглядеть так:

thread_pool_attr_t att; // ...

thread_pool_t *tpp = thread_pool_create(&attr, 0);

thread_pool_start(tpp);

while (true) {

 // выполнять некоторую отличную от пула работу

}

exit(EXIT_SUCCESS);

Как уже понятно из описаний, thread_pool_create() возвращает указатель на управляющую структуру пула потоков, которая позже будет передана thread_pool_start(). Если создание пула завершилось неудачей, то результатом выполнения будет NULL, а в errno будет установлен код ошибки (документацией предусмотрен только один код ошибки: ENOMEM — недостаточно памяти для размещения структур данных).

Примечание

Управляющая структура пула потоков описана так:

typedef struct _thread_pool thread_pool_t;

struct _thread_pool {

 thread_pool_attr_t pool_attr;

 unsigned created;

 unsigned waiting;

 unsigned flags;

 unsigned reserved[3];

};

3. Последний шаг в процедуре запуска пула потоков:

int thread_pool_start(void* pool);

где pool — это указатель, возвращаемый thread_pool_create().

При успешном завершении (которого почти никогда не происходит, за исключением значения флага 0; об этом см. выше) функция возвращает EOK, в противном случае (что происходит гораздо чаще) — значение -1.

4. Другие, относящиеся к библиотеке динамического пула потоков функции, которые целесообразно посмотреть в документации QNX (но которые в силу различных обстоятельств используются гораздо реже):

int thread_pool_destroy(thread_pool_t* pool);

int thread_pool_control(thread_pool_t* pool, thread_pool_attr_t* attr,

 _Uint16t lower, _Uint16t upper, unsigned flags);

int thread_pool_limits(thread_pool_t* pool,

 int lowater, int hiwater, int maximum, int increment, unsigned flags);

 

Менеджеры ресурсов

 

QNX вводит технику программирования, которая единообразно проходит сквозь всю систему. Идея техники менеджеров ресурсов столь же проста, сколь и остроумна:

• Вся система построена на тщательно проработанной в теории (и редко используемой при построении реальных ОС) концепции - коммутации сообщений. Ядро (точнее «микроядро») операционной системы при таком подходе выступает в качестве компактного коммутатора сообщений между взаимодействующими программными компонентами. При этом взаимодействующие компоненты выступают в качестве клиента, запрашивающего услугу (ресурс), и сервера, обеспечивающего эту услугу (обслуживающего ресурс).

• Большинство системных вызовов API (в том числе все «привычные» POSIX-вызовы: open(), read(), write(), seek(), close()…) реально посылаются обслуживающему данный ресурс сервису (например, в файловую систему типа FAT32 — fs-dos) в виде сообщений уровня микроядра. Код сообщения при этом определяет тип операции (например, open()), а последующее тело сообщения — конкретные параметры запроса, зависящие от типа операции (параметры запроса пакуются в тело сообщения).

• Раз эта схема столь универсальна, единообразна и не зависит от конкретной природы ресурса, на котором обеспечивается обслуживание, то разработчики QNX предоставляют некоторый шаблон сервера, в котором на месте обработчиков стандартных POSIX-запросов находятся пустые программные заглушки. Этот шаблон и служит базовым элементом построения разнообразных серверов услуг, называемых при выполнении в такой технике «менеджерами ресурса».

• При запуске программа менеджера ресурса регистрирует свое имя (точнее имя управляемого ею ресурса) в пространстве имен файловой системы QNX (обычно в каталоге /dev, но это может быть любое место файловой системы). Теперь можно обращаться с запросами к данному менеджеру так же, как и к любому реальному файлу в файловой системе.

• Программисту, пишущему свой драйвер услуги, ресурса, устройства или псевдоустройства, остается только переопределить программное наполнение тех программных заглушек, которые ответственны за интересующие его вызовы (например, open(), read(), close()), никак не затрагивая вызовы, не обеспечиваемые этим ресурсом (например, write(), seek() и др.).

В наши цели не входит детальное обсуждение техники написания менеджеров ресурсов (этому посвящено специальное исчерпывающее руководство в составе технической документации QNX объемом более 80 страниц). Поэтому, как и ранее с динамическим пулом потоков, начнем с примера. Приведем простейший код менеджера ресурса, который использовался нами для тестирования наследования приоритетов в QNX (файл prior.cc):

Однопоточный менеджер ресурса

#include

#include

#include

#include

#include

#include

#include

#include

#include

// обработчик запроса от клиента read(),

// возвращающий текущий приоритет обслуживания

static int prior_read(resmgr_context_t *ctp, io_read_t *msg,

 RESMGR_OCB_T *ocb) {

 static bool odd = true;

 int status = iofunc_read_verify(ctp, msg, ocb, NULL);

 if (status != EOK) return status;

 if (msg->i.xtype & _IO_XTYPE_MASK != _ID_XTYPE_NONE)

  return ENOSYS;

 if (odd) {

  struct sched_param param;

  sched_getparam(0, ¶m);

  static char rbuf[4];

  sprintf(rbuf, "%d\n", param.sched_curpriority);

  MsgReply(ctp->rcvid, strlen(rbuf) + 1, rbuf, strlen(rbuf) + 1);

 } else MsgReply(ctp->rcvid, EOK, NULL, 0);

 odd = !odd;

 return _RESMGR_NOREPLY;

}

// главная программа запуска менеджера

main(int argc, char **argv) {

 resmgr_attr_t resmgr_attr;

 dispatch_t *dpp;

 dispatch_context_t *ctp;

 int id;

 // инициализация интерфейса диспетчеризации

 if ((dpp = dispatch_create()) == NULL)

  perror("allocate dispatch"), exit(EXIT_FAILURE);

 // инициализация атрибутов менеджера

 memset(&resmgr_attr, 0, sizeof resmgr_attr);

 resmgr_attr.nparts_max = 1;

 resmgr_attr.msg_max_size = 2048;

 // инициализация таблиц функций обработчиков

 static resmgr_connect_funcs_t connect_funcs;

 static resmgr_io_funcs_t io_funcs;

 iofunc_func_init(_RESMGR_CONNECT_NFUNCS, &connect_funcs,

  _RESMGR_IO_NFUNCS, &io_funcs);

 // здесь нами дописан всего один обработчик - операции read,

 // все остальное делается менеджером по умолчанию!

 io_funcs.read = prior_read;

 // инициализация атрибутной структуры, используемой

 // устройством.

 static iofunc_attr_t attr;

 iofunc_attr_init(&attr, S_IFNAM | 0666, 0, 0);

 // здесь создается путевое имя для менеджера

 id = resmgr_attach(dpp, &resmgr_attr, "/dev/prior",

  _FTYPE_ANY, 0, &connect_funcs, &io_funcs, &attr);

 if (id == -1)

  perror("attach name"), exit(EXIT_FAILURE);

 ctp = dispatch_context_alloc(dpp);

 // старт менеджера как бесконечный цикл ожидания

 // поступающих сообщений для диспетчеризации:

 while (true) {

  if ((ctp = dispatch_block(ctp)) == NULL)

   perror("block error"), exit(EXIT_FAILURE);

  dispatch_handler(ctp);

 }

}

Здесь использован простейший однопоточный шаблон написания менеджера. Менеджер отрабатывает только одну команду read() (т.e. отрабатывает нестандартно; в целевом коде все остальные команды, например open(), он отрабатывает по умолчанию). По команде read() менеджер: а) возвращает в виде текстовой строки, завершающейся переводом строки, текущий приоритет (помните, что в QNX приоритеты «плавают»?), на котором он обрабатывает запрос, и б) делает это через один запрос, в оставшиеся разы создавая на всякий случай (почему «на всякий», сейчас станет понятно) ситуацию EOF (конца файла). Выполним несколько команд:

# prior &

# ls -l /dev/pr*

nrw-rw-rw- 1 root root 0 Dec 18 17:13 /dev/prior

Все соответствует нашим ожиданиям: менеджер ресурса запущен, он зарегистрировал в пространстве имен свое имя /dev/prior, по которому мы можем к нему обращаться. Теперь выполним обращения к нашему... «устройству». Для этого мы сознательно не станем пользоваться каким-либо специальным клиентом, запрашивающим наш созданный сервис, а воспользуемся самыми заурядными командами UNIX, которые ничего не подозревают о существовании нового сервиса:

# cat /dev/prior

10

# nice -n-5 cat /dev/prior

15

# nice -n-19 cat /dev/prior

29

Вот здесь и проявляется исключительная мощь техники написания менеджера ресурса: созданная минимальными средствами серверная служба «камуфлирует» специфичный QNX-механизм передачи сообщений микроядра под стандартные POSIX-запросы к файловой системе (open(), read() и т.д.), и стандартные команды UNIX «не видят» отличий новой серверной службы от стандартных файлов (устройств) UNIX. Вот для достижения такой полной совместимости с «привычками» команд UNIX и созданы «на всякий случай» те особенности формата, возвращаемого запросами read(), о которых упоминалось выше.

Теперь разработка, например драйвера некоторого специфичного устройства, перемещается из области шаманства «системного программиста» в область деятельности проблемного программиста, да и выполняется привычными высокоуровневыми инструментальными средствами, например С++.

Примечание

Пользуясь случаем, именно здесь уместно на примере созданного менеджера ресурсов продемонстрировать гибкость микроядерной архитектуры и техники менеджера ресурса, а заодно убедиться, что наследование приоритетов (критически важное свойство для систем реального времени) сохраняется при запросе к удаленному менеджеру ресурса, запущенному на другом узле сети (имя узла — rtp ):

# on -frtp prior &

# ls -l /net/rtp/dev/pr*

nrw-rw--rw- 1 root root 0 Dec 18 17.09 /net/rtp/dev/prior

# nice -n-5 cat /net/rtp/dev/prior

15

# nice -n-19 cat /net/rtp/dev/prior

29

 

Многопоточный менеджер

Следующим шагом развития техники менеджера ресурсов является многопоточный менеджер. Фактически это объединение техники менеджера ресурсов с динамическим пулом потоков, рассмотренным выше.

Реальный работающий многопоточный менеджер с сопутствующим ему обстоятельным обсуждением приводился нами в книге [4] в главе «Драйверы». Мы не станем полностью приводить здесь этот достаточно объемный текст, поскольку он отличается от ранее показанного однопоточного менеджера только несколькими строками после вот этого оператора регистрации префикса имени менеджера:

// здесь создается путевое имя для менеджера

id = resmgr_attach(dpp, &resmgr_attr, "/dev/prior",

 _FTYPE_ANY, 0, &connect_funcs, &io_funcs, &attr);

if (id == -1)

 perror("attach name"), exit(EXIT_FAILURE);

Вот те несколько строк, которые, собственно, и превращают однопоточный менеджер в многопоточный:

...

thread_pool_attr_t pool_attr;

memset(&pool_attr, 0, sizeof pool_attr);

pool_attr.handle = dpp;

// это всегда остается так ...:

pool_attr.context_alloc = dispatch_context_alloc;

pool_attr.block_func = dispatch_block;

pool_attr.handler_func = dispatch_handler;

pool_attr.context_free = dispatch_context_free;

// численные параметры пула:

pool_attr.lo_water = 2;

pool_attr.hi_water = 6;

pool_attr.increment = 1;

pool_attr.maximum = 50;

thread_pool_t *tpp;

// флаг создания пула, который может принимать значения:

// POOL_FLAG_EXIT_SELF, POOL_FLAG_USE_SELF или,

// наконец, 0 и который определяет, что будет

// происходить дальше с вызывающим потоком...

if ((tpp = thread_pool_create(&pool_attr, POOL_FLAG_EXIT_SELF)) == NULL)

 perror("create pool"), exit(EXIT_FAILURE);

thread_pool_start(tpp);

...

}

Но всю эту последовательность действий мы уже видели ранее при описании динамического пула потоков, и какого-то специфического отношения к созданию именно менеджера ресурса она не имеет.

Вот такими элементарными манипуляциями мы превращаем менеджер ресурса (практически любой менеджер!) в многопоточный. С другой стороны, простота трансформации одной формы в другую подсказывает простое и эффективное решение: вначале всегда пишите одно- поточный менеджер, поскольку в отладке и понимании он намного проще, и только потом при необходимости трансформируйте его в многопоточный.

 

Множественные каналы

Техника написания менеджеров ресурсов в QNX открывает перспективу для простого и ясного написания драйверов системы без необходимости «залезать» в специфические низкоуровневые детали. Тем не менее в описаниях технологии создания менеджеров ресурсов есть один аспект, который имеет непосредственное отношение к синхронизации параллельных ветвей, и нельзя сказать, что этот вопрос не освещен в технической документации, однако его составляющие детали «размазаны» по документации, и общую картину приходится восстанавливать.

Суть вопроса в следующем. Писать менеджер ресурсов как системный драйвер некоторого специфического аппаратного устройства — это удел единиц (на каждое устройство — по одному разработчику! … шутка), но менеджер ресурсов — это прекрасная альтернатива для описания чисто программных «псевдоустройств». Например, это могла бы быть некоторая оконная GUI-подсистема, в которой open() создает прорисовку окна на экране, write() вписывает некоторый текст в окно, a read() считывает из окна текст, вводимый пользователем (подобная конструкция описывалась нами в главе «Драйверы» [4]). Таким решением мы с минимальными затратами придаем POSIX-функциональность своим совершенно неожиданным программным подсистемам.

Однако для «истинных драйверов» запросы open() — read() — write(): должны, как правило, быть последовательными (право, бессмысленно пытаться писать и читать один файл одновременно из двух потоков)… Это обусловливается тем, что в конечном итоге все функции-обработчики операций менеджера ресурса выходят на единичный экземпляр оборудования, которое должно физически отработать переданный ему запрос.

Гораздо свободнее может себя чувствовать разработчик драйвера псевдоустройства (программной модели): здесь каждый запрос open() (будь то из одного последовательного потока, различных потоков процесса или даже из потоков, принадлежащих разным процессам) может порождать новый экземпляр псевдоустройства. Возвращаемый им файловый дескриптор (в QNX это дескриптор соединения) станет ссылаться на порожденный экземпляр, а вызовы read() — write(), оперирующие с различным дескриптором, будут направляться соответствующим различным экземплярам. (Понятно, что такой параллелизм операций может обеспечить только многопоточный менеджер ресурса, но нужно еще «заставить» его сделать это.)

Это настолько часто используемая модель, что она заслуживает отдельного рассмотрения. Дополнительную сложность создает то обстоятельство, что мы, как уже отмечалось, договорились писать программный код на С++, а здесь нам предстоит переопределять из своего кода определения в заголовочных файлах менеджера ресурсов, не нарушая их C-синтаксис.

Ниже показан текст простейшего многопоточного менеджера (исключены даже самые необходимые проверки), ретранслирующего по нескольким каналам независимо получаемые текстовые строки (строки кода, принципиальные для обеспечения параллельности и многоканальности, выделены жирным шрифтом):

Подмена стандартного Open Control Block

// предшествующие общие строки #include не показаны

// это переопределение нужно для исключения предупреждений

// компилятора: 'THREAD_POOL_PARAM_T' redefined

#define THREAD_POOL_PARAM_T dispatch_context_t #include

// следующее переопределение принципиально важно.

// оно предписывает вместо стандартного блока OCB (open control block),

// создаваемого вызовом клиента open() и соответствующего его файловому

// дескриптору, использовать собственную структуру данных.

// Эта структура должна быть производной от стандартной

// iofunc_ocb_t, а определение должно предшествовать

// включению

#define IOFUNC_OCB_T struct ownocb

#include

class ownocb public iofunc_ocb_t {

 static const int BUFSIZE = 1024;

public:

 char *buf;

 ownocb(void) { buf = new char[BUFSIZE]; }

 ~ownocb(void) { delete buf; }

};

IOFUNC_OCB_T *ownocb_calloc(resmgr_context_t *ctp, IOFUNC_ATTR_T *device) {

 return new ownocb;

}

void ownocb_free(IOFUNC_OCB_T *o) { delete o; }

iofunc_funcs_t ownocb_funcs = {

 _IOFUNC_NFUNCS, ownocb_calloc, ownocb_free

};

iofunc_mount_t mountpoint = { 0, 0, 0, 0, &ownocb_funcs };

// Вместо умалчиваемой операции iofunc_lock_ocb_default(),

// вызываемой перед началом обработки запросов чтения/записи

// и блокирующей атрибутную запись, мы предписываем вызывать

// "пустую" операцию и не блокировать атрибутную запись,

// чем обеспечиваем параллелизм.

static int nolock(resmgr_context_t *ctp, void *v, IOFUNC_OCB_T *ocb) {

 return EOK;

}

// обработчик запроса чтения

static int line_read(resmgr_context_t *ctp, io_read_t *msg,

 IOFUNC_OCB_T *ocb) {

 if (strlen(ocb->buf) != 0) {

  MsgReply(ctp->rcvid, strlen(ocb->buf) + 1, ocb->buf, strlen(ocb->buf) + 1);

  strcpy(ocb->buf, "");

 } else MsgReply(ctp->rcvid, EOK, NULL, 0);

 return _RESMGR_NOREPLY;

}

// обработчик запроса записи

static int line_write(resmgr_context_t *ctp, io_write_t *msg,

 IOFUNC_OCB_T *ocb) {

 resmgr_msgread(ctp, ocb->buf, msg->i.nbytes, sizeof(msg->i));

 _IO_SET_WRITE_NBYTES(ctp, msg->i.nbytes);

 return EOK;

}

// имя, под которым регистрируется менеджер:

const char sResName[_POSIX_PATH_MAX + 1] = "/dev/wmng";

// старт менеджера ресурса

static void StartResMng(void) {

 dispatch_t* dpp;

 if ((dpp = dispatch_create()) == NULL)

  perror("dispatch create"), exit(EXIT_FAILURE);

 resmgr_attr_t resmgr_attr;

 memset(&resmgr_attr, 0, sizeof resmgr_attr);

 resmgr_attr.nparts_max = 1;

 resmgr_attr.msg_max_size = 2048;

 // статичность 3-х последующих описаний принципиально важна!

 // (также они могут быть сделаны глобальными переменными файла):

 static resmgr_connect_funcs_t connect_funcs;

 static resmgr_io_funcs_t io_funcs;

 static iofunc_attr_t attr;

 iofunc_func_init(_RESMGR_CONNECT_NFUNCS, &connect_funcs,

  _RESMGR_IO_NFUNCS, &io_funcs);

 // переопределение обработчиков по умолчанию

 io_funcs.read = line_read;

 io_funcs.write = line_write;

 io_funcs.lock_ocb = nolock;

 iofunc_attr_init(&attr, S_IFNAM | 0666, NULL, NULL);

 // через это поле осуществляется связь с новой

 // структурой OCB.

 attr.mount = &mountpoint;

 if (resmgr_attach(dpp, &resmgr_attr, sResName, _FTYPE_ANY, 0,

  &connect_funcs, &io_funcs, &attr) == -1)

  perror("name attach"), exit(EXIT_FAILURE);

 // создание пула потоков (многопоточность)

 thread_pool_attr_t pool_attr;

 memset(&pool_attr, 0, sizeof pool_attr);

 pool_attr.handle = dpp;

 pool_attr.context_alloc = dispatch_context_alloc;

 pool_attr.block_func = dispatch_block;

 pool_attr.handler_func = dispatch_handler;

 pool_attr.context_free = dispatch_context_free;

 pool_attr.lo_water = 2;

 pool_attr.hi_water = 6;

 pool_attr.increment = 1;

 pool_attr.maximum = 50;

 thread_pool_t* tpp;

 if ((tpp = thread_pool_create(&pool_attr, POOL_FLAG_EXIT_SELF)) == NULL)

  perror("pool create"), exit(EXIT_FAILURE);

 thread_pool_start(tpp);

 // к этой точке return управление уже никогда не подойдет...

}

int main(int argc, char *argv[]) {

 // проверка, не загружен ли ранее экземпляр менеджера,

 // 2 экземпляра нам ни к чему...

 char sDirName[_POSIX_NAME_MAX + 1];

 int nDirLen = strrchr((const char*)sResName, '/') - (char*)sResName;

 strncpy(sDirName, sResName, nDirLen);

 sDirName[nDirLen] = '\0';

 DIR *dirp = opendir(sDirName);

 if (dirp == NULL)

  perror("directory not found"), exit(EXIT_FAILURE);

 struct dirent *direntp;

 while (true) {

  if ((direntp = readdir(dirp)) == NULL) break;

  if (strcmp(direntp->d_name, strrchr(sResName, '/') + 1) == 0)

   cout << "second copy of manager" << endl, exit(EXIT_FAILURE);

 }

 closedir(dirp);

 // старт менеджера

 StartResMng();

 // ... к этой точке мы уже никогда не подойдем...

 exit(EXIT_SUCCESS);

}

В отличие от типового и привычного шаблона многопоточного менеджера, мы проделали здесь дополнительно следующее:

• Определили собственную структуру OCB, новый экземпляр которой должен создаваться для каждого нового подключающегося клиента:

class ownocb : public iofunc_ocb_t { ... };

• Переопределили описание структуры OCB, используемое библиотеками менеджера ресурсов:

#define IOFUNC_OCB_T struct ownocb

• Заполняя атрибутную запись устройства:

attr.mount = &mountpoint;

мы к точке монтирования «привязываем» функции создания и уничтожения вновь определенной структуры OCB (по умолчанию библиотека менеджера станет размещать только стандартный OCB):

iofunc_funcs_t ownocb_funcs = {

 _IOFUNC_NFUNCS, ownocb_calloc, ownocb_free

};

iofunc_mount_t mountpoint = { 0, 0, 0, 0, &ownocb_funcs };

(_IOFUNC_NFUNCS — это просто константа, определяющая число функций и равная 2.)

• Определяем собственные функции размещения и уничтожения структуры OCB с прототипами:

IOFUNC_OCB_T* ownocb_calloc(resmgr_context_t*, IOFUNC_ATTR_T*);

void ownocb_free(IOFUNC_OCB_T *o);

В нашем случае это: а) интерфейс из C-понятия «создать-удалить», в C++ — «конструктор-деструктор» и б) именно здесь создается и инициализируется сколь угодно сложная структура экземпляра OCB.

• В функциях обработки запросов клиента (операций менеджера) мы позже будем в качестве 3-го параметра вызова обработчика получать указатель именно того экземпляра, для которого требуется выполнить операцию, например:

int read(resmgr_context_t*, io_read_t*, IOFUNC_OCB_T*) {...}

Дополнительно мы проделываем еще один трюк, запрещая менеджеру блокировать атрибутную запись устройства при выполнении операций (что он делает по умолчанию; для реальных устройств это резонно, но для программного псевдоустройства это, как правило, не является необходимым). Для этого:

• В таблице операций ввода/вывода переназначаем функцию-обработчик операции блокирования атрибутной записи:

io_funcs.lock_ocb = nolock;

• В качестве такого обработчика предлагаем «пустую» операцию:

static int nolock(resmgr_context_t*, void*, IOFUNC_OCB_T*) {

 return EOK;

}

Запустим менеджер и проверим, как происходит его установка в системе:

/dev # ls -l /dev/w*

nrw-rw-rw- 1 root root 0 Nov 09 23:17 /dev/wmng

Теперь подготовим простейший клиент:

void main(int argc, char *argv[]) {

 char sResName[_POSIX_PATH_MAX + 1] = "/dev/wmng";

 if (argc > 1) strcpy(sResName, argv[1]);

 int df = open(sResName, O_RDWR | O_NONBLOCK);

 if (df < 0)

  perror("device open"), exit(EXIT_FAILURE);

 cout << open << sResName

  << " , desc. = " << df << endl;

 char ibuf[2048], obuf[2048];

 int r, w;

 while (true) {

  if ((r = read(df, obuf, sizeof(obuf))) < 0) break;

  cout << '#' << obuf << endl; cout << '>' << flush;

  cin >> ibuf;

  if (( w = write(df, ibuf, strlen(ibuf) + 1)) <= 0) break;

 }

 if (r < 0) perror("read error");

 if (w <= 0) perror("write error");

 exit(EXIT_FAILURE);

}

Запустим одновременно 2 экземпляра клиента (их, собственно, может быть сколь угодно много) и убедимся, что каждый из клиентов работает со своей отдельной копией структур данных внутри процесса менеджера ресурса:

# wmclient

open /dev/wmng , desc. = 3 #

>1234

#1234

>54321

#54321

>

# wmclient

open /dev/wmng , desc. = 3

#

>qwerty

#qwerty

>asdf

#asdf >

Отчетливо видно, что каждый клиент с получением своего файлового дескриптора (реально это дескриптор соединения) получает и свой экземпляр данных.

Полную параллельность и независимость обращений (например, возможность выполнения read() в то время, когда менеджер занят выполнением read() от другого клиента) к данному псевдоустройству отследить сложнее. Для этого в код обработчиков операций чтения/записи следует внести ощутимую задержку (например, sleep() или delay()) и воздействовать достаточно плотным потоком запросов со стороны нескольких клиентов. Такие эксперименты показывают полную независимость операций по разным файловым дескрипторам, что обеспечивается переопределением обработчика по умолчанию — iofunc_lock_ocb_default().

 

Сообщения или менеджер?

 

Этот вопрос возникает (должен возникать!) у каждого, кто приступает к разработке реального проекта, особенно если функциональность проекта распределяется между несколькими автономными процессами. Такая структуризация и вовсе не привычна разработчикам, приходящим из мира Windows. Для UNIX создание проектов, в которых порождается несколько процессов, такая структуризация уже гораздо органичнее, но и там это чаще всего лишь клонирование образа единого серверного процесса посредством fork(). QNX предоставляет возможность идти еще дальше в построении приложений, представленных (разделенных) как группа разнородных взаимодействующих процессов:

• Уже полученные нами ранее тестовые результаты времени диспетчеризации и переключений контекстов (пусть даже они и сделаны бегло, только в качестве оценочных ориентиров) показывают, что представления приложения в качестве единого, монолитного процесса или процесса, содержащего группу потоков, либо просто разбиение приложения на группу процессов по производительности если и не эквивалентны, то крайне близки. Этот фактор не должен быть определяющим, и при структурировании приложения следует руководствоваться целесообразностью и удобством.

• Процессы QNX сохраняют все качества таковых и в UNIX вообще: они являются изолированными сущностями, которые взаимодействуют, если это необходимо, используя достаточно тяжеловесные (расточительные) механизмы IPC. Собственно, в этом и ценность процессов с их изолированными адресными пространствами — это механизм обеспечения высокой надежности и живучести приложений. Но QNX, не сужая спектр общепринятых IPC-механизмов, привносит совершенно новый «слой» инструментария взаимодействия — обмен сообщениями микроядра. При этом «проницаемость» процессов как отдельных клеток живого организма становится много выше, нисколько не снижая их «защищенности».

Но у нас есть две принципиально различные альтернативы для выражения этого «слоя» взаимодействий в своем программном коде: базовый механизм обмена сообщениями (низкоуровневая техника, известная еще со времен QNX 4.X) и механизм менеджера ресурса. Делать выбор между ними приходится на этапе раннего эскизного проектирования системы, поскольку перестраивать систему с одного механизма на другой в ходе развития — достаточно трудоемкий процесс, который может потребовать пересмотра и архитектурных основ развиваемого проекта.

Поэтому, приступая к проектированию нового проекта, мы должны априорно, до начала фактической разработки, отчетливо представлять, что выигрываем и что проигрываем, используя тот или иной механизм реализации обмена сообщениями.

 

Две стороны единого механизма

 

При рассмотрении базовой для QNX (собственно, для всех микроядерных ОС) техники обмена сообщениями в сравнении с технологией написания менеджеров ресурсов не покидает ощущение поразительной схожести происходящих в обоих случаях процессов. В этом нет ничего удивительного, поскольку инструментарий менеджеров ресурсов — это только система внешних «оберток» над базовым механизмом обмена сообщениями.

Для эффективного применения той или иной альтернативной технологии мы должны иметь возможность проанализировать многие сравнительные показатели выбираемого инструментария: простота, гибкость, эффективность, трудоемкость реализации, возможности внесения изменений при развитии проекта и на этапе его последующего сопровождения. Этим мы и займемся в оставшейся части главы.

 

Простота и трудоемкость

Механизм прямого обмена сообщениями крайне просто выражается в программном коде. Когда достигнута полная ясность в значениях адресных параметров обмена, необходимо всего лишь несколько операторов, чтобы заставить все это «крутиться».

Со стороны сервера, например, это выглядит так:

int chid = ChannelCreate(0);

...

while (true) {

 struct _msg_info info;

 int rcvid = MsgReceive(chid, &bufin, sizeof(bufin), &info);

 if (rcvid < 0) exit(EXIT_FAILURE);

 if (MsgReply(rcvid, EOK, &bufou, sizeof(bufou) < 0) exit(EXIT_FAILURE);

}

Co стороны клиента:

int coid = ConnectAttach(node, pid, chid, _NTO_SIDE_CHANNEL, 0);

if (coid < 0) exit(EXIT_FAILURE);

...

while(...)

 if (MsgSend(coid, &bufou, sizeof(bufou), &bufin, sizeof(bufin)) == -1)

  exit(EXIT_FAILURE);

}

Код для реализации того же обмена, но организованного как менеджер ресурса, будет как минимум в несколько раз объемнее (образцы менеджеров мы уже видели ранее по тексту). Кроме того, по большей части он будет состоять из заполнения полей некоторых внутренних структур, используемых библиотеками менеджера ресурсов или пула потоков. На первый поверхностный взгляд такой код маловразумителен.

С другой стороны, весь достаточно объемный код любого менеджера ресурса — это очередное повторение одного и того же общего шаблона для написания менеджеров. При некоторых минимальных навыках написание самых замысловатых менеджеров ресурсов становится совершенно рутинным занятием, не превышающим по трудоемкости написание простого обмена сообщениями. Большим подспорьем здесь является наличие в комплекте технической документации QNX огромного (более 80 страниц) раздела, исчерпывающе описывающего технику создания менеджеров ресурсов; по качеству и скрупулезности изложения это одна из лучших частей всей технической документации.

 

Гибкость и мобильность

При установлении соединения техника простого обмена сообщениями в качестве адресата сообщений (сервера) использует «магическую тройку» (триплет [1]) параметров ND, PID и CHID, где:

• ND — дескриптор сетевого узла, на котором работает интересующая нас программа-сервер (узел, на который надо отсылать сообщение);

• PID — PID процесса этой программы на своем сетевом узле (кому отсылать сообщение);

• CHID — номер канала, который открыла эта программа для приема сообщений данного вида.

В этой адресации, пожалуй, и кроется самая главная причина негибкости механизма обмена сообщениями. Дескриптор сетевого узла nd, значение которого, кроме того, способно самопроизвольно изменяться в сети с течением времени, мы можем установить по сетевому имени интересующего нас хоста, используя netmgr_strtond(). (Это действие по своей сути избыточное, дополнительный уровень косвенности, так как первичным идентификатором узла для пользователя приложения является его имя, а не дескриптор.)

Гораздо хуже дело обстоит с pid и chid, особенно для процесса, выполняющегося на удаленном сетевом узле. Не существует в общем виде прямого способа установить PID удаленного процесса, а тем более номер канала, который открыл этот процесс для обмена (или вообще не открывал, если мы, например, ошиблись в определении его PID). И тогда на помощь приходят некоторые искусственные приемы, построенные либо на использовании некоторых иерархических (родительский-дочерний) соотношений процессов клиента и сервера, либо на системах совершенно условных договоренностей (произвольных и варьирующихся от случая к случаю).

Р. Кертен [1] отмечает, что существует множество способов нахождения этой адресной триады, и перечисляет некоторые из них:

1. Открыть файл с известным именем и сохранить в нем ND/PID/CHID…

2. Использовать для объявления идентификаторов ND/PID/CHID глобальные переменные программы…

3. Занять часть пространства имен путей и стать администратором ресурсов.

Не вдаваясь в подробный анализ (вы это можете сделать сами), отметим, что 1-й способ — крайне искусственный и негибкий (особенно в сетевой среде), 2-й — крайне ограничен и применим лишь к узкому кругу задач, а 3-й способ подводит нас к применению совсем другой, альтернативной технологии с используемыми ею принципами адресации.

Несколько, безусловно, интересных и заслуживающих внимания вариаций на тему техники обмена сообщениями предлагает В. Зайцев в приложении, которое следует за данной главой.

Пользуясь случаем, внесем и мы свою лепту в «копилку» способов вычисления адресной триады и увязывания клиента с соответствующим сервером. В тех нечастых случаях, когда требуется обеспечить максимально возможную плотность потока обмена (об этом см. далее), а информационный канал желательно создать на базе именно прямого обмена сообщениями, мы предлагаем оформлять серверный процесс одновременно и как специальный менеджер ресурса, и как канал прямого обмена сообщениями. При этом клиент, пользуясь адресацией пути к менеджеру, запрашивает его по read() или devctl(), на которые менеджер возвращает свой PID и открытый для прямого обмена дополнительный CHID. На этом функции менеджера заканчиваются, а весь информационный обмен далее идет обменом сообщений через указанный канал. Полный текст такого сервера будет показан в примере позже.

Теперь обратимся к технологии менеджера ресурсов. В этой технике менеджер регистрирует в пространстве имен (в файловой системе) уникальное имя, по которому клиенты, заинтересованные в его ресурсе, будут адресоваться к менеджеру. Идея не нова для мира UNIX (каталоги файловой системы /proc или /dev, как правило, вообще не содержат реальных файлов), и она находит все более последовательное расширение в новых разработках операционных систем, отталкивающихся от UNIX, например Plan9 или Inferno.

Техника менеджера ресурса вводит дополнительный уровень разрешения имен, который реализуется через менеджер процессов procnto (как это происходит, подробно и на примерах описывается в [1]). Происходящее при выполнении POSIX-оператора:

int fd = open("/net/host/dev/srv", O_RDONLY);

по внутреннему содержанию в точности соответствует тому, что происходит в процессе организации обмена сообщениями при выполнении:

int coid = ConnectAttach(node, pid, chid, 0, 0);

и может даже при определенных обстоятельствах возвратить то же значение и уж по крайней мере всегда возвращает значение той же природы, хотя мы и говорим по привычке в первом случае «файловый дескриптор», а во втором - «идентификатор соединения». Здесь отчетливо видна подмена адресной триады node, pid и chid именем пути /net/host/dev/srv.

Модель адресации менеджера ресурса в QNX, конечно, намного более универсальна, гибка и мобильна, нежели модель прямого обмена сообщениями. Например, можно написать сервер, который при запуске воспринимал бы полное имя, под которым он будет регистрироваться в пространстве имен, например (пусть даже некоторые варианты и сомнительны в своей осмысленности):

# server -n /dev/srv

# server -n /proc/srv

# server -n /fs/srv

Можно запустить несколько экземпляров такого сервера, возможно модифицированных использованием других ключей запуска:

# server -n /dev/srv1

# server -n /dev/srv2

Наконец, можно сделать это не только на своем локальном узле сети, но и на других сетевых узлах:

# on -f host1 server -n /dev/srv1

# on -f host1 server -n /dev/srv2

# on -f host2 server -n /dev/srv1

# on -f host2 server -n /dev/srv1

Теперь, если наш клиент выполнен так, что позволяет при запуске указать имя сервера, который он должен использовать, мы можем применить такой клиент для работы с самыми различными экземплярами серверов, где бы они ни находились в сети, например:

# client -s /dev/srv1

# client -s /net/host2/dev/srv1

Полный исходный код такой реализации будет показан в примере, к рассмотрению которого мы перейдем после завершения этого раздела.

В чем еще состоит различие, которое можно отнести к категории гибкости механизмов?

В краткой схеме, показанной кодом предыдущего раздела, вызовом:

MsgSend(coid, &bufou, sizeof(bufou), &bufin, sizeof(bufin));

может быть послано сообщение произвольной (в пределах абсолютных ограничений) длины (sizeof(bufou)). Это сообщение (с информацией о его фактической длине) будет принято сервером, который в свою очередь может ответить сообщением произвольной длины, которое и будет доставлено клиенту в ответ на оператор MsgSend().

При обмене с менеджером ресурсов, в силу необходимости приведения клиентских запросов в «прокрустово ложе» POSIX, картина принципиально другая: каждый запрос может оперировать только с данными той длины, которая предопределена стандартом.

1. Команды группы read() могут передать в направлении сервера только код команды, уточненный параметрами (например, длина запрашиваемых данных), но не данные. В ответ сервер может передать клиенту данные произвольной длины. Обмен данными однонаправленный, в направлении от сервера к клиенту.

2. Команды группы write() могут передать от клиента к серверу данные произвольной длины, но в ответ сервер может возвратить только код результата - число байт, фактически успешно полученных в результате операции. Обмен данными однонаправленный, в направлении от клиенту к серверу.

3. Команда devctl(), использующаяся обычно для организации канала управления (но это не обязательно), в зависимости от кода команды может передавать данные либо к серверу (подобно write()), либо от сервера (подобно read()), либо в обоих направлениях за один обмен. Таким образом, этой командой может быть организован двунаправленный обмен. Вообще говоря, принято считать, что по devctl() передаются данные фиксированной длины: длина передаваемого блока данных определяется непосредственно кодом команды. Но это не является серьезным ограничением: мы можем динамически формировать код команды перед обменом исходя из объема данных, подлежащих передаче (как это будет показано в примере следующего раздела). Такой трюк позволяет организовать обмен данными произвольной длины. Ограничение здесь состоит в другом: объемы данных, передаваемые по devctl() в обоих направлениях, должны быть равны! А это, согласитесь, не совсем то, что мы видели при простом обмене сообщениями.

4. Наконец, последним вариантом обмена с менеджером ресурса является обмен «сырыми», неформатированными сообщениями. Но это уже вариация простого обмена сообщениями, а как ее реализовать в коде, показано в приложении В. Зайцева.

С другой стороны, такая повышенная гибкость простого обмена сообщениями в отношении размеров передаваемых данных — предмет потенциальных ошибок, в то время как регламентируемое POSIX поведение обменных функций несет в себе дополнительный контроль корректности.

 

Эффективность реализации

Если техника менеджеров ресурсов — это только надстройка над базовым механизмом обмена сообщениями, то возникает совершенно естественный вопрос: какова же плата за использование этого производительного и «комфортного» механизма?

Для анализа «скоростных» характеристик альтернативных механизмов обмена сообщениями создадим группу приложений (клиентские и сервер, файлы cli.cc, clr.cc и srv.cc), а чтобы отдельно не выписывать определения, используемые приложениями, вынесем их в отдельный файл определений (файл common.h).

Общие определения проекта

const char VERSION[] = "vers. 1.03";

// имя, под которым будет регистрироваться в пространстве

// имен наш тестовый менеджер ресурса

static const char DEVNAME[_POSIX_PATH_MAX] = "/dev/srr";

// "базовая часть" команды devctl(), конкретный код команды будет

// формироваться динамически на основе этой части, но исходя

// из фактической длины блока передаваемых данных

const unsigned int DCMD_CMD = 1,

 DCMD_SRR = _POSIX_DEVDIR_TOFROM + (_DCMD_NET << 8) + DCMD_CMD;

// структура ответов менеджера ресурса по запросу read()

struct result {

 pid_t pid;

 int chid;

 uint64_t cps;

 result(void) {

  pid = getpid();

  // если уж возвращать, то и служебную информацию ;)

  cps = SYSPAGE_ENTRY(qtime)->cycles_per_sec;

 }

}

// завершение с извещением кода причины

inline void exit(const char *msg) {

 cout << '\r';

 perror(msg);

 exit(EXIT_FAILURE);

}

В этой части каких-либо комментариев заслуживает разве что структура result. Наш сервер устроен «наоборот»: информационный обмен данными он осуществляет по запросу devctl(), запрос read() нам «не нужен», и мы используем его только для возврата информации (PID + CHID) об автономном канале обмена сообщениями. Заодно мы передаем в поле cps этой структуры значение тактовой частоты процессора сервера, что позволяет знать, «с кем мы имеем дело» при экспериментах в сети.

Теперь мы вполне готовы написать код сервера. Этот сервер (файл srv.cc), в отличие от традиционных, имеет два независимых канала подключения: как менеджер ресурсов и как сервер простого обмена сообщениями. Как менеджер он по запросу read() возвращает адресные компоненты (PID, CHID) для обмена сообщениями (ND клиент должен восстановить самостоятельно). По запросу devctl(), а также по запросу по автономному каналу обмена сообщениями сервер просто ретранслирует обратно полученный от клиента блок данных (в каком-то смысле по обоим каналам обмена наш сервер является «зеркалом», отражающим данные).

Сервер запросов

result data;

//---------------------------------------------------------

// реализация обработчика read()

static int readfunc(resmgr_context_t *ctp, io_read_t *msg,

 iofunc_ocb_t *ocb) {

 int sts = iofunc_read_verify(ctp, msg, ocb, NULL);

 if (sts != EOK) return sts;

 // возвращать одни и те же статические данные...

 MsgReply(ctp->rcvid, sizeof(result), &data, sizeof(result));

 return _RESMGR_NOREPLY;

}

//---------------------------------------------------------

// реализация обработчика devctl.

static int devctlfunc(resmgr_context_t *ctp, io_devctl_t *msg,

 iofunc_ocb_t *ocb) {

 int sts = iofunc_devctl_default(ctp, msg, ocb);

 if (sts != _RESMGR_DEFAULT) return sts;

 // разбор динамически создаваемого кода devctl(),

 // извлечение из него длины принятого блока

 unsigned int nbytes = (msg->i.dcmd - DCMD_SRR) >> 16;

 msg->o.nbytes = nbytes;

 // и тут же ретрансляция блока назад

 return _RESMGR_PTR(ctp, &msg->i, sizeof(msg->i) + nbytes);

}

//---------------------------------------------------------

// установка однопоточного менеджера, выполняемая

// в отдельном потоке

static void* install(void* data) {

 dispatch_t *dpp;

 if ((dpp = dispatch_create()) == NULL)

  exit("dispatch allocate");

 resmgr_attr_t resmgr_attr;

 memset(&resmgr_attr, 0, sizeof(resmgr_attr));

 resmgr_attr.nparts_max = 1;

 resmgr_attr.msg_max_size = 2048;

 static resmgr_connect_funcs_t connect_funcs;

 static resmgr_io_funcs_t io_funcs;

 iofunc_func_init(_RESMGR_CONNECT_NFUNCS, &connect_funcs,

  _RESMGR_IO_NFUNCS, &io_funcs);

 // определяем обработку, отличную от обработки по умолчанию,

 // только для двух команд: read() & devctl()

 io_funcs.read = &readfunc;

 io_funcs.devctl = &devctlfunc;

 static iofunc_attr_t attr;

 iofunc_attr_init(&attr, S_IFNAM | 0666, 0, 0);

 // связываем менеджер с его префиксным именем

 if (resmgr_attach(dpp, &resmgr_attr, DEVNAME,

  _FTYPE_ANY, 0, &connect_funcs, &io_funcs, &attr) == -1)

  exit("prefix attach");

 dispatch_context_t* ctp = dispatch_context_alloc(dpp);

 while (true) {

  if ((ctp = dispatch_block(ctp)) == NULL)

   exit("block error");

  dispatch_handler(ctp);

 }

}

// размер буфера для обмена сообщениями,

// этого нам хватит с большим запасом и надолго ;)

const int blk = 100000;

// обработчик низкоуровневых сообщений,

// также работающий в отдельном потоке

void* msginout(void* с) {

 static uint8_t bufin[blk];

 struct _msg_info info;

 while (true) {

  int rcvid = MsgReceive(data chid, &bufin, blk, &info);

  if (rcvid < 0) exit("message receive");

  if (MsgReply(rcvid, EOK, &bufin, info.msglen) < 0)

  exit("message reply");

 }

}

//--------------------------------------------------------

// "пустой" обработчик реакции на ^C (сигнал SIGINT)

inline static void empty(int signo) {}

//--------------------------------------------------------

// главная программа, которая все это "хозяйство" установит

// и будет безропотно ждать завершения по ^C ;)

int main(int argc, char *argv[]) {

 cout << "SRR server: " << VERSION << endl;

 // открывается менеджер ресурса ...

 int fd = open(DEVNAME, O_RDONLY);

 // если менеджер открылся, то это нам не нужно -

 // дубликаты не создавать!

 if (fd > 0)

  close(fd), cout << "already in use " << DEVNAME << endl, exit(EXIT_FAILURE);

 // перехватываем реакцию ^C:

 cout << ". . . . . . waiting ^C. . . . . ." << flush;

 signal(SIGINT, empty);

 // создается канал для обмена низкоуровневыми сообщениями

 data.chid = ChannelCreate(0);

 // и запускается отдельным потоком ретранслятор с этого канала

 if (pthread_create(NULL, NULL, msginout, NULL) != EOK)

  exit("message thread");

 // запускается менеджер ресурса

 if (pthread_create(NULL, NULL, install, NULL) != EOK)

  exit("manager thread");

 // ... все! Мы свое дело сделали и ожидаем ^C ...

 pause();

 cout << "\rFinalization... " << endl;

 // ... очистка, завершение ...

 ChannelDestroy(data.chid);

 return EXIT_SUCCESS;

}

Первая клиентская программа (файл cli.cc) посылает серверу блок данных указанной длины (длина может изменяться в широких пределах указанием при запуске ключа -b) и ожидает от него ретрансляции, после чего замеряет время ответа от сервера. Этот процесс повторяется многократно (ключ -m).

Первый клиентский процесс

#include "common.h"

static uint64_t *tim;

static int num = 10;

// вывод результатов с оценкой статистики: среднее, С.К.О...

static void outtim(void) {

 double m = 0., s = 0.;

 for (int i = 0; i < num; i++) {

  double d = (double)tim[i];

  m += d;

  s += d * d;

 }

 m /= num;

 s = sqrt(s / num - m * m);

 cout << '\t' << (uint64_t)floor(m + 5) << "\t~" << (uint64_t)floor(s + .5) <<

  "\t{" << (uint64_t)floor(s / m * 100 + .5) << "%}" << endl;

}

int main(int argc, char **argv) {

 cout << "SRR client: " << VERSION << endl;

 int opt, val;

 unsigned int blk = 100;

 char PATH[_POSIX_PATH_MAX] = "";

 while ((opt = getopt(argc, argv, "n:b:m:")) != -1) {

  switch (opt) {

  case 'n': // имя хоста сервера

   strcpy(PATH, "/net/");

   strcat(PATH, optarg);

   break;

  case 'b': // размер блока обмена (байт)

   if (sscanf(optarg, "%i", &blk) != 1)

    exit("parse command line failed");

   break;

  case 'm': // число повторений таких блоков

   if (sscanf(optarg, "%i", &num) != 1)

    exit("parse command line failed");

   break;

  default:

   exit(EXIT_FAILURE);

  }

 }

 // "составить" полное имя менеджера

 strcat(PATH, DEVNAME);

 cout << "server path. " << PATH << ", block size = "

  << blk << " bytes, repeat = " << num << endl;

 // при инициализации мы сразу получаем скорость процессора клиента

 result data;

 cout << "CPU speed [с.p.s ]: client = " << data.cps;

 // пытаемся подключиться к серверу-менеджеру

 int fd = open(PATH, O_RDONLY);

 if (fd < 0) exit("server not found");

 // читаем его параметры

 if (read(fd, &data, sizeof(result)) == -1)

  exit("parameter block read");

 cout << ", server = " << data.cps << endl;

 tim = new uint64_t[num];

 uint64_t tim2;

 uint8_t *bufin = new uint8_t[blk];

 *bufou = new uint8_t[blk];

 // определяем дескриптор сетевого узла

 int32_t node = netmgr_strtond(PATH, NULL);

 // это интересное место: если в имени нет сетевого префикса пути,

 // но это имя удается открыть, то это локальный хост!

 if (node == -1 && fd > 0 && errno == ENOENT)

  node = ND_LOCAL_NODE;

 // по адресным данным, полученным ранее по read(), создаем канал

 // для прямого обмена сообщениями с тем же процессом:

 int coid = ConnectAttach(node, data.pid, data.chid, _NTO_SIDE_CHANNEL, 0);

 if (coid < 0) exit("connect to message channel");

 cout << " - message exchange:" << flush;

 // обмен по каналу низкоуровневых сообщений

 for (int i = 0; i < num; i++) {

  tim[i] = ClockCycles();

  if (MsgSend(coid, bufou, blk, bufin, blk) == -1)

   exit("exchange data with channel");

  tim[i] = ClockCycles() - tim[i];

 }

 outtim();

 ConnectDetach(coid);

 // повторяем в точности тот же обмен, но по запросу devctl()

 unsigned int DCTL = (blk<<16) + DCMD_SRR;

 cout << "- manager exchange:" << flush;

 for (int i = 0; i < num; i++) {

  tim[i] = ClockCycles();

  if (devctl(fd, DCTL, bufou, blk, NULL) ! = EOK)

   exit("DEVCTL error");

  tim[i] = ClockCycles() - tim[i];

 }

 outtim();

 close(fd);

 delete [] bufin;

 delete [] bufou;

 delete [] tim;

 return EXIT_SUCCESS;

}

Смотрим локальные результаты исполнения и оценки, которые дает нам эта клиентская программа (знаком отмечено С.К.О. предшествующего ему в выводе значения измеренной средней величины, после чего в скобках — процентное отношение этого С.К.О. к измеряемой величине):

# nice -n-19 cli -b1 -m1000

SRR client: vers. 1.03

server path: /dev/srr, block size = 1 bytes, repeat = 1000

CPU speed [c.p.s.]: client = 534639500, server = 534639500

- message exchange: 2693 ~168 {6%}

- manager exchange: 6579 ~357 {5%}

# nice -n-19 cli -b10 -m1000

SRR client: vers. 1.03

server path: /dev/srr, block size = 10 bytes, repeat = 1000

CPU speed [c.p.s.]: client = 534639500, server = 534639500

- message exchange: 2726 ~258 {9%}

- manager exchange: 6725 ~378 {6%}

# nice -n-19 cli -b100 -m1000

SRR client: vers. 1.03

server path: /dev/srr, block size = 100 bytes, repeat = 1000

CPU speed [c.p.s.]: client = 534639500, server = 534639500

- message exchange: 3351 ~190 {6%}

- manager exchange: 7055 ~414 {6%}

# nice -n-19 cli -b1000 -m1000

SRR client: vers 1.03

server path: /dev/srr, block size = 1000 bytes, repeat = 1000

CPU speed [c.p.s.]: client = 534639500, server = 534639500

- message exchange: 3912 ~369  {9%}

- manager exchange: 8312 ~4024 {48%}

# nice -n-19 cli -b4000 -m1000

SRR client: vers 1.03

server path: /dev/srr, block size = 4000 bytes, repeat = 1000

CPU speed [c.p.s.] client = 534639500, server = 534639500

- message exchange: 5393  ~518 {10%}

- manager exchange: 10666 ~770 {7%}

# nice -n-19 cli -b6000 -m1000

SRR client vers 1.03

server path /dev/srr, block size = 6000 bytes, repeat = 1000

CPU speed [c.p.s.]: client = 534639500, server = 534639500

- message exchange: 7373  ~612 {8%}

- manager exchange: 12423 ~995 {8%}

# nice -n-19 cli -b1000 -m1000

SRR client: vers. 1.03

server path /dev/srr, block size = 10000 bytes, repeat = 1000

CPU speed [c.p.s.]: client = 534639500, server = 534639500

- message exchange: 14365 ~953  {7%}

- manager exchange: 16018 ~5399 {34%}

Это дает нам следующую информацию:

• Обмен с сервером, работающим на локальном хосте, происходит синхронно: клиент, переслав запрос серверу, блокируется в ожидании ответа от него. В этих условиях мы загружаем процессор на 100% совместной активностью клиента и сервера.

• Обмен в эквивалентных условиях с сервером, работающим как менеджер ресурса, требует (в сравнении с прямым обменом сообщениями) в 1,12–2,44 раз большее количество процессорных циклов на свое обслуживание, или, в относительных единицах, максимально достижимая производительность менеджера меньше на 12–144% .

• Самые неблагоприятные (144%) значения относятся к случаю обмена короткими сообщениями (1–10 байт); достаточно ощутимое (~2) значение этого соотношения сохраняется до размеров передаваемых блоков данных, равных 8–10 Кбайт.

• Накладные расходы на передачу единичного байта информации недопустимо велики (2693 циклов на байт при обмене сообщениями и 6579 циклов на байт — для менеджера) при организации обмена короткими сообщениями. С ростом объема данных, передаваемых за один цикл обмена, этот показатель очень резко падает (на блоках по 100 байт уже 33,5 и 70 соответственно, т.е. 2 порядка). Для систем с интенсивными потоками обмена необходимо стремиться максимально блокировать передаваемые данные и минимизировать число актов обмена.

Теперь выполним то же самое, но при обмене с сервером, локализованным на удаленном хосте сети (мы используем низкоскоростную сеть 10 Мбит/сек, на которой все эффекты более наглядны):

# nice -n-19 cli -nrtp -b1 -m500

SRR client: vers. 1.03

server path: /net/rtp/dev/srr, block size = 1 bytes, repeat = 500

CPU speed [c.p.s.]: client = 534639500, server = 451163200

- message exchange: 671017 ~391587 {58%}

- manager exchange: 712181 ~394844 {55%}

# nice -n-19 cli -nrtp -b10 -m500

SRR client: vers. 1.03

server path: /net/rtp/dev/srr, block size = 10 bytes, repeat = 500

CPU speed [c.p.s.]: client = 534639500, server = 451163200

- message exchange: 642456 ~380313 {59%}

- manager exchange: 743094 ~423717 {57%}

# nice -n-19 cli -nrtp -b100 -m500

SRR client: vers. 1.03

server path: /net/rtp/dev/srr, block size = 100 bytes, repeat = 500

CPU speed [c.p.s.]: client = 534639500, server = 451163200

- message exchange: 878686 ~432230 {49%}

- manager exchange: 907474 ~420140 {46%}

# nice -n-19 cli -nrtp -b1000 -m500

SRR client: vers. 1.03

server path: /net/rtp/dev/srr, block size = 1000 bytes, repeat = 500

CPU speed [c.p.s.]: client = 534639500, server = 451163200

- message exchange: 2064542 ~358333 {17%}

- manager exchange: 2113638 ~372487 {18%}

# nice -n-19 cli -nrtp -b3000 -m200

SRR client: vers. 1.03

server path: /net/rtp/dev/srr, block size = 3000 bytes, repeat = 200

CPU speed [c.p.s.]: client = 534639500, server = 451163200

- message exchange: 4134249 ~418168 {10%}

- manager exchange: 4181481 ~418139 {10%}

# nice -n-19 cli -nrtp -b5000 -m200

SRR client: vers. 1.03

server path: /net/rtp/dev/srr, block size = 5000 bytes, repeat = 200

CPU speed [c.p.s.]: client = 534639500, server = 451163200

- message exchange: 5805056 ~252663 {4%}

- manager exchange: 5825837 ~229120 {4%}

# nice -n-19 cli -nrtp -b8000 -m200

SRR client: vers. 1.03

server path /net/rtp/dev/srr, block size = 8000 bytes, repeat = 200

CPU speed [c.p.s.]: client = 534639500, server = 451163200

- message exchange: 8741090 ~446299 {5%}

- manager exchange: 8788642 ~427459 {5%}

# nice -n-19 cli -nrtp -b10000 -m200

SRR client: vers. 1.03

server path: /net/rtp/dev/srr, block size = 10000 bytes, repeat = 200

CPU speed [c.p.s.]: client = 534639500, server = 451163200

- message exchange: 8971296 ~451857 {5%}

- manager exchange: 9731224 ~301409 {3%}

В этом варианте основной компонент задержки вносится передачей данных по физическому каналу; разница между реализациями обмена сообщениями и менеджера ресурсов в значительной степени нивелирована.

Наш второй клиент (файл clr.cc), неизменно работающий с тем же сервером, весьма похож на предыдущий, но он массированно «гонит» поток данных на сервер, пользуясь только одним из механизмов (ключ -d) до прекращения его выполнения пользователем по ^C. Результат его работы — средняя плотность потока информации за весь интервал работы.

Второй клиентский процесс

#include "common.h"

static bool conti = true;

// завершение процесса по сигналу пользователя (SIGINT - ^C)

inline static void trap(int signo) { conti = false; }

int main(int argc, char **argv) {

 cout << "SRR repeater: " << VERSION << endl;

 int opt, val;

 unsigned int blk = 100;

 char PATH[_POSIX_PATH_MAX] = "";

 bool lowlvl = true;

 while ((opt = getopt(argc, argv, "n:b:d")) != -1) {

  switch(opt) {

  case 'n': // имя сетевого узла

   strcpy(PATH, "/net/");

   strcat(PATH, optarg);

   break;

  case 'b': // размер блока данных

   if (sscanf(optarg, "%i", &blk) ! = 1)

    exit("parse command line failed");

   break;

  case 'd': // обмен сообщениями

   lowlvl = false;

   break;

  default:

   exit(EXIT_FAILURE);

  }

 }

 strcat(PATH, DEVNAME);

 cout << "server path: " << PATH

  << ", block size = " << blk << " bytes" << endl;

 // при инициализации мы сразу получаем скорость процессора клиента

 result data;

 cout << "CPU speed [c.p.s.]: client = " << data.cps;

 uint64_t cps = data.cps;

 // пытаемся подключиться к серверу-менеджеру

 int fd = open(PATH, O_RDONLY);

 if (fd < 0) exit("server not found");

 // читаем его параметры

 if (read(fd, &data, sizeof(result)) == -1)

  exit("parameter block read");

 cout << ", server = " << data.cps << endl;

 // определяем дескриптор сетевого узла

 int32_t node = netmgr_strtond(PATH, NULL);

 if (node == -1 && fd > 0 && errno == ENOENT)

  node = ND_LOCAL_NODE;

 // по адресным данным, полученным ранее по read(), создаем

 // канал для прямого обмена сообщениями с тем же процессом

 int coid = ConnectAttach(node, data.pid, data.chid, _NTO_SIDE_CHANNEL, 0);

 if (coid < 0) exit("connect to message channel");

 // динамически готовим код команды devctl():

 unsigned int DCTL = (blk << 16) + DCMD_SRR;

 cout << " . . . . . waiting ^C . . . . . " << flush;

 // устанавливается реакция на пользовательский ^C

 signal(SIGINT, trap);

 uint64_t num = 0;

 uint8_t *bufin = new uint8_t[blk], *bufou = new uint8_t[blk];

 uint64_t tim = ClockCycles();

 // в зависимости от выбранного механизма передаем с его помощью данные

 if (lowlvl)

  while (true) {

   if (MsgSend(coid, bufou, blk, bufin, blk) == -1)

    exit("exchange data with channel");

   num++;

   if (!conti) break;

  }

 else {

  while (true) {

   if (devctl(fd, DCTL, bufou, blk, NULL) != EOK)

    exit("DEVCTL error");

   num++;

   if (!conti) break;

  }

 }

 tim = ClockCycles() - tim;

 cout << '\r' << (lowlvl ? "message exchange:" : "manager exchange:") <<

  " number = " << num << "; stream = "

  << (double)num * blk / ((double)tim / (double)cps) / 1E6 * 8 <<

  " Mbit/sec" << endl;

 ConnectDetach(coid);

 close(fd);

 delete [] bufin;

 delete [] bufou;

 return EXIT_SUCCESS;

}

В результате мы получаем оценки максимальной плотности потока обмена, достижимые в выбранных (при помощи ключей) условиях на данном процессоре:

# clr -b1

SRR repeater: vers. 1.03

server path: /dev/srr, block size = 1 bytes

CPU speed [c.p.s.]: client = 534639500, server = 534639500

message exchange: number = 906400; stream = 1.54088 Mbit/sec

# clr -b1 -d

SRR repeater: vers. 1.03

server path: /dev/srr, block size = 1

bytes CPU speed [c.p.s.]: client = 534639500, server = 534639500

manager exchange, number = 335725; stream = 0.617311 Mbit/sec

# clr -b10

SRR repeater: vers. 1.03

server path: /dev/srr, block size = 10 bytes

CPU speed [c.p.s.]: client = 534639500, server = 534639500

message exchange: number = 1119211; stream = 15.0758 Mbit/sec

# clr -bl0 -d

SRR repeater: vers. 1.03

server path: /dev/srr, block size = 10 bytes

CPU speed [c.p.s.]: client = 534639500, server = 534639500

manager exchange: number = 316948; stream = 6.1421 Mbit/sec

# clr -b100

SRR repeater: vers. 1.03

server path: /dev/srr, block size = 100 bytes

CPU speed [c.p.s.]: client = 534639500, server = 534639500

message exchange: number = 729460; stream = 122.617 Mbit/sec

# clr -b100 -d

SRR repeater: vers. 1.03

server path: /dev/srr, block size = 100 bytes

CPU speed [c.p.s.]: client = 534639500, server = 534639500

manager exchange: number = 318435, stream = 57.3215 Mbit/sec

# clr -b1000

SRR repeater: vers. 1.03

server path: /dev/srr, block size = 1000 bytes

CPU speed [с.p.s.]: client = 534639500, server = 534639500

message exchange: number = 823535; stream = 1054.65 Mbit/sec

# clr -b1000 -d

SRR repeater: vers. 1.03

server path: /dev/srr, block size = 1000 bytes

CPU speed [c.p.s.]: client = 534639500, server = 534639500

manager exchange: number = 367712; stream = 493.455 Mbit/sec

# clr -b10000

SRR repeater: vers. 1 03

server path: /dev/srr, block size = 10000 bytes

CPU speed [c.p.s.]: client = 534639500, server = 534639500

message exchange number = 196479, stream = 2861.27 Mbit/sec

# clr -b10000 -d

SRR repeater: vers. 1.03

server path: /dev/srr, block size = 10000 bytes

CPU speed [c.p.s.]: client = 534639500, server = 534639500

manager exchange: number = 141593, stream = 2487.18 Mbit/sec

Цифры достаточно интересные, для того чтобы рассмотреть их детальнее:

• При непрерывном потоке обмена очень короткими сообщениями (1 байт) плотность информационного потока падает до смехотворно низкой величины — 192 Кбайт/сек для обмена сообщениями и 77 Кбайт/сек для обмена с менеджером ресурса.

• При размере блока данных, передаваемого за один обмен, порядка нескольких килобайт разница скоростей информационных потоков для обмена сообщениями и менеджера ресурса практически нивелируется.

• При промежуточных размерах блока данных (от нескольких десятков до сот байт) обмен сообщениями обеспечивает плотность информационного потока до двух раз выше.

Естественно, поскольку мы рассматриваем чисто программные реализации обмена, абсолютные численные значения будут прямо пропорционально зависеть от скорости процессора (представленные результаты соответствуют процессору 533 МГц). На рис. 5.2 показана динамика загрузки процессора при работе тестовых приложений для случая локального размещения клиента и сервера. Хорошо видно, что в периоды выполнения программы clr загрузка процессора подскакивает до 100% — совместной активностью клиент и сервер забирают весь ресурс процессора.

Рис. 5.2. Динамика загрузки процессора при локальном взаимодействии клиента с сервером

Далее посмотрим выполнение той же пары приложений, но уже при разнесении их между раздельными узлами сети:

# clr -nrtp -b1

SRR repeater: vers. 1.03

server path: /net/rtp/dev/srr, block size = 1 bytes

CPU speed [c.p.s.]: client = 534639500, server = 451163200

message exchange: number = 5049, stream = 0.00670981 Mbit/sec

# clr -nrtp -b1 -d

SRR repeater: vers. 1.03

server path: /net/rtp/dev/srr, block size = 1 bytes

CPU speed [c.p.s.]: client = 534639500, server = 451163200

manager exchange: number = 4824; stream = 0.00598806 Mbit/sec

# clr -nrtp -b10

SRR repeater: vers. 1.03

server path: /net/rtp/dev/srr, block size = 10 bytes

CPU speed [c.p.s.]: client = 534639500, server = 451163200

message exchange number = 3885; stream = 0.0651842 Mbit/sec

# clr -nrtp -b10 -d

SRR repeater: vers. 1.03

server path: /net/rtp/dev/srr, block size = 10 bytes

CPU speed [c.p.s ]: client = 534639500, server = 451163200

manager exchange: number = 3102, stream = 0.0557978 Mbit/sec

# clr -nrtp -b100

SRR repeater: vers. 1.03

server path: /net/rtp/dev/srr, block size = 100 bytes

CPU speed [c.p.s.]: client = 534639500, server = 451163200

message exchange: number = 3347, stream = 0.507917 Mbit/sec

При взаимодействии клиента с сервером по сети в тех же условиях, что и на рис. 5.2, клиент уже не загружает процессор более чем на 50% (рис. 5.3). Если организовать обмен клиента с сервером в несколько потоков (2-3), то при максимальной загрузке процессора можно увеличить плотность потока еще вдвое.

Рис. 5.3. Загрузка процессора клиента при сетевом взаимодействии клиента с сервером

# clr -nrtp -b100 -d

SRR repeater: vers. 1.03

server path: /net/rtp/dev/srr, block size = 100 bytes

CPU speed [c.p.s.]: client = 534639500, server = 451163200

manager exchange: number = 2167; stream = 0.480264 Mbit/sec

# clr -nrtp -b1000

SRR repeater: vers. 1.03

server path: /net/rtp/dev/srr, block size = 1000 bytes

CPU speed [c.p.s.]: client = 534639500, server = 451163200

message exchange: number = 1400; stream = 2.0555 Mbit/sec

# clr -nrtp -b1000 -d

SRR repeater: vers. 1.03

server path: /net/rtp/dev/srr, block size = 1000 bytes

CPU speed [c.p.s.]: client = 534639500, server = 451163200

manager exchange: number = 1626; stream = 2.00553 Mbit/sec

# clr -nrtp -b10000

SRR repeater: vers. 1.03

server path: /net/rtp/dev/srr, block size = 10000 bytes

CPU speed [c.p.s.]: client = 534639500, server = 451163200

message exchange: number = 366; stream = 4.73793 Mbit/sec

# clr -nrtp -b10000 -d

SRR repeater: vers 1.03

server path: /net/rtp/dev/srr, block size = 10000 bytes

CPU speed [c.p.s.]: client = 534639500, server = 451163200

manager exchange: number = 440; stream = 4.39515 Mbit/sec

При взаимодействии по сети разница между реализациями обмена сообщениями и менеджера ресурсов не так заметна. Это и понятно: плотность потока обмена начинает ограничиваться в первую очередь задержками физической среды передачи.

Обратите внимание, что при больших блоках передаваемых данных (10 Кбайт) скорость информационного канала (4.395–4.738*2, учитывая что ретрансляция ведется в двух направлениях) сильно приближается к физической пропускной способности канала (10 Мбит/сек, как уже отмечалось выше), что попутно говорит о весьма высокой эффективности реализации обмена протоколами сети QNET.

 

Что же в итоге?

В итоге, имеющие место споры приверженцев организации обмена сообщениями и сторонников написания менеджеров ресурсов оказываются бессмысленными. В системах, обслуживающих максимально плотные потоки непрерывной входной информации (классическая постановка задачи для телефонных коммутаторов), реализация обмена сообщениями может оказаться заметно продуктивнее. С другой стороны, в системах с эпизодическим обслуживанием запросов (радиолокационные системы, системы управления технологическим оборудованием) реализация менеджера ресурса может привести к тому, что система станет намного более простой и гибкой в эксплуатации.

Два альтернативных пути не являются «взаимоисключающими», хотя это и реализации единого базового механизма. Они настолько далеко «разошлись» друг от друга, что приобрели индивидуальные, не воспроизводимые альтернативным способом черты. Более того, они могут кооперироваться в рамках даже одного процесса, как это было сделано в показанном ранее примере. Принятию того или иного решения должен предшествовать детальный анализ требований решаемой задачи.