Пул управляемых потоков
Характеристики пула потоков
Потоки из пула являются фоновыми. Для каждого потока используется размер стека по умолчанию, поток запускается с приоритетом по умолчанию и находится в многопотоковом подразделении. Когда поток в пуле завершает свою задачу, он возвращается в очередь потоков в состоянии ожидания. С этого момента его можно использовать вновь. Повторное использование позволяет приложениям избежать дополнительных затрат на создание новых потоков для каждой задачи.
Для каждого процесса существует только один пул потоков.
Исключения в потоках из пула потоков
Необработанные исключения в потоках из пула приводят к завершению процесса. Есть три исключения из этого правила:
Максимальное число потоков в пуле потоков
Число операций, которое можно поставить в очередь в пуле потоков, ограничено только доступной памятью. Однако пул потоков имеет ограничение на число потоков, которое можно активировать в процессе одновременно. Если все потоки в пуле заняты, дополнительные рабочие элементы помещаются в очередь и ожидают их освобождения. Размер по умолчанию пула потоков для процесса зависит от нескольких факторов, таких как размер виртуального адресного пространства. Процесс может вызвать метод ThreadPool.GetMaxThreads для определения количества потоков.
Вы можете управлять максимальным количеством потоков с помощью методов ThreadPool.GetMaxThreads и ThreadPool.SetMaxThreads.
Минимальные значения пула потоков
Пул потоков предоставляет новые рабочие потоки или потоки завершения ввода-вывода по запросу, пока не будет достигнут заданный минимум для каждой категории. Для получения этих минимальных значений можно использовать метод ThreadPool.GetMinThreads.
Если потребность низкая, фактическое количество потоков из пула потоков может быть ниже минимальных значений.
При достижении минимума пул потоков может создавать дополнительные потоки или ожидать завершения некоторых задач. Пул потоков создает и уничтожает рабочие потоки в целях оптимизации пропускной способности, которая определяется как количество задач, завершаемых за единицу времени. Слишком малое количество потоков может препятствовать оптимальному использованию доступных ресурсов, тогда как слишком большое их количество может усиливать конкуренцию за ресурсы.
Для увеличения минимального количества бездействующих потоков можно использовать метод ThreadPool.SetMinThreads. Однако необоснованное увеличение этих значений может привести к снижению производительности. Если одновременно запускается слишком много задач, все они могут выполняться слишком медленно. В большинстве случаев пул потоков работает наилучшим образом, если он использует собственный алгоритм выделения потоков.
Использование пула потоков
Пул потоков также можно использовать путем вызова ThreadPool.QueueUserWorkItem из управляемого кода (или ICorThreadpool::CorQueueUserWorkItem из неуправляемого кода) и передачи делегата System.Threading.WaitCallback, представляющего метод, который выполняет задачу.
Другим способом использования пула потоков является помещение в очередь рабочих элементов, которые имеют отношение к операции ожидания, с помощью метода ThreadPool.RegisterWaitForSingleObject и передача дескриптора System.Threading.WaitHandle, который вызывает метод, представленный делегатом System.Threading.WaitOrTimerCallback, при получении сигнала или истечении времени ожидания. Потоки из пула потоков используются для вызова методов обратного вызова.
Примеры см. по ссылкам на страницы API.
Пропуск проверок безопасности
Пул потоков также предоставляет методы ThreadPool.UnsafeQueueUserWorkItem и ThreadPool.UnsafeRegisterWaitForSingleObject. Используйте эти методы только в том случае, если вы уверены, что стек вызывающего объекта не важен для проверок безопасности, осуществляемых во время выполнения задачи в очереди. ThreadPool.QueueUserWorkItem и ThreadPool.RegisterWaitForSingleObject перехватывают стек вызывающего объекта, который объединяется со стеком потока из пула потоков, когда поток начинает выполнять задачу. Если требуется проверка безопасности, проверяется весь стек. Несмотря на обеспечение безопасности, такая проверка также влияет на производительность.
Когда не следует использовать потоки из пула потоков
Существует ряд сценариев, в которых следует создавать собственные потоки и работать с ними, а не использовать потоки из пула:
Пулы потоков
Пул потоков — это коллекция рабочих потоков, которые эффективно выполняют асинхронные обратные вызовы от имени приложения. Пул потоков в основном используется для сокращения числа потоков приложения и обеспечения управления рабочими потоками. Приложения могут ставить в очередь рабочие элементы, связывать работу с ожидающими дескрипторами, автоматически ставить в очередь на основе таймера и выполнять привязку с помощью операций ввода-вывода.
Архитектура пула потоков
Использование пула потоков может быть выгодным для следующих приложений:
исходный пул потоков был полностью изменен в Windows Vista. Новый пул потоков улучшен, так как он предоставляет один тип рабочего потока (поддерживает операции ввода-вывода и без ввода-вывода), не использует поток таймера, предоставляет одну очередь таймера и предоставляет выделенный постоянный поток. Он также предоставляет группы очистки, более высокую производительность, несколько пулов для каждого процесса, которые планируются независимо, и новый API пула потоков.
Архитектура пула потоков состоит из следующих компонентов:
Рекомендации
Новый API пула потоков обеспечивает большую гибкость и управление, чем Исходный API пула потоков. Однако существует несколько незначительных, но важных отличий. В исходном API ожидание сброса было автоматическим; в новом API ожидание должно быть явным образом сброшено каждый раз. Исходный API, автоматически обрабатывающий олицетворение, передает контекст безопасности вызывающего процесса в поток. В новом API приложение должно явно задать контекст безопасности.
Ниже приведены рекомендации по использованию пула потоков.
Потоки процесса совместно используют пул потоков. Один рабочий поток может выполнять несколько функций обратного вызова по одной за раз. Эти рабочие потоки управляются пулом потоков. Поэтому не следует завершать поток из пула потоков путем вызова TerminateThread в потоке или путем вызова ExitThread из функции обратного вызова.
Очистите все ресурсы, созданные в функции обратного вызова перед возвратом из функции. К ним относятся TLS, контексты безопасности, приоритет потоков и регистрация COM. Функции обратного вызова также должны восстанавливать состояние потока перед возвратом.
Отслеживайте дескрипторы ожидания и связанные с ними объекты до тех пор, пока пул потоков не сообщит о завершении дескриптора.
Пометьте все потоки, ожидающие длительные операции (такие как очистка ввода-вывода или очистка ресурсов), чтобы пул потоков мог выделить новые потоки вместо ожидания этого.
Перед выгрузкой библиотеки DLL, использующей пул потоков, отмените все рабочие элементы, операции ввода-вывода, ожидания и таймеры и дождитесь завершения выполнения ответных вызовов.
Избегайте взаимоблокировок, удалив зависимости между рабочими элементами и обратными вызовами, убедившись, что обратный вызов не ждет завершения и, сохраняя приоритет потока.
Не ставить слишком много элементов слишком быстро в процесс с другими компонентами, использующими пул потоков по умолчанию. Существует один пул потоков по умолчанию для каждого процесса, включая Svchost.exe. По умолчанию каждый пул потоков имеет максимум 500 рабочих потоков. Пул потоков пытается создать больше рабочих потоков, когда число рабочих потоков в состоянии «Готово» или «выполняется» должно быть меньше числа процессоров.
Избегайте модели однопотокового апартамента COM, так как она несовместима с пулом потоков. STA создает состояние потока, которое может повлиять на следующий рабочий элемент для потока. Как правило, STA является длительным и имеет сходство потоков, что является противоположностью пула потоков.
Создание нового пула потоков для управления приоритетом и изоляцией потоков, создание пользовательских характеристик и, возможно, повышение скорости реагирования. Однако для дополнительных пулов потоков требуются дополнительные системные ресурсы (потоки, память ядра). Слишком большое количество пулов повышает вероятность состязаний за использование ЦП.
По возможности используйте ожидающий объект, а не механизм на основе APC для сигнализации потока пула потоков. APC не работают с потоками пула потоков в качестве других механизмов сигнализации, поскольку система управляет временем существования потоков пула потоков, поэтому поток может быть завершен до доставки уведомления.
Используйте расширение отладчика пула потоков,! TP. Эта команда имеет следующие сведения об использовании:
Для пула, ожидания и рабочей роли, если адрес равен нулю, команда создает дампы всех объектов. Для ожидающих и рабочих ролей пропуск адреса в дампе текущего потока. Определены следующие флаги: 0x1 (однострочный вывод), 0x2 (элементы дампа) и 0x4 (Рабочая очередь пула дампа).
Пулы потоков
Потоки (thread) в приложении можно разделить на три категории:
Нагружающие процессор (CPU bound).
Блокирующие ввод-вывод (Blocking IO).
Неблокирующие ввод-вывод (Non-blocking IO).
У каждой из этих категорий своя оптимальная конфигурация и применение.
Для задач, требующих процессорного времени, нужен пул с заранее созданными потоками с количеством потоков равным числу процессоров. Единственная работа, которая будет выполняться в этом пуле, — вычисления на процессоре, и поэтому нет смысла превышать их количество, если только у вас не какая-то специфическая задача, способная использовать Hyper-threading (в таком случае вы можете использовать удвоенное количество процессоров). Обратите внимание, что в старом подходе «количество процессоров + 1» речь шла о смешанной нагрузке, когда объединялись CPU-bound и IO-bound задачи. Мы не будем такого делать.
Проблема с фиксированным пулом потоков заключается в том, что любая блокирующая операция ввода-вывода (да и вообще любая блокирующая операция) «съест» поток, а поток — очень ценный ресурс. Получается, что нам нужно любой ценой избегать блокировки CPU-bound пула. Но к сожалению, это не всегда возможно (например, при использовании библиотек с блокирующим вводом-выводом). В этом случае всегда следует переносить блокирующие операции (ввод-вывод и другие) в отдельный пул. Этот отдельный пул должен быть кэшируемым и неограниченным, без предварительно созданных потоков. Честно говоря, такой пул очень опасен. Он не ограничивает вас и позволяет создавать все больше и больше потоков при блокировке других, что очень опасно. Обязательно стоит убедиться, что есть внешние ограничения, то есть существуют высокоуровневые проверки, гарантирующие выполнение в каждый момент времени только фиксированного количества блокирующих операций (это часто делается с помощью неблокирующей ограниченной очереди).
Последняя категория потоков (если у вас не Swing / SWT) — это асинхронный ввод-вывод. Эти потоки в основном просто ожидают и опрашивают ядро на предмет уведомлений асинхронного ввода-вывода, и пересылают эти уведомления в приложение. Для этой задачи лучше использовать небольшое число фиксированных, заранее выделенных потоков. Многие приложения для этого используют всего один поток! У таких потоков должен быть максимальный приоритет, поскольку производительность приложения будет ограничена ими. Однако вы должны быть осторожны и никогда не выполнять какую-либо работу в этом пуле! Никогда, никогда, никогда. При получении уведомления вы должны немедленно переключиться обратно на CPU-пул. Каждая наносекунда, потраченная на поток (потоки) асинхронного ввода-вывода, добавляет задержки в ваше приложение. Поэтому производительность некоторых приложений можно немного улучшить, сделав пул асинхронного ввода-вывода в 2 или 4 потока, а не стандартно 1.
Глобальные пулы потоков
Относитесь осторожно к любому фреймворку или библиотеке, затрудняющему настройку пула потоков или устанавливающему по умолчанию пул, которым вы не можете управлять.
Многопоточность в Java. Лекция 4: пулы потоков
Продолжаем публикацию краткого курса наших коллег: после общих сведений, основ многопоточных программ, блокировок и других методов синхронизации потоков речь пойдет о пулах потоков и очереди задач.
4.1 Пулы потоков Runnable и Callable
Создавать потоки для выполнения большого количества задач очень трудоемко: создание потока и освобождение ресурсов — дорогостоящие операции. Для решения проблемы ввели пулы потоков и очереди задач, из которых берутся задачи для пулов. Пул потоков — своего рода контейнер, в котором содержатся потоки, которые могут выполнять задачи, и после выполнения одной самостоятельно переходить к следующей.
Вторая причина создания пулов потоков — возможность разделить объект, выполняющий код, и непосредственно код задачи, которую необходимо выполнить. Использование пула потоков обеспечивает лучший контроль создания потоков и экономит ресурсы создания потоков. Также использование пула потоков упрощает разработку многопоточных программ, упрощая создание и манипулирование потоками. За созданием и управлением пулом потоков отвечают несколько классов и интерфейсов, которые называются Executor Framework in Java.
Рис 1: Упрощенное схематическое представление классов, отвечающих за пул потоков
Примечания к рисунку 1. Это не схема наследования классов и не UML-диаграмма, а простая структурная схема, которая показывает, кто что использует, какие есть методы и что получается в результате.
Рассмотрим основные интерфейсы и классы, входящие в этот фреймворк. Его основные интерфейсы: Executor, ExecutorService и фабрика Executors. Объекты, которые реализуют интерфейс Executor, могут выполнять runnable-задачу. Интерфейс Executor имеет один метод void execute(Runnable command). После вызова этого метода и передачи задачи на выполнение задача в будущем будет выполнена асинхронно. Также этот интерфейс разделяет, кто будет выполнять задачу и что будет выполняться, — в отличии от класса Thread.
Класс Executors — утилитный клас, как например, класс Collections. Класс Executors создает классы, которые реализуют интерфейсы Executor и ExecutorService. Основные реализации пула потоков, т. е. реализации интерфейсов Executor и ExecutorServcie:
4.2 ThreadFactory
4.3 Завершение выполнения пула потоков
Для завершения работы пула потоков у интерфейса ExecutorService есть несколько методов: shutdown(), shutdownNow() и awaitTermination(long timeout, TimeUnit unit).
После вызова метода shutdown() пул потоков продолжит работу, но при попытке передать на выполнение новые задачи они будут отклонены, и будет сгенерирован RejectedExecutionException.
Метод shutdownNow() не запускает задачи, которые были уже установлены на выполнение, и пытается завершить уже запущенные.
Метод awaitTermination(long timeout, TimeUnit unit) блокирует поток, который вызвал этот метод, пока все задачи не выполнят работу, или пока не истечет таймаут, который передан при вызове метода, или пока текущий ожидающий поток не будет прерван. В общем, пока какое-то из этих условий не выполнится первым.
4.4 Отмена задач в Executors
После передачи Runnable или Callable возвращается объект Future. Этот объект имеет метод cancel(), который может использоваться для отмены задания. Вызов этого метода имеет разный эффект в зависимости от того, когда был вызван метод. Если метод был вызван, когда задача еще не начала выполняться, задача просто удаляется из очереди задач. Если
задача уже выполнилась, вызов метода cancel() не приведет ни к каким результатам.
Самый интересный случай возникает, когда задача находится в процессе выполнения. Задача может не остановиться, потому что в Java задачи полагаются на механизм называемый прерыванием потока. Если поток не проигнорирует этот сигнал, поток остановится. Однако он может и не отреагировать на сигнал прерывания.
Иногда необходимо реализовать нестандартную отмену выполнения задачи. Например, задача выполняет блокирующий метод, скажем, ServerSocket.accept(), который ожидает подключения какого-то клиента. Этот метод игнорирует любые проверки флага interrupted. В представленном выше случае для остановки задачи необходимо закрыть сокет, при этом возникнет исключение, которое следует обработать. Есть два способа реализации нестандартного завершения потока. Первый — переопределение метода interrupt() в классе Thread, который не рекомендуется использовать. Второй — переопределение метода Future.cancel(). Однако объект Future — интерфейс, и объекты, которые реализуют этот интерфейс, пользователь не создает вручную. Значит, надо найти способ, который позволит это сделать. И такой способ есть. Объект Future возвращается после вызова метода submit(). Под капотом ExecutorService вызывает метод newTaskFor(Callable c) для получения объекта Future. Метод newTaskFor стоит переопределить, чтобы он вернул объект Future с нужной функциональностью метода cancel().
Листинг 1:
import java.util.concurrent.BlockingQueue;
public class CustomFutureReturningExecutor extends ThreadPoolExecutor <
public CustomFutureReturningExecutor(int corePoolSize,
int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue workQueue) <
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
>
@Override
protected RunnableFuture newTaskFor(Callable callable) <
if (callable instanceof IdentifiableCallable) <
return ((IdentifiableCallable ) callable).newTask();
> else <
return super.newTaskFor(callable);
>
>
>
Листинг 2:
import java.util.concurrent.*;
public interface IdentifiableCallable extends Callable <
int getId();
RunnableFuture newTask();
void cancelTask();
>
Дальше необходимо определить класс FutureWarapper, для того чтобы можно было переопределить метод cancel();
Листинг 3:
import java.util.concurrent.*;
public abstract class FutureTaskWrapper extends FutureTask <
public FutureTaskWrapper(Callable c) <
super(c);
>
abstract int getTaskId();
>
Класс FutureTask реализует одновременно Runnable и Callable. Этот класс представляет базовую реализацию интерфейса Future и предназначен для добавления новой функциональности. Дальше следует определить задание, которое будет выполняться в в Executor:
Листинг 4:
class Task implements IdentifiableCallable <
private final int id;
volatile boolean cancelled; // Cancellation flag
public Task(int id) <
this.id = id;
>
@Override
public synchronized int getId() <
return id;
>
@Override
public RunnableFuture newTask() <
return new FutureTaskWrapper (this) <
@Override
public boolean cancel(boolean mayInterruptIfRunning) <
Task.this.cancelTask();
return super.cancel(mayInterruptIfRunning);
>
@Override
public int getTaskId() <
return getId();
>
>;
>
@Override
public synchronized void cancelTask() <
cancelled = true;
>
@Override
public Boolean call() <
while (!cancelled) <
// Do Samba
>
System.out.println(«bye»);
return true;
>
>
В листинге 4 из метода newTask() возвращается класс, который унаследован от класса FutureTaskWrapper. В конструктор этого класса передается ссылка this (объект Callable), необходимая для корректного создания объекта Futuretask. В листинге 5 приведен код главного класса программы, который запускает усовершенствованный ExecutorService.
Листинг 5:
import java.util.concurrent.*;
public class FutureTaskWrapperConcept <
public static void main(String[] args) throws Exception <
ExecutorService exec = new CustomFutureReturningExecutor(1, 1,
Long.MAX_VALUE, TimeUnit.DAYS,
new LinkedBlockingQueue ());
Future f = exec.submit(new Task(0));
FutureTaskWrapper ftw = null;
if (f instanceof FutureTaskWrapper) <
ftw = (FutureTaskWrapper ) f;
> else <
throw new Exception(«wtf»);
>br
try <
Thread.sleep(2000);
> catch (InterruptedException ignored) <
>br System.out.println(«Task Id: » + ftw.getTaskId());
ftw.cancel(true);
exec.shutdown();
>
>
Теперь при вызове метода cancel() будет выполнена нестандартная логика по отмене задачи.
4.5 Обработка исключений.
Для обработки исключений, которые возникают при выполнении объектов Runnable, устанавливается обработчик исключений в ThreadFactory, затем ThreadFactory устанавливает потоку:
Листинг 6:
public class ExceptionHandler implements Thread.UncaughtExceptionHandler <
@Override
public void uncaughtException(Thread thread, Throwable t) <
System.out.println(«Uncaught exception is detected! » + t + » st: » +
Arrays.toString(t.getStackTrace()));
>
>
public class CustomThreadFactory implements ThreadFactory <
private final Thread.UncaughtExceptionHandler handler;
public CustomThreadFactory(Thread.UncaughtExceptionHandler handler) <
this.handler = handler;
>
@Override
public Thread newThread(Runnable run) <
Thread thread = Executors.defaultThreadFactory().newThread(run);
thread.setUncaughtExceptionHandler(handler);
return thread;
>
>
Листинг 7:
void beforeExecute(Thread t, Runnable r) и void afterExecute(Thread t, Runnable r). Эти методы выполняются тем потоком, который будет выполнять непосредственно само задание. Если переопределить метод afterExecute(), исключения, которые будут сгенерированы в процессе выполнения задания, можно будет обработать в методе afterExecute. Пример в Листинге 8.
Листинг 8:
4.6 Класс ThreadPollExecutor
Один из основных классов, которые генерирует фабрика Executors, — класс ThreadPoolExecutor. Рассмотрим основные параметры этого класса.
Параметры core and maximum pool size. ThreadPoolExecutor автоматически настроит размер пула потоков в соответствии с установленными значениями corePoolSize и maximumPoolSize. Когда пулу потоков передается новая задача, а количество работающих потоков меньше, чем corePoolSize, создается новый поток, даже когда другие потоки ничего не делают. Если количество запущенных потоков больше, чем corePoolSize, но меньше, чем maximumPoolSize, новый поток будет создан, если очередь задач заполнена. Если значения параметров corePoolSize и maximumPoolSize равны, создается пул потоков фиксированного размера. Если в качестве параметра maximumPoolSize передается неограниченное значение, например, Integer.MAX_VALUE, это позволяет пулу потоков выполнять произвольное количество задач. Класс ThreadPoolExecutor, как и другие классы пула потоков, использует очередь задач для передачи и удержания задачи для пула потоков.
При работе с очередью задач используют следующие правила:
4.7 Fork/Join Pool
С выходом Java 7 в арсенале разработчиков появился новый фреймворк Fork/Join Poll. В Java 8 Fork/Join pool создается по умолчанию, когда мы вызываем метод parallel() для параллельной обработки данных. Также Fork/Join pool используется в классе CompletableFuture. Класс ForkJoinPool реализует интерфейсы Executor, ExecutorService. Класс ForkJoinPool можно создать через ключевое слово new и через класс Executors.newWorkStealingPool().
ForkJoinPool использует способ, когда одна задача разделяется на несколько мелких, которые выполняются по отдельности, а затем полученные ответы объединяются в единый результат. В Fork/Join Pool есть много методов, однако используются в основном три: fork(), compute() и join(). Метод compute() содержит изначально большую задачу, которую необходимо выполнить. В методе compute() используется один и тот же шаблон: если задача слишком большая, она разбивается на две или большее количество подзадач, если задача достаточно маленькая, согласно условиям, заданным программистом, она выполняется в методе compute(). Пример псевдокода — в Листинге 9.
Листинг 9:
if(Task is small) <
Execute the task
> else <
//Split the task into smaller chunks
ForkJoinTask first = getFirstHalfTask();
first.fork();
ForkJoinTask second = getSecondHalfTask();
second.compute();
first.join();
>
К каждому потоку, который правильнее было бы называть воркером, в Fork/Join пуле назначена очередь — dequeue. Изначально очереди пустые, и потоки в пуле без работы. Переданная в Fork/Join pool основная задача (top-level) помещается в очередь, предназначенную для top-level задач. Для этого процесса существует отдельный поток, который запускает еще несколько потоков, которые будут непосредственно участвовать в обработке подзадач. Это сделано, чтобы не тратить время на запуск потоков Fork/Join пула в дальнейшем. Усредненное время запуска потока на операционных системах типа Linux на JVM — около 50 мкс. Однако если запускать несколько потоков одновременно, времени требуется больше, а для некоторых приложений даже совсем небольшая задержка оказывается критической.
После вызова метода fork() задача будет разбита на две или более подзадач и помещена в очередь текущего потока. Полученные задачи кладутся в голову очереди. Текущий поток также получает задачи из головы очереди. Этот подход применен, чтобы поток работал со своей очередью без синхронизации. Другие потоки, которые хотят украсть задачи из очереди, получают задачи из хвоста очереди — там используется синхронизация.
В псевдокоде в Листинге 9 вызывается метод compute() для выполнения второй части задачи.
Предположим, что первая часть задачи будет украдена другим потоком, он, в свою очередь, разобьет первую подзадачу еще на две подзадачи, и процесс будет повторяться, пока задачи не станут достаточно малыми для их выполнения без разбивки. Порог, до которого следует разбивать задачи, рекомендуется выбирать исходя из следующего условия:
ThreshHold = N / (C*L), где N — это размер задачи, L — так называемый load factor. Это число имеет порядок от 10 до 100 — по сути это количество задач, которое выполнит один воркер, умноженное на С — количество воркеров в пуле.
Есть несколько базовых подходов для распределения задач в threadpoll`ах:
Листинг 10:
import java.util.List;
import java.util.Random;
import java.util.concurrent.RecursiveAction;
public class Task extends RecursiveAction <
private List
products;
private int first;
private int last;
public Task(List
Пример программы использования CountedCompleter приведен в листинге 12.

