Параллельное программирование на С++ в действии. Практика разработки многопоточных программ

Уильямс Энтони

Глава 9.

Продвинутое управление потоками

 

 

В предыдущих главах мы управляли потоками явно — путем создания объектов std::thread для каждого потока. В нескольких местах мы видели, что это не всегда желательно, так как приходится самостоятельно управлять временем жизни этих объектов, определять, сколько потоков создать для решения данной задачи с учетом имеющегося оборудования и т.д. В идеале хотелось бы просто разбить код на максимально мелкие блоки, которые можно выполнить параллельно, а потом передать их компилятору и библиотеке, сказав: «Распараллель и обеспечь оптимальную производительность».

В ряде примеров нам встречалась еще одна повторяющаяся тема — мы можем использовать несколько потоков для решения задачи, но хотим, чтобы они завершались досрочно, если выполнено некоторое условие, например: результат уже получен, или произошла ошибка, или пользователь потребовал отменить операцию. В любом случае потокам нужно отправить запрос «Прекратить работу», который означает, что они должны прервать выполняемое задание, прибраться за собой и как можно скорее завершиться.

В этой главе мы рассмотрим различные механизмы управления потоками и задачами и начнем с автоматического выбора числа потоков и распределения задач между ними.

 

9.1. Пулы потоков

 

Во многих компаниях сотрудники, которые обычно работают в офисе, время от времени должны выезжать к клиентам или поставщикам, посещать выставки и конференции. Хотя такие поездки могут считаться обязательными, и в любой день в командировке может находиться сразу несколько человек, но для любого конкретного работника промежуток между поездками может составлять месяцы, а то и годы. В таких условиях резервировать машину для каждого работника было бы дорого и непрактично, поэтому компании содержат парк, или пул машин, то есть ограниченное количество машин, предоставляемых в распоряжении всем работникам. Когда работнику нужно съездить в командировку, он заказывает машину на определенное время, а по завершении поездки возвращает ее в общий пул. Если в какой-то день свободных машин в пуле не оказалось, то командировку придется перенести на другой день.

В основе пула потоков лежит аналогичная идея, только в общее распоряжение предоставляются не машины, а потоки. Во многих системах бессмысленно заводить отдельный поток для каждой задачи, которая потенциально может выполняться одновременно с другими, но тем не менее хотелось бы воспользоваться преимуществами параллелизма там, где это возможно. Именно это и позволяет сделать пул потоков: задачи, которые в принципе могут выполняться параллельно, отправляются в пул, а тот ставит их в очередь ожидающих работ. Затем один из рабочих потоков забирает задачу из очереди, исполняет ее и идет за следующей задачей.

При разработке пула потоков нужно принять несколько важных проектных решений, например: сколько потоков будет в пуле, как эффективнее всего назначать потоки задачам, можно ли будет ждать завершения задачи. В этом разделе мы рассмотрим несколько реализаций пула потоков, начав с самого простого.

 

9.1.1. Простейший пул потоков

В простейшем случае пул состоит из фиксированного числа рабочих потоков (обычно равного значению, которое возвращает функция std::thread::hardware_concurrency()). Когда у программы появляется какая-то работа, она вызывает функцию, которая помещает эту работу в очередь. Рабочий поток забирает работу из очереди, выполняет указанную в ней задачу, после чего проверяет, есть ли в очереди другие работы. В этой реализации никакого механизма ожидания завершения задачи не предусмотрело. Если это необходимо, то вы должны будете управлять синхронизацией самостоятельно.

В следующем листинге приведена реализация такого пула потоков.

Листинг 9.1. Простой пул потоков

class thread_pool {

 std::atomic_bool done;

 thread_safe_queue > work_queue;← (1)

 std::vector threads; ← (2)

 join_threads joiner; ← (3)

 void worker_thread() {

  while (!done) { ← (4)

   std::function task;

   if (work_queue.try_pop(task)) { ← (5)

    task(); ← (6)

   } else {

    std::this_thread::yield(); ← (7)

   }

  }

 }

public:

 thread_pool():

  done(false), joiner(threads) {

  unsigned const thread_count =

   std::thread::hardware_concurrency();← (8)

  try {

   for (unsigned i = 0; i < thread_count; ++i) {

    threads.push_back(

     std::thread(&thread_pool::worker_thread, this)); ← (9)

   }

  } catch (...) {

   done = true; ← (10)

   throw;

  }

 }

 ~thread_pool() {

  done = true; ← (11)

 }

 template

 void submit(FunctionType f) {

  work_queue.push(std::function(f)); ← (12)

 }

};

Здесь мы определили вектор рабочих потоков (2) и используем одну из потокобезопасных очередей из главы 6 (1) для хранения очереди работ. В данном случае пользователь не может ждать завершения задачи, а задача не может возвращать значения, поэтому для инкапсуляции задач можно использовать тип std::function. Функция submit() обертывает переданную функцию или допускающий вызов объект в объект std::function и помещает его в очередь (12).

Потоки запускаются в конструкторе; их количество равно значению, возвращаемому функцией std::thread::hardware_concurrency(), то есть мы создаем столько потоков, сколько может поддержать оборудование (8). Все эти потоки исполняют функцию-член нашего класса worker_thread() (9).

Запуск потока может завершиться исключением, поэтому необходимо позаботиться о том, чтобы уже запущенные к этому моменту потоки корректно завершались. Для этого мы включили блок try-catch, который в случае исключения поднимает флаг done (10). Кроме того, мы воспользовались классом join_threads из главы 8 (3), чтобы обеспечить присоединение всех потоков. То же самое происходит в деструкторе: мы просто поднимаем флаг done (11), а объект join_threads гарантирует, что потоки завершатся до уничтожения пула. Отметим, что порядок объявления членов важен: и флаг done и объект worker_queue должны быть объявлены раньше вектора threads, который, в свою очередь, должен быть объявлен раньше joiner. Только тогда деструкторы членов класса будут вызываться в правильном порядке; в частности, нельзя уничтожать очередь раньше, чем остановлены все потоки.

Сама функция worker_thread проста до чрезвычайности: в цикле, который продолжается, пока не поднят флаг done (4), она извлекает задачи из очереди (5) и выполняет их (6). Если в очереди нет задач, функция вызывает std::this_thread::yield() (7), чтобы немного отдохнуть и дать возможность поработать другим потокам.

Часто даже такого простого пула потоков достаточно, особенно если задачи независимы, не возвращают значений и не выполняют блокирующих операций. Но бывает и по-другому: во-первых, у программы могут быть более жесткие требования, а, во-вторых, в таком пуле возможны проблемы, в частности, из-за взаимоблокировок. Кроме того, в простых случаях иногда лучше прибегнуть к функции std::async, как неоднократно демонстрировалось в главе 8. В этой главе мы рассмотрим и более изощренные реализации пула потоков с дополнительными возможностями, которые призваны либо удовлетворить особые потребности пользователя, либо уменьшить количество потенциальных ошибок. Для начала разрешим ожидать завершения переданной пулу задачи.

 

9.1.2. Ожидание задачи, переданной пулу потоков

В примерах из главы 8, где потоки запускались явно, главный поток после распределения работы между потоками всегда ждал завершения запущенных потоков. Тем самым гарантировалось, что вызывающая программа получит управление только после полного завершения задачи. При использовании пула потоков ждать нужно завершения задачи, переданной пулу, а не самих рабочих потоков. Это похоже на то, как мы ждали будущих результатов при работе с std::async в главе 8. В случае простого пула потоков, показанного в листинге 9.1, организовывать ожидание придется вручную, применяя механизмы, описанные в главе 4: условные переменные и будущие результаты. Это усложняет код; намного удобнее было бы ждать задачу напрямую.

За счет переноса сложности в сам пул потоков мы сумеем добиться желаемого. Функция submit() могла бы возвращать некий описатель задачи, по которому затем можно было бы ждать ее завершения. Сам описатель должен был бы инкапсулировать условную переменную или будущий результат. Это упростило бы код, пользующийся пулом потоков.

Частный случай ожидания завершения запущенной задачи возникает, когда главный поток нуждается в вычисленном ей результате. Мы уже встречались с такой ситуацией выше, например, в функции parallel_accumulate() из главы 2. В таком случае путем использования будущих результатов мы можем объединить ожидание с передачей результата. В листинге 9.2 приведен код модифицированного пула потоков, который разрешает ожидать завершения задачи и передает возвращенный ей результат ожидающему потоку. Поскольку экземпляры класса std::packaged_task<> допускают только перемещение, но не копирование, мы больше не можем воспользоваться классом std::function<> для обертывания элементов очереди, потому что std::function<> требует, чтобы в обернутых объектах-функциях был определён копирующий конструктор. Вместо этого мы напишем специальный класс-обертку, умеющий работать с объектами, обладающими только перемещающим конструктором. Это простой маскирующий тип класс (type-erasure class), в котором определён оператор вызова. Нам нужно поддержать функции, которые не принимают параметров и возвращают void, поэтому оператор всего лишь вызывает виртуальный метод call(), который в свою очередь вызывает обернутую функцию.

Листинг 9.2. Пул потоков, ожидающий завершения задачи

class function_wrapper {

 struct impl_base {

  virtual void call() = 0;

  virtual ~impl_base() {}

 };

 std::unique_ptr impl;

 template

 struct impl_type: impl_base {

  F f;

  impl_type(F&& f_): f(std::move(f_)) {}

  void call() { f(); }

 };

public:

 template function_wrapper(F&& f):

  impl(new impl_type(std::move(f))) {}

 void operator()() { impl->call(); }

 function_wrapper() = default;

 function_wrapper(function_wrapper&& other):

  impl(std::move(other.impl)) {}

 function_wrapper& operator=(function_wrapper&& other) {

  impl = std::move(other.impl);

  return *this;

 }

 function_wrapper(const function_wrapper&) = delete;

 function_wrapper(function_wrapper&) = delete;

 function_wrapper& operator=(const function_wrapper&) = delete;

};

class thread_pool {

 thread_safe_queue work_queue;←┐

                                                 │ Используем

 void worker_thread()                            │ function_

 {                                               │ wrapper

  while (!done)                                  │ вместо std::

  {                                              │ function

   function_wrapper task;                       ←┘

   if (work_queue.try_pop(task))

    task();

   else

    std::this_thread::yield();

  }

 }

public:

 template

 std::future::type>← (1)

 submit(FunctionType f) {

  typedef typename std::result_of::type

   result_type;                                        ← (2)

  std::packaged_task task(std::move(f));← (3)

  std::future res(task.get_future());     ← (4)

  work_queue.push(std::move(task));                    ← (5)

  return res;                                          ← (6)

 }

 // остальное, как и раньше

};

Прежде всего отметим, что модифицированная функция submit() (1) возвращает объект std::future<>, который будет содержать возвращенное задачей значение и позволит вызывающей программе ждать ее завершения. Для этого нам необходимо знать тип значения, возвращаемого переданной функцией f, и здесь на помощь приходит шаблон std::result_of<>: std::result_of::type — это тип результата, возвращенного вызовом объекта типа FunctionType (например, f) без аргументов. Выражение std::result_of<> мы используем также в определении псевдонима типа result_type (2) внутри функции.

Затем f обертывается объектом std::packaged_task (3), потому что f — функция или допускающий вызов объект, который не принимает параметров и возвращает результат типа result_type. Теперь мы можем получить будущий результат из std::packaged_task<> (4), перед тем как помещать задачу в очередь (5) и возвращать будущий результат (6). Отметим, что при помещении задачи в очередь мы должны использовать функцию std::move(), потому что класс std::packaged_task<> не допускает копирования. Именно поэтому в очереди хранятся объекты function_wrapper, а не объекты типа.

Этот пул позволяет ожидать завершения задач и получать возвращаемые ими результаты. В листинге ниже показано, как выглядит функция parallel_accumulate, работающая с таким пулом потоков.

Листинг 9.3. Функция parallel_accumulate, реализованная с помощью пула потоков, допускающего ожидание задач

template

T parallel_accumulate(Iterator first, Iterator last, T init) {

 unsigned long const length = std::distance(first, last);

 if (!length)

  return init;

 unsigned long const block_size = 25;

 unsigned long const num_blocks =

  (length + block_size - 1) / block_size; ← (1)

 std::vector > futures(num_blocks-1);

 thread_pool pool;

 Iterator block_start = first;

 for (unsigned long i = 0; i < (num_blocks - 1); ++i) {

  Iterator block_end = block_start;

  std::advance(block_end, block_size);

  futures[i] = pool.submit(accumulate_block());← (2)

  block_start = block_end;

 }

 T last_result =

  accumulate_block()(block_start, last);

 T result = init;

 for (unsigned long i = 0; i < (num_blocks - 1); ++i) {

  result += futures[i].get();

 }

 result += last_result;

 return result;

}

Сравнивая этот код с листингом 8.4, следует обратить внимание на две вещи. Во-первых, мы работаем с количеством блоков (num_blocks (1)), а не потоков. Чтобы в полной мере воспользоваться масштабируемостью пула потоков, мы должны разбить работу на максимально мелкие блоки, с которыми имеет смысл работать параллельно. Если потоков в пуле немного, то каждый поток будет обрабатывать много блоков, но по мере роста числа потоков, поддерживаемых оборудованием, будет расти и количество блоков, обрабатываемых параллельно.

Но, выбирая «максимально мелкие блоки, с которыми имеет смысл работать параллельно», будьте осторожны. Отправка задачи пулу потоков, выбор ее рабочим потоком из очереди и передача возвращенного значения с помощью std::future<> — всё это операции не бесплатные, и для совсем мелких задач они не окупятся. Если размер задачи слишком мал, то программа, в которой используется пул потоков, может работать медленнее, чем однопоточная.

В предположении, что размер блока выбран разумно, вам не надо заботиться об упаковке задач, получении будущих результатов и хранении объектов std::thread, чтобы впоследствии их можно было присоединить; все это пул берет на себя. Вам остается лишь вызвать функцию submit(), передав ей свою задачу (2).

Пул потоков обеспечивает также безопасность относительно исключений. Любое возбужденное задачей исключение передается через будущий результат, возвращенный submit(), а, если выход из функции происходит в результате исключения, то деструктор пула потоков снимет еще работающие задачи и дождется завершения потоков, входящих в пул.

Эта схема работает для простых случаев, когда задачи независимы. Но не годится, когда одни задачи зависят от других, также переданных пулу.

 

9.1.3. Задачи, ожидающие других задач

В этой книге я уже неоднократно приводил пример алгоритма Quicksort. Его идея проста — подлежащие сортировке данные разбиваются на две части: до и после опорного элемента (в смысле заданной операции сравнения). Затем обе части рекурсивно сортируются и объединяются для получения полностью отсортированной последовательности. При распараллеливании алгоритма надо позаботиться о том, чтобы рекурсивные вызовы задействовали имеющийся аппаратный параллелизм.

В главе 4, где этот пример впервые был представлен, мы использовали std::async для выполнения одного из рекурсивных вызовов на каждом шаге и оставляли библиотеке решение о том, запускать ли новый поток или сортировать синхронно при обращении к get(). Этот подход неплохо работает — каждая задача либо выполняется в отдельном потоке, либо в тот момент, когда нужны ее результаты.

В главе 8 мы переработали эту реализацию, продемонстрировав альтернативный подход, когда количество потоков фиксировано и определяется уровнем аппаратного параллелизма. В данном случае мы воспользовались стеком ожидающих сортировки блоков. Разбивая на части предложенные для сортировки данные, каждый поток помещал один блок в стек, а второй сортировал непосредственно. Бесхитростное ожидание завершения сортировки второго блока могло бы закончиться взаимоблокировкой, потому что число потоков ограничено, и некоторые из них ждут. Очень легко оказаться в ситуации, когда все потоки ждут завершения сортировки блоков, и ни один ничего не делает. Тогда мы решили эту проблему, заставив поток извлекать блоки из стека и сортировать их, пока тот конкретный блок, которого он ждет, еще не отсортирован.

Точно такая же проблема возникает при использовании одного из рассмотренных выше простых пулов потоков вместо std::async, как в примере из главы 4. Число потоков тоже ограничено, и может случиться так, что все они будут ждать задач, которые еще запланированы, так как нет свободных потоков. И решение должно быть таким же,

как в главе 8: обрабатывать стоящие в очереди блоки во время ожидания завершения сортировки своего блока. Но если мы применяем пул потоков для управления списком задач и их ассоциациями с потоками — а именно в этом и состоит смысл пула потоков, — то доступа к списку задач у нас нет. Поэтому необходимо модифицировать сам пул, чтобы он делал это автоматически.

Проще всего будет добавить в класс thread_pool новую функцию, чтобы исполнять задачу из очереди и управлять циклом самостоятельно. Так мы и поступим. Более развитые реализации пула могли бы включить дополнительную логику в функцию ожидания или добавить другие функции ожидания для обработки этого случая, быть может, даже назначая приоритеты ожидаемым задачам. В листинге ниже приведена новая функция run_pending_task(), а модифицированный алгоритм Quicksort, в котором она используется, показан в листинге 9.5.

Листинг 9.4. Реализация функции run_pending_task()

void thread_pool::run_pending_task() {

 function_wrapper task;

 if (work_queue.try_pop(task)) {

  task();

 } else {

  std::this_thread::yield();

 }

}

Код run_pending_task() вынесен из главного цикла внутри функции worker_thread(), которую теперь можно будет изменить, так чтобы она вызывала run_pending_task(). Функция run_pending_task() пытается получить задачу из очереди и в случае успеха выполняет ее; если очередь пуста, то функция уступает управление ОС, чтобы та могла запланировать другой поток. Показанная ниже реализация Quicksort гораздо проще, чем версия в листинге 8.1, потому что вся логика управления потоками перенесена в пул.

Листинг 9.5. Реализация Quicksort на основе пула потоков

template

struct sorter {    ← (1)

 thread_pool pool; ← (2)

 std::list do_sort(std::list& chunk_data) {

  if (chunk_data.empty()) {

   return chunk_data;

  }

  std::list result;

  result.splice(result.begin(), chunk_data, chunk_data.begin());

  T const& partition_val = *result.begin();

  typename std::list::iterator divide_point =

   std::partition(chunk_data.begin(), chunk_data.end(),

    [&](T const& val){ return val < partition_val; });

  std::list new_lower_chunk;

  new_lower_chunk.splice(new_lower_chunk.end(),

   chunk_data, chunk_data.begin(),

   divide_point);

  std::future > new_lower = ← (3)

  pool.submit(std::bind(&sorter::do_sort, this,

   std::move(new_lower_chunk)));

  std::list new_higher(do_sort(chunk_data));

  result.splice(result.end(), new_higher);

  while (!new_lower.wait_for(std::chrono::seconds(0)) ==

   std::future_status::timeout) {

   pool.run_pending_task(); ← (4)

  }

  result.splice(result.begin(), new_lower.get());

  return result;

 }

};

template

std::list parallel_quick_sort(std::list input) {

 if (input.empty()) {

  return input;

 }

 sorter s;

 return s.do_sort(input);

}

Как и в листинге 8.1, реальная работа делегируется функции-члену do_sort() шаблона класса sorter (1), хотя в данном случае этот шаблон нужен лишь для обертывания экземпляра thread_pool (2).

Управление потоками и задачами теперь свелось к отправке задачи пулу (3) и исполнению находящихся в очереди задач в цикле ожидания (4). Это гораздо проще, чем в листинге 8.1, где нужно было явно управлять потоками и стеком подлежащих сортировке блоков. При отправке задачи пулу мы используем функцию std::bind(), чтобы связать указатель this с do_sort() и передать подлежащий сортировке блок. В данном случае мы вызываем std::move(), чтобы данные new_lower_chunk перемещались, а не копировались.

Мы решили проблему взаимоблокировки, возникающую из- за того, что одни потоки ждут других, но этот пул все еще далек от идеала. Отметим хотя бы, что все вызовы submit() и run_pending_task() обращаются к одной и той же очереди. В главе 8 мы видели, что модификация одного набора данных из разных потоков может негативно сказаться на производительности, стало быть, с этим нужно что-то делать.

 

9.1.4. Предотвращение конкуренции за очередь работ

Всякий раз, как поток вызывает функцию submit() экземпляра пула потоков, он помещает новый элемент в единственную разделяемую очередь работ. А рабочие потоки постоянно извлекают элементы из той же очереди. Следовательно, по мере увеличения числа процессоров будет возрастать конкуренция за очередь работ. Это может ощутимо отразиться на производительности; даже при использовании свободной от блокировок очереди, в которой нет явного ожидания, драгоценное время может тратиться на перебрасывание кэша.

Чтобы избежать перебрасывания кэша, мы можем завести по одной очереди работ на каждый поток. Тогда каждый поток будет помещать новые элементы в свою собственную очередь и брать работы из глобальной очереди работ только тогда, когда в его очереди работ нет. В следующем листинге приведена реализация с использованием переменной типа thread_local, благодаря которой каждый поток обладает собственной очередью работ в дополнение к глобальной.

Листинг 9.6. Пул с очередями в поточно-локальной памяти

class thread_pool {

 thread_safe_queue pool_work_queue;

 typedef std::queue local_queue_type;← (1)

 static thread_local std::unique_ptr

  local_work_queue; ← (2)

 void worker_thread() {

  local_work_queue.reset(new local_queue_type);← (3)

  while (!done) {

   run_pending_task();

  }

 }

public:

 template

 std::future::type>

 submit(FunctionType f) {

  typedef typename std::result_of::type

   result_type;

  std::packaged_task task(f);

  std::future res(task.get_future());

  if (local_work_queue) { ← (4)

   local_work_queue->push(std::move(task));

  } else {

   pool_work_queue.push(std::move(task)); ← (5)

  }

  return res;

 }

 void run_pending_task() {

  function_wrapper task;

  if (local_work_queue && !local_work_queue->empty()) {← (6)

   task = std::move(local_work_queue->front());

   local_work_queue->pop();

   task();

  } else if(pool_work_queue.try_pop(task)) { ← (7)

   task();

  } else {

   std::this_thread::yield();

  }

 }

 // остальное, как и раньше

};

Для хранения очереди работ в поточно-локальной памяти мы воспользовались указателем std::unique_ptr<> (2), потому что не хотим, чтобы у потоков, не входящих в пул, была очередь; этот указатель инициализируется в функции worker_thread() до начала цикла обработки (3). Деструктор std::unique_ptr<> позаботится об уничтожении очереди работ по завершении потока.

Затем функция submit() проверяет, есть ли у текущего потока очередь работ (4). Если есть, то это поток из пула, и мы можем поместить задачу в локальную очередь. В противном случае задачу следует помещать в очередь пула, как и раньше (5).

Аналогичная проверка имеется в функции run_pending_task() (6), только на этот раз нужно еще проверить, есть ли что-нибудь в локальной очереди. Если есть, то можно извлечь элемент из начала очереди и обработать его. Обратите внимание, что локальная очередь может быть обычным объектом std::queue<> (1), так как к ней обращается только один поток. Если в локальной очереди задач нет, то мы проверяем очередь пула, как и раньше (7).

Таким образом мы действительно уменьшаем конкуренцию, но если распределение работ неравномерно, то может случиться, что в очереди одного потока скопится много задач, тогда как остальным будет нечем заняться. Например, в случае Quicksort только самый первый блок попадает в очередь пула, а остальные окажутся в локальной очереди того потока, который этот блок обработал. Это сводит на нет всю идею пула потоков.

К счастью, у этой проблемы есть решение: позволить потоку занимать (steal) работы из очередей других потоков, если ни в его собственной, ни в глобальной очереди ничего нет.

 

9.1.5. Занимание работ

Если мы хотим, чтобы «безработный» поток мог брать работы из очереди другого потока, то эта очередь должна быть доступна занимающему потоку в run_pending_tasks(). Для этого каждый поток должен зарегистрировать свою очередь в пуле или получать очередь от пула. Кроме того, необходимо позаботиться о надлежащей синхронизации и защите очереди работ, чтобы не нарушались инварианты.

Можно написать свободную от блокировок очередь, которая позволит потоку-владельцу помещать и извлекать элементы с одного конца, а другим потокам — занимать элементы с другого конца, однако реализация такой очереди выходит за рамки данной книги. Чтобы продемонстрировать идею, мы поступим проще — воспользуемся мьютексом для защиты данных очереди. Мы надеемся, что занимание работ — редкое событие, поэтому конкуренция за мьютекс будет невелика, и накладные расходы на такую очередь окажутся минимальны. Ниже приведена простая реализация с блокировками.

Листинг 9.7. Очередь с блокировкой, допускающей занимание работ

class work_stealing_queue {

private:

 typedef function_wrapper data_type;

 std::deque the_queue; ← (1)

 mutable std::mutex the_mutex;

public:

 work_stealing_queue() {}

 work_stealing_queue(const work_stealing_queue& other)=delete;

 work_stealing_queue& operator=(

  const work_stealing_queue& other)=delete;

 void push(data_type data) { ← (2)

  std::lock_guard lock(the_mutex);

  the_queue.push_front(std::move(data));

 }

 bool empty() const {

  std::lock_guard lock(the_mutex);

  return the_queue.empty();

 }

 bool try_pop(data_type& res) { ← (3)

  std::lock_guard lock(the_mutex);

  if (the_queue.empty()) {

   return false;

  }

  res = std::move(the_queue.front());

  the_queue.pop_front();

  return true;

 }

 bool try_steal(data_type& res) { ← (4)

  std::lock_guard lock(the_mutex);

  if (the_queue.empty()) {

   return false;

  }

  res = std::move(the_queue.back());

  the_queue.pop_back();

  return true;

 }

};

Этот класс является простой оберткой вокруг std::deque (1), которая защищает все операции доступа к очереди с помощью мьютекса. Функции push() (2) и try_ pop() (3) работают с началом очереди, а функция try_steal() — с концом (4).

Получается, что эта «очередь» для потока-владельца на самом деле является стеком, обслуживаемым согласно дисциплине «последним пришёл, первым обслужен», — задача, которая была помещена последней, извлекается первой. С точки зрения кэш-памяти это даже может повысить производительность, так как относящиеся к последней задаче данные с большей вероятностью окажутся в кэше, чем данные, относящиеся к предыдущей задаче. К тому же, такая дисциплина прекрасно подходит для алгоритмов типа Quicksort. В предшествующих реализациях каждое обращение к do_sort() помещает элемент в очередь, а затем ждет его. Обрабатывая последний помещенный в очередь элемент первым, мы гарантируем, что блок, необходимый текущему вызову для завершения работы, будет обработан раньше блоков, нужных другим ветвям, а, значит, уменьшается как количество активных задач, так и занятый размер стека. Функция try_steal() извлекает элементы из противоположного по сравнению с try_pop() конца очереди, чтобы минимизировать конкуренцию; в принципе, можно было бы применить технику, обсуждавшуюся в главах 6 и 7, чтобы поддержать одновременные обращения к try_pop() и try_steal().

Итак, теперь у нас есть замечательная очередь работ, допускающая занимание. Но как воспользоваться ей в пуле потоков? Ниже приведена одна из возможных реализаций.

Листинг 9.8. Пул потоков с использованием занимания работ

class thread_pool {

 typedef function_wrapper task_type;

 std::atomic_bool done;

 thread_safe_queue pool_work_queue;

 std::vector > queues;← (1)

 std::vector threads;

 join_threads joiner;

 static thread_local work_stealing_queue* local_work_queue;← (2)

 static thread_local unsigned my_index;

 void worker_thread(unsigned my_index_) {

  my_index = my_index_;

  local_work_queue = queues[my_index].get(); ← (3)

  while (!done) {

   run_pending_task();

  }

 }

 bool pop_task_from_local_queue(task_type& task) {

  return local_work_queue && local_work_queue->try_pop(task);

 }

 bool pop_task_from_pool_queue(task_type& task) {

  return pool_work_queue.try_pop(task);

 }

 bool pop_task_from_other_thread_queue(task_type& task) { ← (4)

  for (unsigned i = 0; i < queues.size(); ++i) {

   unsigned const index = (my_index + i + 1) % queues.size();← (5)

   if (queues[index]->try_steal(task)) {

    return true;

   }

  }

  return false;

 }

public:

 thread_pool() :

  done(false), joiner(threads) {

  unsigned const thread_count =

   std::thread::hardware_concurrency();

  try {

   for (unsigned i = 0; i < thread_count; ++i) {

    queues.push_back(std::unique_ptr (← (6)

     new work_stealing_queue));

    threads.push_back(

     std::thread(&thread_pool::worker_thread, this, i));

   }

  } catch (...) {

   done = true;

   throw;

  }

 }

 ~thread_pool() {

  done = true;

 }

 template

 std::future::type> submit(

  FunctionType f) {

  typedef typename std::result_of::type

   result_type;

  std::packaged_task task(f);

  std::future res(task.get_future());

  if (local_work_queue) {

   local_work_queue->push(std::move(task));

  } else {

   pool_work_queue.push(std::move(task));

  }

  return res;

 }

 void run_pending_task() {

  task_type task;

  if (pop_task_from_local_queue(task) || ← (7)

      pop_task_from_pool_queue (task) || ← (8)

      pop_task_from_other_thread_queue(task)) { ← (9)

   task();

  } else {

   std::this_thread::yield();

  }

 }

};

Этот код очень похож на код из листинга 9.6. Первое отличие состоит в том, что локальная очередь каждого потока — объект класса work_stealing_queue, а не просто std::queue<> (2). Новый поток не выделяет очередь для себя самостоятельно; это делает конструктор пула потоков (6), и он же сохраняет новую очередь в списке очередей для данного пула (1). Индекс очереди в списке передаётся функции потока и используется затем для получения указателя на очередь (3). Это означает, что пул потоков может получить доступ к очереди, когда пытается занять задачу для потока, которому нечего делать. Новая версия run_pending_task() сначала пытается получить задачу из очереди исполняемого потока (7), затем из очереди пула (8) и, наконец, из очереди другого потока (9).

Функция pop_task_from_other_thread_queue() (4) обходит очереди, принадлежащие всем потокам пула, пытаясь занять задачу у каждой. Чтобы не случилось так, что все потоки занимают задачи у первого потока в списке, каждый поток начинает просмотр с позиции, равной его собственному индексу (5).

Теперь у нас имеется пул потоков, пригодный для самых разных целей. Разумеется, есть масса способов улучшить его для работы в конкретной ситуации, но это я оставляю в качестве упражнения для читателя. В частности, мы совсем не исследовали идею динамического изменения размера пула, так чтобы обеспечить оптимальное использование процессоров, даже когда потоки блокированы в ожидании какого-то события, например, завершения ввода/вывода или освобождения мьютекса.

Следующим в нашем списке «продвинутых» приёмов управления потоками стоит прерывание потоков.

 

9.2. Прерывание потоков

 

Часто бывает необходимо сообщить долго работающему потоку о том, что пришло время остановиться. Например, потому что это рабочий поток пула, а мы собираемся уничтожить сам пул, или потому что пользователь отменил работу, выполняемую этим потоком. Причин миллион. Но идея в любом случае одна и та же: послать из одного потока другому сигнал с требованием прекратить работу до ее естественного завершения, и сделать это так, чтобы поток завершился корректно, а не просто выбить почву у него из-под ног.

Можно было бы придумывать такой механизм специально для каждого случая, но это, пожалуй, перебор. Мало того что общий механизм в дальнейшем упростит написание кода, так он еще и позволит писать код, допускающий прерывание, не заботясь о том, где конкретно он используется. Стандарт C++11 такого механизма не предоставляет, но реализовать его самостоятельно не слишком сложно. Я покажу, как это сделать, но сначала взгляну на проблему с точки зрения интерфейса запуска и прерывания потока, а не с точки зрения самого прерываемого потока.

 

9.2.1. Запуск и прерывание другого потока

Начнем с рассмотрения внешнего интерфейса. Что нам нужно от допускающего прерывание потока? На самом элементарном уровне интерфейс должен быть таким же, как у std::thread, но с дополнительной функцией interrupt():

class interruptible_thread {

public:

 template

 interruptible_thread(FunctionType f);

 void join();

 void detach();

 bool joinable() const;

 void interrupt();

};

В реализации можно было бы использовать std::thread для управления потоком и какую-то структуру данных для обработки прерывания. А как это выглядит с точки зрения самого потока? Как минимум, нужна возможность сказать: «Меня можно прерывать здесь», то есть нам требуется точка прерывания. Чтобы не передавать дополнительные данные, соответствующая функция должна вызываться без параметров: interruption_point(). Отсюда следует, что относящаяся к прерываниям структура данных должна быть доступна через переменную типа thread_local, которая устанавливается при запуске потока. Поэтому, когда поток обращается к функции interruption_point(), та проверяет структуру данных для текущего исполняемого потока. С реализацией interruption_point() мы познакомимся ниже.

Флаг типа thread_local — основная причина, по которой мы не можем использовать для управления потоком просто класс std::thread; память для него нужно выделить таким образом, чтобы к ней имел доступ как экземпляр interruptible_thread, так и вновь запущенный поток. Для этого функцию, переданную конструктору, можно специальным образом обернуть перед тем, как передавать конструктору std::thread. Как это делается, показано в следующем листинге.

Листинг 9.9. Простая реализация interruptible_thread

class interrupt_flag {

public:

 void set();

 bool is_set() const;

};

thread_local interrupt_flag this_thread_interrupt_flag; ← (1)

class interruptible_thread {

 std::thread internal_thread;

 interrupt_flag* flag;

public:

 template

 interruptible_thread(FunctionType f) {

  std::promise p; ← (2)

  internal_thread = std::thread([f,&p] { ← (3)

   p.set_value(&this_thread_interrupt_flag);

   f(); ← (4)

  });

  flag = p.get_future().get(); ← (5)

 }

 void interrupt() {

  if (flag) {

   flag->set(); ← (6)

  }

 }

};

Переданная функция f обертывается лямбда-функцией (3), которая хранит копию f и ссылку на локальный объект-обещание p (2). Перед тем как вызывать переданную функцию (4), лямбда-функция устанавливает в качестве значения обещания адрес переменной this_thread_interrupt_flag (объявленной с модификатором thread_local (1)) в новом потоке. Затем вызывающий поток дожидается готовности будущего результата, ассоциированного с обещанием, и сохраняет этот результат в переменной-члене flag (5). Отметим, что лямбда-функция исполняется в новом потоке и хранит висячую ссылку на локальную переменную p, но ничего страшного в этом нет, так как конструктор interruptible_thread ждет, пока на p не останется ссылок в новом потоке, и только потом возвращает управление. Еще отметим, что эта реализация не обрабатывает присоединение или отсоединение потока. Мы сами должны позаботиться об очистке переменной flag в случае выхода или отсоединения потока, чтобы избежать появления висячего указателя.

Теперь написать функцию interrupt() несложно: имея указатель на флаг прерывания, мы знаем, какой поток прерывать, поэтому достаточно просто поднять этот флаг (6). Что делать дальше, решает сам прерываемый поток. О том, как принимается это решение, мы и поговорим ниже.

 

9.2.2. Обнаружение факта прерывания потока

Итак, мы умеем устанавливать флаг прерывания, но толку от него чуть, если поток не проверяет, что его хотят прервать. В простейшем случае это можно сделать с помощью функции interruption_point(), которую можно вызывать в точке, где прерывание безопасно. Если флаг прерывания установлен, то эта функция возбуждает исключение thread_interrupted:

void interruption_point() {

 if (this_thread_interrupt_flag.is_set()) {

  throw thread_interrupted();

 }

}

Обращаться к этой функции можно там, где нам удобно:

void fоо() {

 while (!done) {

  interruption_point();

  process_next_item();

 }

}

Такое решение работает, но оно не идеально. Лучше всего прерывать поток тогда, когда он блокирован в ожидании чего-либо, но именно в этот момент поток как раз и не работает, а, значит, не может вызвать interruption_point()! Нам требуется какой-то механизм прерываемого ожидания.

 

9.2.3. Прерывание ожидания условной переменной

Итак, мы можем обнаруживать прерывание в подходящих местах программы с помощью обращений к функции interruption_point(), но это ничем не помогает в случае, когда поток блокирован в ожидании какого-то события, например сигнала условной переменной. Нам необходима еще одна функция, interruptible_wait(), которую можно будет перегрузить для различных ожидаемых событий, и нужно придумать, как вообще прерывать ожидание. Я уже говорил, что среди прочего ожидать можно сигнала условной переменной, поэтому с нее и начнем. Что необходимо для того, чтобы можно было прервать поток, ожидающий условную переменную? Проще всего было бы известить условную переменную в момент установки флага и поставить точку прерывания сразу после ожидания. Но в этом случае придётся разбудить все потоки, ждущие эту условную переменную, хотя заинтересован в этом только прерываемый поток. Впрочем, потоки, ждущие условную переменную, в любом случае должны обрабатывать ложные пробуждения, а отличить посланный нами сигнал от любого другого они не могут, так что ничего страшного не случится. В структуре interrupt_flag нужно будет сохранить указатель на условную переменную, чтобы при вызове set() ей можно было послать сигнал. В следующем листинге показана возможная реализация функции interruptible_wait() для условных переменных.

Листинг 9.10. Неправильная реализация interruptible_wait() для std::condition_variable

void interruptible_wait(std::condition_variable& cv,

 std::unique_lock& lk) {

 interruption_point();← (1)

 this_thread_interrupt_flag.set_condition_variable(cv);

 cv.wait(lk);         ← (2)

 this_thread_interrupt_flag.clear_condition_variable();← (3)

 interruption_point();

}

В предположении, что существуют функции, которые устанавливают и разрывают ассоциацию условной переменной с флагом прерывания, этот код выглядит просто и понятно. Он проверяет, не было ли прерывания, ассоциирует условную переменную с флагом interrupt_flag для текущего потока (1), ждет условную переменную (2), разрывает ассоциацию с условной переменной (3) и снова проверяет, не было ли прерывания. Если поток прерывается во время ожидания условной переменной, то прерывающий поток пошлёт этой переменной сигнал, что пробудит нас и даст возможность проверить факт. К сожалению, этот код не работает, в нем есть две проблемы. Первая довольно очевидна: функция std::condition_variable::wait() может возбуждать исключения, поэтому из interruptible_wait() возможен выход без разрыва ассоциации флага прерывания с условной переменной. Это легко исправляется с помощью структуры, которая разрывает ассоциацию в ее деструкторе.

Вторая, не столь очевидная, проблема связана с гонкой. Если поток прерывается после первого обращения к interruption_point(), но до обращения к wait(), то не имеет значения, ассоциирована условная переменная с флагом прерывания или нет, потому что поток еще ничего не ждет и, следовательно, не может быть разбужен сигналом, посланным условной переменной. Мы должны гарантировать, что потоку не может быть послан сигнал между последней проверкой прерывания и обращением к wait(). Если не залезать в код класса std::condition_variable, то сделать это можно только одним способом: использовать для защиты мьютекс, хранящийся в lk, который, следовательно, нужно передавать функции set_condition_variable(). К сожалению, при этом возникают новые проблемы: мы передаём ссылку на мьютекс, о времени жизни которого ничего не знаем, другому потоку (тому, который выполняет прерывание), чтобы тот его захватил (внутри interrupt()). Но может случиться, что этот поток уже удерживает данный мьютекс, и тогда возникнет взаимоблокировка. К тому же, появляется возможность доступа к уже уничтоженному мьютексу. В общем, это решение не годится. Но если мы не можем надежно прерывать ожидание условной переменной, то нечего было и затевать это дело — почти того же самого можно было бы добиться и без специальной функции interruptible_wait(). Так какие еще есть варианты? Можно, к примеру, задать таймаут ожидания; использовать вместо wait() функцию wait_for() с очень коротким таймаутом (скажем, 1 мс). Это ограничивает сверху время до момента, когда поток обнаружит прерывание (с учетом промежутка между тактами часов). Если поступить так, что ожидающий поток будет видеть больше ложных пробуждений из-за срабатывания таймера, но тут уж ничего не попишешь. Такая реализация показана в листинге ниже вместе с соответствующей реализацией interrupt_flag.

Листинг 9.11. Реализация interruptible_wait() для std::condition_variable с таймаутом

class interrupt_flag {

 std::atomic flag;

 std::condition_variable* thread_cond;

 std::mutex set_clear_mutex;

public:

 interrupt_flag(): thread_cond(0) {}

 void set() {

  flag.store(true, std::memory_order_relaxed);

  std::lock_guard lk(set_clear_mutex);

  if (thread_cond) {

   thread_cond->notify_all();

  }

 }

 bool is_set() const {

  return flag.load(std::memory_order_relaxed);

 }

 void set_condition_variable(std::condition_variable& cv) {

  std::lock_guard lk(set_clear_mutex);

  thread_cond = &cv;

 }

 void clear_condition_variable() {

  std::lock_guard lk(set_clear_mutex);

  thread_cond = 0;

 }

 struct clear_cv_on_destruct {

  ~clear_cv_on_destruct() {

   this_thread_interrupt_flag.clear_condition_variable();

  }

 };

};

void interruptible_wait(std::condition_variable& cv,

 std::unique_lock& lk) {

 interruption_point();

 this_thread_interrupt_flag.set_condition_variable(cv);

 interrupt_flag::clear_cv_on_destruct guard;

 interruption_point();

 cv.wait_for(lk, std::chrono::milliseconds(1));

 interruption_point();

}

Если мы ждем какой-то предикат, то таймаут продолжительностью 1 мс можно полностью скрыть внутри цикла проверки предиката:

template

void interruptible_wait(std::condition_variable& cv,

 std::unique_lock& lk,

 Predicate pred) {

 interruption_point();

 this_thread_interrupt_flag.set_condition_variable(cv);

 interrupt_flag::clear_cv_on_destruct guard;

 while (!this_thread_interrupt_flag.is_set() && !pred()) {

  cv.wait_for(lk, std::chrono::milliseconds(1));

 }

 interruption_point();

}

Правда, предикат при этом проверяется чаще, чем необходимо, но зато эту функцию легко использовать вместо простого вызова wait(). Легко реализовать и другие варианты функций с таймаутом, например: ждать в течение указанного времени или 1 мс в зависимости от того, что меньше.

Ну хорошо, с ожиданием std::condition_variable мы разобрались, а что сказать о std::condition_variable_any? Всё точно так же или можно сделать лучше?

 

9.2.4. Прерывание ожидания

std::condition_variable_any

Класс std::condition_variable_any отличается от std::condition_variable тем, что работает с любым типом блокировки, а не только с std::unique_lock. Как выясняется, это сильно упрощает дело, так что мы сможем добиться более впечатляющих результатов, чем получилось с std::condition_variable. Раз допустим любой тип блокировки, то можно написать и свой собственный класс, который захватывает (освобождает) как внутренний мьютекс set_clear_mutex в классе interrupt_flag, так и блокировку, переданную при вызове wait(). Соответствующий код приведён в листинге ниже.

Листинг 9.12. Реализация interruptible_wait() для std::condition_variable_any

class interrupt_flag {

 std::atomic flag;

 std::condition_variable* thread_cond;

 std::condition_variable_any* thread_cond_any;

 std::mutex set_clear_mutex;

public:

 interrupt_flag():

  thread_cond(0), thread_cond_any(0) {}

 void set() {

  flag.store(true, std::memory_order_relaxed);

  std::lock_guard lk(set_clear_mutex);

  if (thread_cond) {

   thread_cond->notify_all();

  } else if (thread_cond_any) {

   thread_cond_any->notify_all();

  }

 }

 template

 void wait(std::condition_variable_any& cv, Lockable& lk) {

  struct custom_lock {

   interrupt_flag* self;

   Lockable& lk;

   custom_lock(interrupt_flag* self_,

    std::condition_variable_any& cond,

    Lockable& lk_): self(self_), lk(lk_) {

    self->set_clear_mutex.lock();  ← (1)

    self->thread_cond_any = &cond; ← (2)

   }

   void unlock() { ← (3)

    lk.unlock();

    self->set_clear_mutex.unlock();

   }

   void lock() {

    std::lock(self->set_clear_mutex, lk); ← (4)

   }

   ~custom_lock() {

    self->thread_cond_any = 0; ← (5)

    self->set_clear_mutex.unlock();

   }

  };

  custom_lock cl(this, cv, lk);

  interruption_point();

  cv.wait(cl);

  interruption_point();

 }

 // остальное, как и раньше

};

template

void interruptible_wait(std::condition_variable_any& cv,

 Lockable& lk) {

 this_thread_interrupt_flag.wait(cv, lk);

}

Наш класс блокировки должен захватить внутренний мьютекс set_clear_mutex на этапе конструирования (1) и затем записать в переменную thread_cond_any указатель на объект std::condition_variable_any, переданный конструктору (2). Ссылка на объект Lockable сохраняется для последующего использования; он должен быть уже заблокирован. Теперь проверять, был ли поток прерван, можно, не опасаясь гонки. Если в этой точке флаг прерывания установлен, то это было сделано до захвата мьютекса set_clear_mutex. Когда условная переменная вызывает нашу функцию unlock() внутри wait(), мы разблокируем объект Lockable и внутренний мьютекс set_clear_mutex (3). Это позволяет потокам, которые пытаются нас прервать, захватить set_clear_mutex и проверить указатель thread_cond_any, когда мы уже находимся в wait(), но не раньше. Это именно то, чего мы хотели (но не смогли) добиться в случае std::condition_variable. После того как wait() завершит ожидание (из-за получения сигнала или вследствие ложного пробуждения), она вызовет нашу функцию lock(), которая снова захватит внутренний мьютекс set_clear_mutex и заблокирует объект Lockable (4). Теперь можно еще раз проверить, не было ли прерываний, пока мы находились в wait(), и только потом обнулить указатель thread_cond_any в деструкторе custom_lock (5), где также освобождается set_clear_mutex.

 

9.2.5. Прерывание других блокирующих вызовов

Теперь с прерыванием ожидания условной переменной всё ясно, но как быть с другими блокирующими операциями ожидания: освобождения мьютекса, готовности будущего результата и т.д.? Вообще говоря, приходится прибегать к тому же трюку с таймаутом, который мы использовали для std::condition_variable, потому что, если не влезать в код мьютекса или будущего результата, то нет никакого другого способа прервать ожидание, кроме как обеспечить выполнение ожидаемого условия. Но в отличие от условных переменных, мы точно знаем, чего ждем, поэтому можем организовать цикл внутри функции interruptible_wait().

Вот, например, как выглядит перегрузка interruptible_wait() для std::future<>:

template

void interruptible_wait(std::future& uf) {

 while (!this_thread_interrupt_flag.is_set()) {

  if (uf.wait_for(lk, std::chrono::milliseconds(1) ==

   std::future_status::ready)

  break;

 }

 interruption_point();

}

Здесь мы ждем, пока либо будет установлен флаг прерывания, либо готов будущий результат, но блокирующее ожидание будущего результата продолжается в течение 1 мс на каждой итерации цикла. Это означает, что в среднем запрос на прерывание будет обнаружен с задержкой 0,5 мс, в предположении, что разрешение часов достаточно высокое. Функция wait_for обычно ожидает в течение как минимум одного такта, поэтому если такт системных часов составляет 15 мс, то ждать придётся не одну, а 15 мс. Приемлемо это или нет, зависит от конкретных условий. Таймаут при необходимости можно уменьшить (если часы позволяют), но тогда поток будет чаще просыпаться для проверки флага, что увеличит накладные расходы на переключение задач.

На данный момент мы знаем, как можно обнаружить прерывание с помощью функций interruption_point() и interruptible_wait(), но что с этим потом делать?

 

9.2.6. Обработка прерываний

С точки зрения прерываемого потока, прерывание — это просто исключение типа thread_interrupted, которое, следовательно, можно обработать, как любое другое исключение. В частности, его можно перехватить в стандартном блоке catch:

try {

 do_something();

} catch (thread_interrupted&) {

 handle_interruption();

}

Таким образом, прерывание можно перехватить, каким-то образом обработать и потом спокойно продолжить работу. Если мы так поступим, а потом другой поток еще раз вызовет interrupt(), то поток будет снова прерван, когда в очередной раз вызовет функцию interruption_point(). Возможно, это и неплохо, если вы выполняете последовательность независимых задач; текущая задача будет прервана, а поток благополучно перейдёт к следующей задаче в списке.

Поскольку thread_interrupted — исключение, то при вызове кода, который может быть прерван, следует принимать все обычные для исключений меры предосторожности, чтобы не было утечки ресурсов, и структуры данных оставались согласованными. Часто желательно завершать поток в случае прерывания, так чтобы исключение можно было просто передать вызывающей функции. Но если позволить исключению выйти за пределы функции потока, переданной конструктору std::thread, то будет вызвана функция std::terminate(), что приведёт к завершению всей программы. Чтобы не помещать обработчик catch(thread_interrupted) в каждую функцию, которая передаётся interruptible_thread, можно включить блок catch в обертку, служащую для инициализации interrupt_flag. Тогда распространять необработанное исключение будет безопасно, так как завершится лишь отдельный поток. Инициализация потока в конструкторе interruptible_thread при таком подходе выглядит следующим образом:

internal_thread = std::thread([f, &p] {

 p.set_value(&this_thread_interrupt_flag);

  try {

  f();

 } catch(thread_interrupted const&) {}

});

А теперь рассмотрим конкретный пример, когда прерывание оказывается полезно.

 

9.2.7. Прерывание фоновых потоков при выходе из приложения

Представьте себе приложение для поиска в файловой системе настольного ПК. Оно должно не только взаимодействовать с пользователем, но и следить за состоянием файловой системы, обнаруживать изменения и обновлять свой индекс. Обычно такие операции поручаются фоновому потоку, чтобы пользовательский интерфейс мог реагировать на действия пользователя. Фоновый поток должен работать на протяжении всего времени жизни приложения; он запускается на этапе инициализации и трудится, пока приложение не завершится. Обычно это происходит при останове операционной системы, так как приложение должно постоянно поддерживать индекс в актуальном состоянии. Как бы то ни было, когда приложение завершается, надо аккуратно остановить и фоновые потоки, например, прервав их.

В следующем листинге показана возможная реализация управления потоками в такой программе.

Листинг 9.13. Фоновый мониторинг файловой системы

std::mutex config_mutex;

std::vector background_threads;

void background_thread(int disk_id) {

 while (true) {

  interruption_point(); ← (1)

  fs_change fsc = get_fs_changes(disk_id); ← (2)

  if (fsc.has_changes()) {

   update_index(fsc); ← (3)

  }

 }

}

void start_background_processing() {

 background_threads.push_back(

  interruptible_thread(background_thread, disk_1));

 background_threads.push_back(

  interruptible_thread(background_thread, disk_2));

}

int main() {

 start_background_processing(); ← (4)

 process_gui_until_exit(); ← (5)

 std::unique_lock lk(config_mutex);

 for (unsigned i = 0; i < background_threads.size(); ++i) {

  background_threads[i].interrupt(); ← (6)

 }

 for (unsigned i = 0; i < background_threads.size(); ++i) {

  background_threads[i].join(); ← (7)

 }

}

В самом начале запускаются фоновые потоки (4). Затем главный поток продолжает обслуживать пользовательский интерфейс (5). Когда пользователь хочет выйти из приложения, фоновые потоки прерываются (6), после чего главный поток ждет их завершения (7), и только потом выходит сам. Каждый фоновый поток исполняет цикл, в котором следит за изменениями на диске (2) и обновляет индекс (3). На каждой итерации цикла поток проверяет, не прервали ли его, вызывая функцию interruption_point() (1).

Почему мы прерываем все потоки до того, как начинать ждать их завершения? Почем нельзя прервать один поток, дождаться его, потом прервать следующий и так далее? Все из-за параллелизма. Поток не завершается сразу после прерывания, так как должен добраться до очередной точки прерывания, а затем, перед выходом, выполнить все деструкторы и код обработки исключений. Если главный поток будет присоединять прерванные потоки сразу после прерывания, то ему придётся ждать, хотя в это время он мог бы делать полезную работу — прерывать другие потоки. Поэтому мы поступаем по-другому — начинаем ждать только тогда, когда больше никакой работы не осталось (все потоки уже прерваны). Заодно это позволяет прерываемым потокам обрабатывать прерывания параллельно, так что общее время завершения, возможно, уменьшится.

Описанный механизм прерывания легко развить, добавив дополнительные прерываемые вызовы или запретив прерывания на определенном участке кода, но это я оставляю читателю в качестве упражнения.

 

9.3. Резюме

В этой главе мы рассмотрели «продвинутые» способы управления потоками: пулы и прерывание потоков. Мы видели, как использование локальных очередей работ и занимания работ может снизить накладные расходы на синхронизацию и потенциально увеличить пропускную способность пула потоков. Кроме того, мы поняли, что выполнение других выбранных из очереди задач во время ожидания завершения подзадачи устраняет возможность взаимоблокировки.

Мы также познакомились с различными способами прерывания одного потока другим, в частности, с точками прерывания и функциями, которые допускают прерывание во время блокирующего ожидания.