(2) и возбудить исключение, если это не так.Поддержка перемещения в классе
std::thread
позволяет также хранить объекты этого класса в контейнере при условии, что класс контейнера поддерживает перемещение (как, например, модифицированный класс std::vector<>
). Это означает, что можно написать код, показанный в листинге 2.7, который запускает несколько потоков, а потом ждет их завершения.
Листинг 2.7. Запуск нескольких потоков и ожидание их завершения
void do_work(unsigned id);
void f() {
std::vector threads;
for (unsigned i = 0; i < 20; ++i) { │
Запуск threads.push_back(std::thread(do_work(i))); ←┘
потоков } │
Поочередный std::for_each(threads.begin(), threads.end(),│
вызов join() std::mem_fn(&std::thread::join)); ←┘
для каждого потока}
Если потоки применяются для разбиения алгоритма на части, то зачастую такой подход именно то, что требуется: перед возвратом управления вызывающей программе все потоки должны завершиться. Разумеется, столь простая структура, как в листинге 2.7, предполагает, что каждый поток выполняет независимую работу, а единственным результатом является побочный эффект, заключающийся в изменении разделяемых данных. Если бы функция
f()
должна была вернуть вызывающей программе значение, зависящее от результатов операций, выполненных в потоках, то при такой организации получить это значение можно было бы только путем анализа разделяемых данных по завершении всех потоков. В главе 4 обсуждаются альтернативные схемы передачи результатов работы из одного потока в другой.Хранение объектов
std::thread
в векторе std::vector
— шаг к автоматизации управления потоками: вместо тот чтобы создавать отдельные переменные для потоков и выполнять соединение напрямую, мы можем рассматривать группу потоков. Можно пойти еще дальше и создавать не фиксированное число потоков, как в листинге 2.7, а определять нужное количество динамически, во время выполнения.2.4. Задание количества потоков во время выполнения
В стандартной библиотеке С++ есть функция
std::thread::hardware_concurrency()
, которая поможет нам решить эту задачу. Она возвращает число потоков, которые могут работать по-настоящему параллельно. В многоядерной системе это может быть, например, количество процессорных ядер. Возвращаемое значение всего лишь оценка; более того, функция может возвращать 0, если получить требуемую информацию невозможно. Однако эту оценку можно с пользой применить для разбиения задачи на несколько потоков.В листинге 2.8 приведена простая реализация параллельной версии
std::accumulate
. Она распределяет работу между несколькими потоками и, чтобы не создавать слишком много потоков, задает ограничение снизу на количество элементов, обрабатываемых одним потоком. Отмстим, что в этой реализации предполагается, что ни одна операция не возбуждает исключений, хотя в принципе исключения возможны; например, конструктор std::thread
возбуждает исключение, если не может создать новый поток. Но если добавить в этот алгоритм обработку исключений, он перестанет быть таким простым; эту тему мы рассмотрим в главе 8.
Листинг 2.8. Наивная реализация параллельной версии алгоритма
std::accumulate
template
struct accumulate_block {
void operator()(Iterator first, Iterator last, T& result) {
result = std::accumulate(first, last, result);
}
};
template
T parallel_accumulate(Iterator first, Iterator last, T init) {
unsigned long const length = std::distance(first, last);
if (!length) ←
(1) return init;
unsigned long const min_per_thread = 25;
unsigned long const max_threads =
(length+min_per_thread - 1) / min_per_thread; ←
(2)
unsigned long const hardware_threads =
std::thread::hardware_concurrency();
unsigned long const num_threads = ←
(3) std::min(
hardware.threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads; ←
(4)
std::vector results(num_threads);
std::vector threads(num_threads - 1); ←
(5)
Iterator block_start = first;
for(unsigned long i = 0; i < (num_threads - 1); ++i) {
Iterator block_end = block_start;
std::advance(block_end, block_size); ←
(6)
threads[i] = std::thread( ←
(7) accumulate_block(),
block_start, block_end, std::ref(results(i)));
block_start = block_end; ←
(8) }
accumulate_block()(
block_start, last, results[num_threads-1]); ←
(9)
std::for_each(threads.begin(), threads.end(),
std::mem_fn(&std::thread::join)); ←
(10)
return
std::accumulate(results.begin(), results.end(), init); ←
(11)}
Хотя функция довольно длинная, по существу она очень проста. Если входной диапазон пуст (1), то мы сразу возвращаем начальное значение
init
. В противном случае диапазон содержит хотя бы один элемент, поэтому мы можем разделить количество элементов на минимальный размер блока и получить максимальное число потоков (2).Это позволит избежать создания 32 потоков на 32-ядерной машине, если диапазон состоит всего из пяти элементов.
Число запускаемых потоков равно минимуму из только что вычисленного максимума и количества аппаратных потоков (3): мы не хотим запускать больше потоков, чем может поддержать оборудование (это называется превышением лимита), так как из-за контекстных переключений при большем количестве потоков производительность снизится. Если функция
std::thread::hardware_concurrency()
вернула 0, то мы берем произвольно выбранное число, я решил остановиться на 2. Мы не хотим запускать слишком много потоков, потому что на одноядерной машине это только замедлило бы программу. Но и слишком мало потоков тоже плохо, так как это означало бы отказ от возможного параллелизма.Каждый поток будет обрабатывать количество элементов, равное длине диапазона, поделенной на число потоков (4). Пусть вас не пугает случай, когда одно число нацело не делится на другое, — ниже мы рассмотрим его.
Теперь, зная, сколько необходимо потоков, мы можем создать вектор
std::vector
для хранения промежуточных результатов и вектор std::vector
для хранения потоков (5). Отметим, что запускать нужно на один поток меньше, чем num_threads
, потому что один поток у нас уже есть.Запуск потоков производится в обычном цикле: мы сдвигаем итератор
block_end
в конец текущего блока (6) и запускаем новый поток для аккумулирования результатов по этому блоку (7). Начало нового блока совпадает с концом текущего (8).После того как все потоки запущены, главный поток может обработать последний блок (9). Именно здесь обрабатывается случай деления с остатком: мы знаем, что конец последнего блока —
last
, а сколько в нем элементов, не имеет значения.Аккумулировав результаты но последнему блоку, мы можем дождаться завершения всех запущенных потоков с помощью алгоритма