Параллельное программирование на С++ в действии. Практика разработки многопоточных программ

Уильямс Энтони

Глава 2.

Управление потоками

 

 

Итак, вы решили написать параллельную программу, а конкретно — использовать несколько потоков. И что теперь? Как запустить потоки, как узнать, что поток завершился, и как отслеживать их выполнение? Средства, имеющиеся в стандартной библиотеке, позволяют относительно просто решить большинство задач управления потоками. Как мы увидим, почти все делается с помощью объекта std::thread, ассоциированного с потоком. Для более сложных задач библиотека позволяет построить то, что нужно, из простейших кирпичиком.

Мы начнем эту главу с рассмотрения базовых операций: запуск потока, ожидание его завершения, исполнение в фоновом режиме. Затем мы поговорим о передаче дополнительных параметров функции потока в момент запуска и о том, как передать владение потока от одного объекта std::thread другому. Наконец, мы обсудим вопрос о том, сколько запускать потоков и как идентифицировать отдельный поток.

 

2.1. Базовые операции управления потоками

 

В каждой программе на С++ имеется по меньшей мере один поток, запускаемый средой исполнения С++: тот, в котором исполняется функция main(). Затем программа может запускать дополнительные потоки с другими функциями в качестве точки входа. Эти потоки работают параллельно друг с другом и с начальным потоком. Мы знаем, что программа завершает работу, когда main() возвращает управление; точно так же, при возврате из точки входа в поток этот поток завершается. Ниже мы увидим, что, имея объект std::thread для некоторого потока, мы можем дождаться завершения этого потока, но сначала посмотрим, как потоки запускаются.

 

2.1.1. Запуск потока

В главе 1 мы видели, что для запуска потока следует сконструировать объект std::thread, который определяет, какая задача будет исполняться в потоке. В простейшем случае задача представляет собой обычную функцию без параметров, возвращающую void. Эта функция работает в своем потоке, пока не вернет управление, и в этом момент поток завершается. С другой стороны, в роли задачи может выступать объект-функция, который принимает дополнительные параметры и выполняет ряд независимых операций, информацию о которых получает во время работы от той или иной системы передачи сообщений. И останавливается такой поток, когда получит соответствующий сигнал, опять же с помощью системы передачи сообщений. Вне зависимости от того, что поток будет делать и откуда он запускается, сам запуск потока в стандартном С++ всегда сводится к конструированию объекта std::thread:

void do_some_work();

std::thread my_thread(do_some_work);

Как видите, все просто. Разумеется, как и во многих других случаях в стандартной библиотеке С++, класс std::thread работает с любым типом, допускающим вызов (Callable), поэтому конструктору std::thread можно передать экземпляр класса, в котором определен оператор вызова:

class background_task {

public:

 void operator()() const {

  do_something();

  do_something_else();

 }

};

background_task f;

std::thread my_thread(f);

В данном случае переданный объект-функция копируется в память, принадлежащую только что созданному потоку выполнения, и оттуда вызывается. Поэтому необходимо, чтобы с точки зрения поведения копия была эквивалентна оригиналу, иначе можно получить неожиданный результат.

При передаче объекта-функции конструктору потока нужно избегать феномена «самого досадного разбора в С++» (C++'s most vexing parse). Синтаксически передача конструктору временного объекта вместо именованной переменной выглядит так же, как объявление функции, и именно так компилятор и интерпретирует эту конструкцию. Например, в предложении

std::thread my_thread(background_task());

объявлена функция my_thread, принимающая единственный параметр (типа указателя на функцию без параметров, которая возвращает объект background_task) и возвращающая объект std::thread. Никакой новый поток здесь не запускается. Решить эту проблему можно тремя способами: поименовать объект-функцию, как в примере выше; добавить лишнюю пару скобок или воспользоваться новым универсальным синтаксисом инициализации, например:

std::thread my_thread( ( background_task() ) ); ← (1)

std::thread my_thread { background_task() } ;   ← (2)

В случае (1) наличие дополнительных скобок не дает компилятору интерпретировать конструкцию как объявление функции, так что действительно объявляется переменная my_thread типа std::thread. В случае (2) использован новый универсальный синтаксис инициализации с фигурными, а не круглыми скобками, он тоже приводит к объявлению переменной.

В стандарте С++11 имеется новый тип допускающего вызов объекта, в котором описанная проблема не возникает, — лямбда-выражение. Этот механизм позволяет написать локальную функцию, которая может захватывать некоторые локальные переменные, из-за чего передавать дополнительные аргументы просто не нужно (см. раздел 2.2). Подробная информация о лямбда-выражениях приведена в разделе А.5 приложения А. С помощью лямбда-выражений предыдущий пример можно записать в таком виде:

std::thread my_thread([](

 do_something();

 do_something_else();

});

После запуска потока необходимо явно решить, ждать его завершения (присоединившись к нему, см. раздел 2.1.2) или предоставить собственной судьбе (отсоединив его, см. раздел 2.1.3). Если это решение не будет принято к моменту уничтожения объекта std::thread, то программа завершится (деструктор std::thread вызовет функцию std::terminate()). Поэтому вы обязаны гарантировать, что поток корректно присоединен либо отсоединен, даже если возможны исключения. Соответствующая техника программирования описана в разделе 2.1.3. Отметим, что это решение следует принять именно до уничтожения объекта std::thread, к самому потоку оно не имеет отношения. Поток вполне может завершиться задолго до того, как программа присоединится к нему или отсоединит его. А отсоединенный поток может продолжать работу и после уничтожения объекта std::thread.

Если вы не хотите дожидаться завершения потока, то должны гарантировать, что данные, к которым поток обращается, остаются действительными до тех пор, пока они могут ему понадобиться. Эта проблема не нова даже в однопоточной программа доступ к уже уничтоженному объекту считается неопределенным поведением, но при использовании потоков есть больше шансов столкнуться с проблемами, обусловленными временем жизни.

Например, такая проблема возникает, если функция потока хранит указатели или ссылки на локальные переменные, и поток еще не завершился, когда произошел выход из области видимости, где эти переменные определены. Соответствующий пример приведен в листинге 2.1.

Листинг 2.1. Функция возвращает управление, когда поток имеет доступ к определенным в ней локальным переменным

struct func {

 int& i;

 func(int& i_) : i(i_){}

 void operator() () {

  for(unsigned j = 0; j < 1000000; ++j) {

   do_something(i); ←┐ Потенциальный доступ

  }                  (1) к висячей ссылке

 }

};

void oops() {

 int some_local_state = 0;        (2) He ждем завершения

 func my_func(some_local_state); ←┘ потока

 std::thread my_thread(my_func); ←┐ Новый поток, возможно,

 my_thread.detach();              (3) еще работает

}

В данном случае вполне возможно, что новый поток, ассоциированный с объектом my_thread, будет еще работать, когда функция oops вернет управление (2), поскольку мы явно решили не дожидаться его завершения, вызвав detach() (3). А если поток действительно работает, то при следующем вызове do_something(i) (1) произойдет обращение к уже уничтоженной переменной. Точно так же происходит в обычном однопоточном коде — сохранять указатель или ссылку на локальную переменную после выхода из функции всегда плохо, — но в многопоточном коде такую ошибку сделать проще, потому что не сразу видно, что произошло.

Один из распространенных способов разрешить такую ситуацию — сделать функцию потока замкнутой, то есть копировать в поток данные, а не разделять их. Если функция потока реализовала в виде вызываемого объекта, то сам этот объект копируется в поток, поэтому исходный объект можно сразу же уничтожить. Однако по-прежнему необходимо следить за тем, чтобы объект не содержал ссылок или указателей, как в листинге 2.1. В частности, не стоит создавать внутри функции поток, имеющий доступ к локальным переменным этой функции, если нет гарантии, что поток завершится до выхода из функции.

Есть и другой способ — явно гарантировать, что поток завершит исполнение до выхода из функции, присоединившись к нему.

 

2.1.2. Ожидание завершения потока

Чтобы дождаться завершения потока, следует вызвать функцию join() ассоциированного объекта std::thread. В листинге 2.1 мы можем заменить вызов my_thread.detach() перед закрывающей скобкой тела функции вызовом my_thread.join(), и тем самым гарантировать, что поток завершится до выхода из функции, то есть раньше, чем будут уничтожены локальные переменные. В данном случае это означает, что запускать функцию в отдельном потоке не имело смысла, так как первый поток в это время ничего не делает, по в реальной программе исходный поток мог бы либо сам делать что-то полезное, либо запустить несколько потоков параллельно, а потом дождаться их всех.

Функция join() дает очень простую и прямолинейную альтернативу — либо мы ждем завершения потока, либо нет. Если необходим более точный контроль над ожиданием потока, например если необходимо проверить, завершился ли поток, или ждать только ограниченное время, то следует прибегнуть к другим механизмам, таким, как условные переменные и будущие результаты, которые мы будем рассматривать в главе 4. Кроме тот, при вызове join() очищается вся ассоциированная с потоком память, так что объект std::thread более не связан с завершившимся потоком — он вообще не связан ни с каким потоком. Это значит, что для каждого потока вызвать функцию join() можно только один раз; после первого вызова объект std::thread уже не допускает присоединения, и функция joinable() возвращает false.

 

2.1.3. Ожидание в случае исключения

Выше уже отмечалось, что функцию join() или detach() необходимо вызвать до уничтожения объекта std::thread. Если вы хотите отсоединить поток, то обычно достаточно вызвать detach() сразу после его запуска, так что здесь проблемы не возникает. Но если вы собираетесь дождаться завершения потока, то надо тщательно выбирать место, куда поместить вызов join(). Важно, чтобы из-за исключения, произошедшего между запуском потока и вызовом join(), не оказалось, что обращение к join() вообще окажется пропущенным.

Чтобы приложение не завершилось аварийно при возникновении исключения, необходимо решить, что делать в этом случае. Вообще говоря, если вы намеревались вызвать функцию join() при нормальном выполнении программы, то следует вызывать ее и в случае исключения, чтобы избежать проблем, связанных с временем жизни. В листинге 2.2 приведен простой способ решения этой задачи.

Листинг 2.2. Ожидание завершения потока

struct func; ←┐ см. определение

              │ в листинге 2.1

void f() {

 int some_local_state = 0;

 func my_func(some_local_state)

 std::thread t(my_func);

 try {

  do_something_in_current_thread()

 }

 catch(...) {

  t.join(); ← (1)

  throw;

 }

 t.join();  ← (2)

}

В листинге 2.2 блок try/catch используется для того, чтобы поток, имеющий доступ к локальному состоянию, гарантированно завершился до выхода из функции вне зависимости оттого, происходит выход нормально (2) или вследствие исключения (1). Записывать блоки try/catch очень долго и при этом легко допустить ошибку, поэтому такой способ не идеален. Если необходимо гарантировать, что поток завершается до выхода из функции потому ли, что он хранит ссылки на локальные переменные, или по какой-то иной причине то важно обеспечить это на всех возможных путях выхода, как нормальных, так и в результате исключения, и хотелось бы иметь для этого простой и лаконичный механизм.

Один из способов решить эту задачу воспользоваться стандартной идиомой захват ресурса есть инициализация (RAII) и написать класс, который вызывает join() в деструкторе, например, такой, как в листинге 2.3. Обратите внимание, насколько проще стала функция f().

Листинг 2.3. Использование идиомы RAII для ожидания завершения потока

class thread_guard {

 std::threads t;

public:

 explicit thread_guard(std::thread& t_) : t(t_) {}

 ~thread_guard() {

  if (t.joinable()) ← (1)

  {

   t.join();        ← (2)

  }

 }

 thread_guard(thread_guard const&)=delete; ← (3)

 thread_guard& operator=(thread_guard const&)=delete;

};

struct func; ←┐ см.определение

              │ в листинге 2.1

void f() {

 int some_local_state;

 std::thread t(func(some_local_state));

 thread_guard g(t);

 do_something_in_current_thread();

}             ← (4)

Когда текущий поток доходит до конца f (4), локальные объекты уничтожаются в порядке, обратном тому, в котором были сконструированы. Следовательно, сначала уничтожается объект g типа thread_guard, и в его деструкторе (2) происходит присоединение к потоку Это справедливо даже в том случае, когда выход из функции f произошел в результате исключения внутри функции do_something_in_current_thread.

Деструктор класса thread_guard в листинге 2.3 сначала проверяет, что объект std::thread находится в состоянии joinable() (1) и, лишь если это так, вызывает join() (2). Это существенно, потому что функцию join() можно вызывать только один раз для данного потока, так что если он уже присоединился, то делать это вторично было бы ошибкой.

Копирующий конструктор и копирующий оператор присваивания помечены признаком =delete (3), чтобы компилятор не генерировал их автоматически: копирование или присваивание такого объекта таит в себе опасность, поскольку время жизни копии может оказаться дольше, чем время жизни присоединяемого потока. Но раз эти функции объявлены как «удаленные», то любая попытка скопировать объект типа thread_guard приведет к ошибке компиляции. Дополнительные сведения об удаленных функциях см. в приложении А, раздел А.2.

Если ждать завершения потока не требуется, то от проблемы безопасности относительно исключений можно вообще уйти, отсоединив поток. Тем самым связь потока с объектом std::thread разрывается, и при уничтожении объекта std::thread функция std::terminate() не будет вызвана. Но отсоединенный поток по-прежнему работает — в фоновом режиме.

 

2.1.4. Запуск потоков в фоновом режиме

Вызов функции-члeнa detach() объекта std::thread оставляет поток работать в фоновом режиме, без прямых способов коммуникации с ним. Теперь ждать завершения потока не получится — после того как поток отсоединен, уже невозможно получить ссылающийся на него объект std::thread, для которого можно было бы вызвать join(). Отсоединенные потоки действительно работают в фоне: отныне ими владеет и управляет библиотека времени выполнения С++, которая обеспечит корректное освобождение связанных с потоком ресурсов при его завершении.

Отсоединенные потоки часто называют потоками-демонами по аналогии с процессами-демонами в UNIX, то есть с процессами, работающими в фоновом режиме и не имеющими явного интерфейса с пользователем. Обычно такие потоки работают в течение длительного времени, в том числе на протяжении всего времени жизни приложения. Они, например, могут следить за состоянием файловой системы, удалять неиспользуемые записи из кэша или оптимизировать структуры данных. С другой стороны, иногда отсоединенный поток применяется, когда существует какой-то другой способ узнать о его завершении или в случае, когда нужно запустить задачу и «забыть» о ней.

В разделе 2.1.2 мы уже видели, что для отсоединения потока следует вызвать функцию-член detach() объекта std::thread. После возврата из этой функции объект std::thread уже не связан ни с каким потоком, и потому присоединиться к нему невозможно.

std::thread t(do_background_work);

t.detach();

assert(!t.joinable());

Разумеется, чтобы отсоединить поток от объекта std::thread, поток должен существовать: нельзя вызвать detach() для объекта std::thread, с которым не связан никакой поток. Это то же самое требование, которое предъявляется к функции join(), поэтому и проверяется оно точно так же — вызывать t.detach() для объекта t типа std::thread можно только тогда, когда t.joinable() возвращает true.

Возьмем в качестве примера текстовый редактор, который умеет редактировать сразу несколько документов. Реализовать его можно разными способами — как на уровне пользовательского интерфейса, так и с точки зрения внутренней организации. В настоящее время все чаще для этой цели используют несколько окон верхнего уровня, по одному для каждого редактируемого документа. Хотя эти окна выглядят совершенно независимыми, в частности, у каждого есть свое меню и все прочее, на самом деле они существуют внутри единственного экземпляра приложения. Один из подходов к внутренней организации программы заключается в том, чтобы запускать каждое окно в отдельном потоке: каждый такой поток исполняет один и тот же код, но с разными данными, описывающими редактируемый документ и соответствующее ему окно. Таким образом, чтобы открыть еще один документ, необходимо создать новый поток. Потоку, обрабатывающему запрос, нет дела до того, когда созданный им поток завершится, потому что он работает над другим, независимым документом. Вот типичная ситуация, когда имеет смысл запускать отсоединенный поток.

В листинге 2.4 приведен набросок кода, реализующего этот подход.

Листинг 2.4. Отсоединение потока для обработки другого документа

void edit_document(std::string const& filename) {

 open_document_and_display_gui(filename);

 while(!done_editing()) {

  user_command cmd = get_user_input();

  if (cmd.type == open_new_document) {

   std::string const new_name = get_filename_from_user();

   std::thread t(edit_document,new_name); ← (1)

   t.detach(); ← (2)

  }

  else {

   process_user_input(cmd);

  }

 }

}

Когда пользователь открывает новый документ, мы спрашиваем, какой документ открыть, затем запускаем поток, в котором этот документ открывается (1), и отсоединяем его (2). Поскольку новый поток делает то же самое, что текущий, только с другим файлом, то мы можем использовать ту же функцию (edit_document), передав ей в качестве аргумента имя только что выбранного файла.

Этот пример демонстрирует также, почему бывает полезно передавать аргументы функции потока: мы передаем конструктору объекта std::thread не только имя функции (1), но и её параметр — имя файла. Существуют другие способы добиться той же цели, например, использовать не обычную функцию с параметрами, а объект-функцию с данными-членами, но библиотека предлагает и такой простой механизм.

 

2.2. Передача аргументов функции потока

Из листинга 2.4 видно, что по существу передача аргументов вызываемому объекту или функции сводится просто к передаче дополнительных аргументов конструктору std::thread. Однако важно иметь в виду, что по умолчанию эти аргументы копируются в память объекта, где они доступны вновь созданному потоку, причем так происходит даже в том случае, когда функция ожидает на месте соответствующего параметра ссылку. Вот простой пример:

void f(int i, std::string const& s);

std::thread t(f, 3, "hello");

Здесь создается новый ассоциированный с объектом t поток, в котором вызывается функция f(3, "hello"). Отметим, что функция f принимает в качестве второго параметра объект типа std::string, но мы передаем строковый литерал char const*, который преобразуется к типу std::string уже в контексте нового потока. Это особенно важно, когда переданный аргумент является указателем на автоматическую переменную, как в примере ниже:

void f(int i, std::string const& s);

void oops(int some_param) {

 char buffer[1024];           ← (1)

 sprintf(buffer, "%i", some_param);

 std::thread t(f, 3, buffer); ← (2)

 t.detach();

}

В данном случае в новый поток передается (2) указатель на локальную переменную buffer (1), и есть все шансы, что выход из функции oops произойдет раньше, чем буфер будет преобразован к типу std::string в новом потоке. В таком случае мы получим неопределенное поведение. Решение заключается в том, чтобы выполнить преобразование в std::string до передачи buffer конструктору std::thread:

void f(int i,std::string const& s);

void not_oops(int some_param) {

 char buffer[1024];                         │ Использование

 sprintf(buffer, "%i", some_param);         │ std::string

 std::thread t(f, 3, std::string(buffer)); ←┘ позволяет избежать

 t.detach();                                  висячего указателя

}

В данном случае проблема была в том, что мы положились на неявное преобразование указателя на buffer к ожидаемому типу первого параметра std::string, а конструктор std::thread копирует переданные значения «как есть», без преобразования к ожидаемому типу аргумента.

Возможен и обратный сценарий: копируется весь объект, а вы хотели бы получить ссылку Такое бывает, когда поток обновляет структуру данных, переданную по ссылке, например:

void update_data_for_widget(widget_id w,widget_data& data); ← (1)

void oops_again(widget_id w) {

 widget_data data;

 std::thread t(update_data_for_widget, w, data); ← (2)

 display_status();

 t.join();

 process_widget_data(data);                      ← (3)

}

Здесь update_data_for_widget (1) ожидает, что второй параметр будет передан по ссылке, но конструктор std::thread (2) не знает об этом: он не в курсе того, каковы типы аргументов, ожидаемых функцией, и просто слепо копирует переданные значения. Поэтому функции update_data_for_widget будет передана ссылка на внутреннюю копию data, а не на сам объект data. Следовательно, по завершении потока от обновлений ничего не останется, так как внутренние копии переданных аргументов уничтожаются, и функция process_widget_data получит не обновленные данные, а исходный объект data (3). Для читателя, знакомого с механизмом std::bind, решение очевидно: нужно обернуть аргументы, которые должны быть ссылками, объектом std::ref. В данном случае, если мы напишем

std::thread t(update_data_for_widget, w, std::ref(data) );

то функции update_data_for_widget будет правильно передана ссылка на data, а не копия data.

Если вы знакомы с std::bind, то семантика передачи параметров вряд ли вызовет удивление, потому что работа конструктора std::thread и функции std::bind определяется в терминах одного и того же механизма. Это, в частности, означает, что в качестве функции можно передавать указатель на функцию-член при условии, что в первом аргументе передается указатель на правильный объект:

class X {

public:

 void do_lengthy_work();

};

X my_x;

std::thread t(&X::do_lengthy_work, &my_x); ← (1)

Здесь мы вызываем my_x.do_lengthy_work() в новом потоке, поскольку в качестве указателя на объект передан адрес my_x (1). Так вызванной функции-члену можно передавать и аргументы: третий аргумент конструктора std::thread  станет первым аргументом функции-члена и т.д.

Еще один интересный сценарий возникает, когда передаваемые аргументы нельзя копировать, а можно только перемещать: данные, хранившиеся в одном объекте, переносятся в другой, а исходный объект остается «пустым». Примером может служить класс std::unique_ptr, который обеспечивает автоматическое управление памятью для динамически выделенных объектов. В каждый момент времени на данный объект может указывать только один экземпляр std::unique_ptr, и, когда этот экземпляр уничтожается, объект, на который он указывает, удаляется. Перемещающий конструктор и перемещающий оператор присваивания позволяют передавать владение объектом от одного экземпляра std::unique_ptr другому (о семантике перемещения см. приложение А, раздел А.1.1). После такой передачи в исходном экземпляре остается указатель NULL. Подобное перемещение значений дает возможность передавать такие объекты в качестве параметров функций или возвращать из функций. Если исходный объект временный, то перемещение производится автоматически, а если это именованное значение, то передачу владения следует запрашивать явно, вызывая функцию std::move(). В примере ниже показано применение функции std::move для передачи владения динамическим объектом потоку:

void process_big_object(std::unique_ptr);

std::unique_ptr p(new big_object);

p->prepare_data(42);

std::thread t(process_big_object,std::move(p));

Поскольку мы указали при вызове конструктора std::thread функцию std::move, то владение объектом big_object передается объекту во внутренней памяти вновь созданного потока, а затем функции process_big_object.

В стандартной библиотеке Thread Library есть несколько классов с такой же семантикой владения, как у std::unique_ptr, и std::thread — один из них. Правда, экземпляры std::thread не владеют динамическими объектами, как std::unique_ptr, зато они владеют ресурсами: каждый экземпляр отвечает за управление потоком выполнения. Это владение можно передавать от одного экземпляра другому, поскольку экземпляры std::thread перемещаемые, хотя и не копируемые. Тем самым гарантируется, что в каждый момент времени с данным потоком будет связан только один объект, но в то же время программист вправе передавать владение от одного объекта другому

 

2.3. Передача владения потоком

Предположим, что требуется написать функцию для создания потока, который должен работать в фоновом режиме, но при этом мы не хотим ждать его завершения, а хотим, чтобы владение новым потоком было передано вызывающей функции. Или требуется сделать обратное — создать поток и передать владение им некоторой функции, которая будет ждать его завершения. В обоих случаях требуется передать владение из одного места в другое.

Именно здесь и оказывается полезной поддержка классом std::thread семантики перемещения. В предыдущем разделе отмечалось, что в стандартной библиотеке С++ есть много типов, владеющих ресурсами, например std::ifstream и std::unique_ptr, которые являются перемещаемыми, но не копируемыми, и один из них — std::thread. Это означает, что владение потоком можно передавать от одного экземпляра std::thread другому, как показано в примере ниже. В нем создается два потока выполнения, владение которыми передается между тремя объектами std::thread: t1, t2 и t3.

void some_function();

void some_other_function();

std::thread t1(some_function);         ← (1)

std::thread t2 = std::move(t1);        ← (2)

t1 = std::thread(some_other_function); ← (3)

std::thread t3;     ← (4)

t3 = std::move(t2); ← (5)

t1 = std::move(t3); ← (6) Это присваивание приводит

;                       к аварийному завершению программы

Сначала создастся новый поток (1) и связывается с объектом t1. Затем владение явно передается объекту t2 в момент его конструирования путем вызова std::move() (2). В этот момент с t1 уже не связан никакой поток выполнения: поток, в котором исполняется функция some_function, теперь связан с t2.

Далее создается еще один поток, который связывается с временным объектом типа std::thread (3). Для последующей передачи владения объекту t1 уже не требуется явный вызов std::move(), так как владельцем является временный объект, а передача владения от временных объектов производится автоматически и неявно.

Объект t3 конструируется по умолчанию (4), а это означает, что в момент создания с ним не связывается никакой поток. Владение потоком, который в данный момент связан с t2, передастся объекту t3 (5), опять-таки путем явного обращения к std::move(), поскольку t2 — именованный объект. После всех этих перемещений t1 оказывается связан с потоком, исполняющим функцию some_other_function, t2 не связан ни с каким потоком, a t3 связан с потоком, исполняющим функцию some_function .

Последнее перемещение (6) передает владение потоком, исполняющим some_function, обратно объекту t1, в котором исполнение этой функции началось. Однако теперь с t1 уже связан поток (который исполнял функцию some_other_function), поэтому вызывается std::terminate(), и программа завершается. Так делается ради совместимости с поведением деструктора std::thread. В разделе 2.1.1 мы видели, что нужно либо явно ждать завершения потока, либо отсоединить его до момента уничтожения; то же самое относится и к присваиванию: нельзя просто «прихлопнуть» поток, присвоив новое значение объекту std::thread, который им управляет.

Поддержка операции перемещения в классе std::thread означает, что владение можно легко передать при возврате из функции, как показано в листинге 2.5.

Листинг 2.5. Возврат объекта std::thread из функции

std::thread f() {

 void some_function();

 return std::thread(some_function);

}

std::thread g() {

 void some_other_function(int);

 std::thread t(some_other_function, 42);

 return t;

}

Аналогично, если требуется передать владение внутрь функции, то достаточно, чтобы она принимала экземпляр std::thread по значению в качестве одного из параметров, например:

void f(std::thread t);

void g() {

 void some_function();

 f(std::thread(some_function));

 std::thread t(some_function);

 f(std::move(t));

}

Одно из преимуществ, которые даёт поддержка перемещения в классе std::thread, заключается в том, что мы можем модифицировать класс thread_guard из листинга 2.3, так чтобы он принимал владение потоком. Это позволит избежать неприятностей в случае, когда время жизни объекта thread_guard оказывает больше, чем время жизни потока, на который он ссылается, а, кроме того, это означает, что никто другой не сможет присоединиться к потоку или отсоединить его, так как владение было передано объекту thread_guard. Поскольку основное назначение этого класса гарантировать завершение потока до выхода из области видимости, я назвал его scoped_thread. Реализация и простой пример использования приведены в листинге 2.6.

Листинг 2.6. Класс scoped_thread и пример его использования

class scoped_thread {

 std::thread t;

public:

 explicit scoped_thread(std::thread t_) : ← (1)

 t(std::move(t_)) {

 if (!t.joinable()) ← (2)

  throw std::logic_error("No thread");

 }

 ~scoped_thread() {

  t.join();         ← (3)

 }

 scoped_thread(scoped_thread const&)=delete;

 scoped_thread& operator=(scoped_thread const&)=delete;

};

struct func; ← см. листинг 2.1

void f() {

 int some_local_state;

 scoped_thread t(std::thread(func(some_local_state))); ← (4)

 do_something_in_current_thread();

}                   ← (5)

Этот пример очень похож на приведенный в листинге 2.3, только новый поток теперь передается непосредственно конструктору scoped_thread (4), вместо того чтобы создавать для него отдельную именованную переменную. Когда новый поток достигает конца f (5), объект scoped_thread уничтожается, а затем поток соединяется (3) с потоком, переданным конструктору (1). Если в классе thread_guard из листинга 2.3 деструктор должен был проверить, верно ли, что поток все еще допускает соединение, то теперь мы можем сделать это в конструкторе (2) и возбудить исключение, если это не так.

Поддержка перемещения в классе std::thread позволяет также хранить объекты этого класса в контейнере при условии, что класс контейнера поддерживает перемещение (как, например, модифицированный класс std::vector<>). Это означает, что можно написать код, показанный в листинге 2.7, который запускает несколько потоков, а потом ждет их завершения.

Листинг 2.7. Запуск нескольких потоков и ожидание их завершения

void do_work(unsigned id);

void f() {

 std::vector threads;

 for (unsigned i = 0; i < 20; ++i) {           │ Запуск

  threads.push_back(std::thread(do_work(i))); ←┘ потоков

 }                                            │ Поочередный

 std::for_each(threads.begin(), threads.end(),│ вызов join()

 std::mem_fn(&std::thread::join));           ←┘ для каждого потока

}

Если потоки применяются для разбиения алгоритма на части, то зачастую такой подход именно то, что требуется: перед возвратом управления вызывающей программе все потоки должны завершиться. Разумеется, столь простая структура, как в листинге 2.7, предполагает, что каждый поток выполняет независимую работу, а единственным результатом является побочный эффект, заключающийся в изменении разделяемых данных. Если бы функция f() должна была вернуть вызывающей программе значение, зависящее от результатов операций, выполненных в потоках, то при такой организации получить это значение можно было бы только путем анализа разделяемых данных по завершении всех потоков. В главе 4 обсуждаются альтернативные схемы передачи результатов работы из одного потока в другой.

Хранение объектов std::thread в векторе std::vector — шаг к автоматизации управления потоками: вместо тот чтобы создавать отдельные переменные для потоков и выполнять соединение напрямую, мы можем рассматривать группу потоков. Можно пойти еще дальше и создавать не фиксированное число потоков, как в листинге 2.7, а определять нужное количество динамически, во время выполнения.

 

2.4. Задание количества потоков во время выполнения

В стандартной библиотеке С++ есть функция std::thread::hardware_concurrency(), которая поможет нам решить эту задачу. Она возвращает число потоков, которые могут работать по-настоящему параллельно. В многоядерной системе это может быть, например, количество процессорных ядер. Возвращаемое значение всего лишь оценка; более того, функция может возвращать 0, если получить требуемую информацию невозможно. Однако эту оценку можно с пользой применить для разбиения задачи на несколько потоков.

В листинге 2.8 приведена простая реализация параллельной версии std::accumulate. Она распределяет работу между несколькими потоками и, чтобы не создавать слишком много потоков, задает ограничение снизу на количество элементов, обрабатываемых одним потоком. Отмстим, что в этой реализации предполагается, что ни одна операция не возбуждает исключений, хотя в принципе исключения возможны; например, конструктор std::thread возбуждает исключение, если не может создать новый поток. Но если добавить в этот алгоритм обработку исключений, он перестанет быть таким простым; эту тему мы рассмотрим в главе 8.

Листинг 2.8. Наивная реализация параллельной версии алгоритма std::accumulate

template

 struct accumulate_block {

 void operator()(Iterator first, Iterator last, T& result) {

  result = std::accumulate(first, last, result);

 }

};

template

T parallel_accumulate(Iterator first, Iterator last, T init) {

 unsigned long const length = std::distance(first, last);

 if (!length) ← (1)

  return init;

 unsigned long const min_per_thread = 25;

 unsigned long const max_threads =

  (length+min_per_thread - 1) / min_per_thread; ← (2)

 unsigned long const hardware_threads =

  std::thread::hardware_concurrency();

 unsigned long const num_threads = ← (3)

  std::min(

   hardware.threads != 0 ? hardware_threads : 2, max_threads);

 unsigned long const block_size = length / num_threads; ← (4)

 std::vector results(num_threads);

 std::vector threads(num_threads - 1); ← (5)

 Iterator block_start = first;

 for(unsigned long i = 0; i < (num_threads - 1); ++i) {

  Iterator block_end = block_start;

  std::advance(block_end, block_size); ← (6)

  threads[i] = std::thread( ← (7)

   accumulate_block(),

   block_start, block_end, std::ref(results(i)));

  block_start = block_end;  ← (8)

 }

 accumulate_block()(

  block_start, last, results[num_threads-1]); ← (9)

 std::for_each(threads.begin(), threads.end(),

 std::mem_fn(&std::thread::join)); ← (10)

 return

  std::accumulate(results.begin(), results.end(), init); ← (11)

}

Хотя функция довольно длинная, по существу она очень проста. Если входной диапазон пуст (1), то мы сразу возвращаем начальное значение init. В противном случае диапазон содержит хотя бы один элемент, поэтому мы можем разделить количество элементов на минимальный размер блока и получить максимальное число потоков (2).

Это позволит избежать создания 32 потоков на 32-ядерной машине, если диапазон состоит всего из пяти элементов.

Число запускаемых потоков равно минимуму из только что вычисленного максимума и количества аппаратных потоков (3): мы не хотим запускать больше потоков, чем может поддержать оборудование (это называется превышением лимита), так как из-за контекстных переключений при большем количестве потоков производительность снизится. Если функция std::thread::hardware_concurrency() вернула 0, то мы берем произвольно выбранное число, я решил остановиться на 2. Мы не хотим запускать слишком много потоков, потому что на одноядерной машине это только замедлило бы программу. Но и слишком мало потоков тоже плохо, так как это означало бы отказ от возможного параллелизма.

Каждый поток будет обрабатывать количество элементов, равное длине диапазона, поделенной на число потоков (4). Пусть вас не пугает случай, когда одно число нацело не делится на другое, — ниже мы рассмотрим его.

Теперь, зная, сколько необходимо потоков, мы можем создать вектор std::vector для хранения промежуточных результатов и вектор std::vector для хранения потоков (5). Отметим, что запускать нужно на один поток меньше, чем num_threads, потому что один поток у нас уже есть.

Запуск потоков производится в обычном цикле: мы сдвигаем итератор block_end в конец текущего блока (6) и запускаем новый поток для аккумулирования результатов по этому блоку (7). Начало нового блока совпадает с концом текущего (8).

После того как все потоки запущены, главный поток может обработать последний блок (9). Именно здесь обрабатывается случай деления с остатком: мы знаем, что конец последнего блока — last, а сколько в нем элементов, не имеет значения.

Аккумулировав результаты но последнему блоку, мы можем дождаться завершения всех запущенных потоков с помощью алгоритма std::for_each (10), а затем сложить частичные результаты, обратившись к std::accumulate (11).

Прежде чем расстаться с этим примером, полезно отметить, что в случае, когда оператор сложения, определенный в типе T, не ассоциативен (например, если T — это float или double), результаты, возвращаемые алгоритмами parallel_accumulate и std::accumulate, могут различаться из-за разбиения диапазона на блоки. Кроме того, к итераторам предъявляются более жесткие требования: они должны быть по меньшей мере однонаправленными, тогда как алгоритм std::accumulate может работать и с однопроходными итераторами ввода. Наконец, тип T должен допускать конструирование по умолчанию (удовлетворять требованиям концепции DefaultConstructible), чтобы можно было создать вектор results. Такого рода изменения требований довольно типичны для параллельных алгоритмов: но самой своей природе они отличаются от последовательных алгоритмов, и это приводит к определенным последствиям в части как результатов, так и требований. Более подробно параллельные алгоритмы рассматриваются в главе 8. Стоит также отметить, что из-за невозможности вернуть значение непосредственно из потока, мы должны передавать ссылку на соответствующий элемент вектора results. Другой способ возврата значений из потоков, с помощью будущих результатов, рассматривается в главе 4.

В данном случае вся необходимая потоку информация передавалась в момент его запуска — в том числе и адрес, но которому необходимо сохранить результат вычисления. Так бывает не всегда; иногда требуется каким-то образом идентифицировать потоки во время работы. Конечно, можно было бы передать какой-то идентификатор, например значение i в листинге 2.7, но если вызов функции, которой этот идентификатор нужен, находится несколькими уровнями стека глубже, и эта функция может вызываться из любого потока, то поступать так неудобно. Проектируя библиотеку С++ Thread Library, мы предвидели этот случай, поэтому снабдили каждый поток уникальным идентификатором.

 

2.5. Идентификация потоков

Идентификатор потока имеет тип std::thread::id, и получить его можно двумя способами. Во-первых, идентификатор потока, связанного с объектом std::thread, возвращает функция-член get_id() этого объекта. Если с объектом std::thread не связан никакой поток, то get_id() возвращает сконструированный по умолчанию объект типа std::thread::id, что следует интерпретировать как «не поток». Идентификатор текущего потока можно получить также, обратившись к функции std::this_thread::get_id(), которая также определена в заголовке .

Объекты типа std::thread::id можно без ограничений копировать и сравнивать, в противном случае они вряд ли могли бы играть роль идентификаторов. Если два объекта типа std::thread::id равны, то либо они представляют один и тот же поток, либо оба содержат значение «не поток». Если же два таких объекта не равны, то либо они представляют разные потоки, либо один представляет поток, а другой содержит значение «не поток».

Библиотека Thread Library не ограничивается сравнением идентификаторов потоков на равенство, для объектов типа std::thread::id определен полный спектр операторов сравнения, то есть на множестве идентификаторов потоков задан полный порядок. Это позволяет использовать их в качестве ключей ассоциативных контейнеров, сортировать и сравнивать любым интересующим программиста способом. Поскольку операторы сравнения определяют полную упорядоченность различных значений типа std::thread::id, то их поведение интуитивно очевидно: если a, поэтому значения типа std::thread::id можно использовать и в качестве ключей новых неупорядоченных ассоциативных контейнеров.

Объекты std::thread::id часто применяются для того, чтобы проверить, должен ли поток выполнить некоторую операцию. Например, если потоки используются для разбиения задач, как в листинге 2.8, то начальный поток, который запускал все остальные, может вести себя несколько иначе, чем прочие. В таком случае этот поток мог бы сохранить значение std::this_thread::get_id() перед тем, как запускать другие потоки, а затем в основной части алгоритма (общей для всех потоков) сравнить собственный идентификатор с сохраненным значением.

std::thread::id master_thread;

void some_core_part_of_algorithm() {

 if (std::this_thread::get_id() == master_thread) {

  do_master_thread_work();

 }

 do_common_work();

}

Или можно было бы сохранить std::thread::id текущего потока в некоторой структуре данных в ходе выполнения какой-то операции. В дальнейшем при операциях с той же структурой данных можно было сравнить сохраненный идентификатор с идентификатором потока, выполняющего операцию, и решить, какие операции разрешены или необходимы.

Идентификаторы потоков можно было бы также использовать как ключи ассоциативных контейнеров, если с потоком нужно ассоциировать какие-то данные, а другие механизмы, например поточно-локальная память, не подходят. Например, управляющий поток мог бы сохранить в таком контейнере информацию о каждом управляемом им потоке. Другое применение подобного контейнера — передавать информацию между потоками.

Идея заключается в том, что в большинстве случаев std::thread::id вполне может служить обобщенным идентификатором потока и лишь, если с идентификатором необходимо связать какую-то семантику (например, использовать его как индекс массива), может потребоваться другое решение. Можно даже выводить объект std::thread::id в выходной поток, например std::cout:

std::cout << std::this_thread::get_id();

Точный формат вывода зависит от реализации; стандарт лишь гарантирует, что результаты вывода одинаковых идентификаторов потоков будут одинаковы, а разных — различаться. Поэтому для отладки и протоколирования это может быть полезно, но так как никакой семантики у значений идентификаторов нет, то сделать на их основе какие-то другие выводы невозможно.

 

2.6. Резюме

В этой главе мы рассмотрели основные средства управления потоками, имеющиеся в стандартной библиотеке С++: запуск потоков, ожидание завершения потока и отказ от ожидания вследствие того, что поток работает в фоновом режиме. Мы также научились передавать аргументы функции потока при запуске и передавать ответственность за управление потоком из одной части программы в другую. Кроме того, мы видели, как можно использовать группы потоков для разбиения задачи на части. Наконец, мы обсудили механизм идентификации потоков, позволяющий ассоциировать с потоком данные или поведение в тех случаях, когда использовать другие средства неудобно. Даже совершенно независимые потоки позволяют сделать много полезного, как видно из листинга 2.8, но часто требуется, чтобы работающие потоки обращались к каким-то общим данным. В главе 3 рассматриваются проблемы, возникающие при разделении данных между потоками, а в главе 4 — более общие вопросы синхронизации операций с использованием и без использования разделяемых данных.