Чистый код. Создание, анализ и рефакторинг — страница 64 из 94

Executor Framework

Как демонстрирует пример ExecutorClientScheduler.java на с. 361, представленная в Java 5 библиотека Executor предоставляет расширенные средства управления выполнением программ с использованием пулов программных потоков. Библиотека реализована в виде класса в пакете java.util.concurrent.

Если вы создаете потоки, не используя пулы, или используете ручную реализацию пулов, возможно, вам стоит воспользоваться Executor. От этого ваш код станет более чистым, понятным и компактным.

Инфраструктура Executor создает пул потоков с автоматическим изменением размера и повторным созданием потоков при необходимости. Также поддерживаются фьючерсы – стандартная конструкция многопоточного программирования. Библиотека Executor работает как с классами, реализующими интерфейс Runnable, так и с классами, реализующими интерфейс  Callable. Интерфейс Callable похож на Runnable, но может возвращать результат, а это стандартная потребность в многопоточных решениях.

Фьючерсы удобны в тех ситуациях, когда код должен выполнить несколько независимых операций и дождаться их завершения:

public String processRequest(String message) throws Exception {

    Callable makeExternalCall = new Callable() {

        public String call() throws Exception {

            String result = "";

            // Внешний запрос

            return result;

        }

    };

    Future result = executorService.submit(makeExternalCall);

    String partialResult = doSomeLocalProcessing();

    return result.get() + partialResult;

}

В этом примере метод запускает на выполнение объект makeExternalCall, после чего переходит к выполнению других действий. Последняя строка содержит вызов result.get(), который блокирует выполнение вплоть до завершения фьючерса.

Неблокирующие решения

Виртуальная машина Java 5 пользуется особенностями архитектуры современных процессоров, поддерживающих надежное неблокирующее обновление. Для примера возьмем класс, использующий синхронизацию (а следовательно, блокировку) для реализации потоково-безопасного обновления value,

public class ObjectWithValue {

    private int value;

    public void synchronized incrementValue() { ++value; }

    public int getValue() { return value; }

}

В Java 5 для этой цели появился ряд новых классов. AtomicBoolean, AtomicInteger и AtomicReference – всего лишь три примера; есть и другие. Приведенный выше фрагмент можно переписать без использования блокировки в следующем виде:

public class ObjectWithValue {

    private AtomicInteger value = new AtomicInteger(0);


    public void incrementValue() {

        value.incrementAndGet();

    }


    public int getValue() {

        return value.get();

    }

}

Хотя эта реализация использует объект вместо примитива и отправляет сообщения (например, incrementAndGet()) вместо ++, по своей производительности этот класс почти всегда превосходит предыдущую версию. Иногда приращение скорости незначительно, но ситуации, в которых он бы работал медленнее, практически не встречаются.

Как такое возможно? Современные процессоры поддерживают операцию, которая обычно называется CAS (Compare and Swap). Эта операция является аналогом оптимистичной блокировки из теории баз данных, тогда как синхронизированная версия является аналогом пессимистичной блокировки.

Ключевое слово synchronized всегда устанавливает блокировку, даже если второй поток не пытается обновлять то же значение. Хотя производительность встроенных блокировок улучшается от версии к версии, они по-прежнему обходятся недешево.

Неблокирующая версия изначально предполагает, что ситуация с обновлением одного значения множественными потоками обычно возникает недостаточно часто для возникновения проблем. Вместо этого она эффективно обнаруживает возникновение таких ситуаций и продолжает повторные попытки до тех пор, пока обновление не пройдет успешно. Обнаружение конфликта почти всегда обходится дешевле установления блокировки, даже в ситуациях с умеренной и высокой конкуренцией.

Как VM решает эту задачу? CAS является атомарной операцией. На логическом уровне CAS выглядит примерно так:

int variableBeingSet;


void simulateNonBlockingSet(int newValue) {

    int currentValue;

    do {

        currentValue = variableBeingSet

    } while(currentValue != compareAndSwap(currentValue, newValue));

}


int synchronized compareAndSwap(int currentValue, int newValue) {

    if(variableBeingSet == currentValue) {

        variableBeingSet = newValue;

        return currentValue;

    }

    return variableBeingSet;

}

Когда метод пытается обновить общую переменную, операция CAS проверяет, что изменяемая переменная все еще имеет последнее известное значение. Если условие соблюдается, то переменная изменяется. Если нет, то обновление не выполняется, потому что другой поток успел ему «помешать». Метод, пытавшийся выполнить обновление (с использованием операции CAS), видит, что изменение не состоялось, и делает повторную попытку.

Потоково-небезопасные классы

Некоторые классы в принципе не обладают потоковой безопасностью. Несколько примеров:

• SimpleDateFormat

• Подключения к базам данных.

• Контейнеры из java.util.

• Сервлеты.

Некоторые классы коллекций содержат отдельные потоково-безопасные методы. Однако любая операция, связанная с вызовом более одного метода, потоково-безопасной не является. Например, если вы не хотите заменять уже существующий элемент HashTable, можно было бы написать следующий код:

if(!hashTable.containsKey(someKey)) {

    hashTable.put(someKey, new SomeValue());

}

По отдельности каждый метод потоково-безопасен, однако другой программный поток может добавить значение между вызовами containsKey и put. У проблемы есть несколько решений:

• Установите блокировку HashTable и проследите за тем, чтобы остальные пользователи HashTable делали то же самое (клиентская блокировка):

synchronized(map) {

if(!map.conainsKey(key))

    map.put(key,value);

• Инкапсулируйте HashTable в собственном объекте и используйте другой API (серверная блокировка с применением паттерна АДАПТЕР):

public class WrappedHashtable {

    private Map map = new Hashtable();


    public synchronized void putIfAbsent(K key, V value) {

        if (map.containsKey(key))

            map.put(key, value);

    }

}

• Используйте потоково-безопасные коллекции:

ConcurrentHashMap map = new ConcurrentHashMap

String>();

map.putIfAbsent(key, value);

Для выполнения подобных операций в коллекциях пакета java.util.concurrent предусмотрены такие методы, как putIfAbsent().

Зависимости между методами могут нарушить работу многопоточного кода

Тривиальный пример введения зависимостей между методами:

public class IntegerIterator implements Iterator

    private Integer nextValue = 0;


    public synchronized boolean hasNext() {

        return nextValue < 100000;

    }

    public synchronized Integer next() {

        if (nextValue == 100000)

            throw new IteratorPastEndException();

        return nextValue++;

    }

    public synchronized Integer getNextValue() {

        return nextValue;

    }

}

Код, использующий IntegerIterator:

IntegerIterator iterator = new IntegerIterator();

while(iterator.hasNext()) {

    int nextValue = iterator.next();

    // Действия с nextValue

}

Если этот код выполняется одним потоком, проблем не будет. Но что произойдет, если два потока попытаются одновременно использовать общий экземпляр IngeterIterator в предположении, что каждый поток будет обрабатывать полученные значения, но каждый элемент списка обрабатывается только один раз? В большинстве случаев ничего плохого не произойдет; потоки будут совместно обращаться к списку, обрабатывая элементы, полученные от итератора, и завершат работу при завершении перебора. Но существует небольшая вероятность того, что в конце итерации два потока помешают работе друг друга, один поток выйдет за конечную позицию итератора, и произойдет исключение.

Проблема заключается в следующем: поток 1 проверяет наличие следующего элемента методом hasNext(), который возвращает true. Поток 1 вытесняется потоком 2; последний выдает тот же запрос, и получает тот же ответ true. Поток 2 вызывает метод next(), который возвращает значение, но с побочным эффектом: после него вызов hasNext() возвращает false. Поток 1 продолжает работу. Полагая, что hasNext() до сих пор возвращает true, он вызывает next(). Хотя каждый из отдельных методов синхронизирован, клиент использовал два метода.

Проблемы такого рода очень часто встречаются в многопоточном коде. В нашей конкретной ситуации проблема особенно нетривиальна, потому что она приводит к сбою только при завершающей итерации. Если передача управления между потоками произойдет в строго определенной последовательности, то один из потоков сможет выйти за конечную позицию итератора. Подобные ошибки часто проявляются уже после того, как система пойдет в эксплуатацию, и обнаружить их весьма нелегко.

У вас три варианта:

• Перенести сбои.

• Решить проблему, внося изменения на стороне клиента (клиентская блокировка).

• Решить проблему, внося изменения на стороне сервера, что приводит к дополнительному изменению клиента (серверная блокировка).

Перенесение сбоев

Иногда все удается устроить так, что сбой не приносит вреда. Например, клиент может перехватить исключение и выполнить необходимые действия для восстановления. Откровенно говоря, такое решение выглядит неуклюже. Оно напоминает полуночные перезагрузки, исправляющие последствия утечки памяти.

Клиентская блокировка

Чтобы класс IntegerIterator корректно работал в многопоточных условиях, измените приведенного выше клиента (а также всех остальных клиентов) следующим образом:

IntegerIterator iterator = new IntegerIterator();


    while (true) {

      int nextValue;

      synchronized (iterator) {

        if (!iterator.hasNext())

          break;

        nextValue = iterator.next();

      }

      doSometingWith(nextValue);

    }

Каждый клиент устанавливает блокировку при помощи ключевого слова synchronized. Дублирование нарушает принцип DRY, но оно может оказаться необходимым, если в коде используются инструменты сторонних разработчиков, не обладающие потоковой безопасностью.

Данная стратегия сопряжена с определенным риском. Все программисты, использующие сервер, должны помнить об установлении блокировки перед использованием и ее снятии после использования. Много (очень много!) лет назад я работал над системой, в которой использовалась клиентская блокировка общего ресурса. Ресурс использовался в сотне разных мест по всей кодовой базе. Один несчастный программист забыл установить блокировку в одном из таких мест.

Это была многотерминальная система с разделением времени, на которой выполнялись бухгалтерские программы профсоюза транспортных перевозок Local 705. Компьютер находился в зале с фальшполом и кондиционером за 50 миль к северу от управления Local 705. В управлении десятки операторов вводили данные на терминалах. Терминалы были подключены к компьютеру по выделенным телефонным линиям с полудуплексными модемами на скорости 600 бит/с (это было очень, очень давно).

Примерно раз в день один из терминалов «зависал». Никакие закономерности в сбоях не прослеживались. Зависания не были привязаны ни к конкретным терминалам, ни к конкретному времени. Все выглядело так, словно время зависания и терминал выбирались броском кубика. Иногда целые дни проходили без зависаний.

Поначалу проблема решалась только перезагрузкой, но перезагрузки было трудно координировать. Нам приходилось звонить в управление и просить всех операторов завершить текущую работу на всех терминалах. После этого мы могли отключить питание и перезагрузить систему. Если кто-то выполнял важную работу, занимавшую час или два, зависший терминал попросту простаивал.

Через несколько недель отладки мы обнаружили, что причиной зависания была десинхронизация между счетчиком кольцевого буфера и его указателем. Буфер управлял выводом на терминал. Значение указателя говорило о том, что буфер был пуст, а счетчик утверждал, что буфер полон. Так как буфер был пуст, выводить было нечего; но так как одновременно он был полон, в экранный буфер нельзя было ничего добавить для вывода на экран.

Итак, мы знали, почему зависали терминалы, но было неясно, из-за чего возникает десинхронизация кольцевого буфера. Поэтому мы реализовали обходное решение. Программный код мог прочитать состояние тумблеров на передней панели компьютера (это было очень, очень, очень давно). Мы написали небольшую функцию, которая обнаруживала переключение одного из тумблеров и искала кольцевой буфер, одновременно пустой и заполненный. Обнаружив такой буфер, функция сбрасывала его в пустое состояние. Voila! Зависший терминал снова начинал выводить информацию.

Теперь нам не приходилось перезагружать систему при зависании терминала. Когда нам звонили из управления и сообщали о зависании, мы шли в машинный зал и переключали тумблер.

Конечно, иногда управление работало по выходным, а мы – нет. Поэтому в планировщик была включена функция, которая раз в минуту проверяла состояние всех кольцевых буферов и сбрасывала те из них, которые были одновременно пустыми и заполненными. Зависший терминал начинал работать даже до того, как операторы успевали подойти к телефону.

Прошло несколько недель кропотливого просеивания монолитного ассемблерного кода, прежде чем была обнаружена причина. Мы занялись вычислениями и определили, что частота зависаний статистически соответствует одному незащищенному использованию кольцевого буфера. Оставалось только найти это одно использование. К сожалению, все это было очень давно. В те времена у нас не было функций поиска, перекрестных ссылок или других средств автоматизации. Нам просто приходилось просматривать листинги.

Тогда, холодной зимой 1971 года в Чикаго, я узнал важный урок. Клиентская блокировка — полный отстой.

Серверная блокировка

Дублирование можно устранить внесением следующих изменений в IntegerIterator:

public class IntegerIteratorServerLocked {

    private Integer nextValue = 0;

    public synchronized Integer getNextOrNull() {

        if (nextValue < 100000)

            return nextValue++;

        else

            return null;

    }

}

В клиентском коде также вносятся изменения:

while (true) {

    Integer nextValue = iterator.getNextOrNull();

    if (next == null)

        break;

    // Действия с nextValue

}

В этом случае мы изменяем API своего класса, чтобы он обладал многопоточной поддержкой[80]. Вместо проверки hasNext() клиент должен выполнить проверку null.

В общем случае серверная блокировка предпочтительна по следующим причинам:

• Она сокращает дублирование кода – клиентская блокировка заставляет каждого клиента устанавливать соответствующую блокировку сервера. Если код блокировки размещается на сервере, клиенты могут использовать объект, не беспокоясь о написании дополнительного кода блокировки.

• Она обеспечивает более высокую производительность – в случае однопоточного развертывания потоково-безопасный сервер можно заменить потоково-небезопасным, устраняя все дополнительные затраты.

• Она снижает вероятность ошибок – в случае клиентской блокировки достаточно всего одному программисту забыть установить блокировку, и работа системы будет нарушена.

• Она определяет единую политику использования – политика сосредоточена в одном месте (на сервере), а не во множестве разных мест (то есть у каждого клиента).

Она сокращает область видимости общих переменных — клиент не знает ни о переменных, ни о том, как они блокируются. Все подробности скрыты на стороне сервера. Если что-то сломается, то количество мест, в которых следует искать причину, сокращается.

Что делать, если серверный код вам неподконтролен?

• Используйте паттерн АДАПТЕР, чтобы изменить API и добавить блокировку:

public class ThreadSafeIntegerIterator {

    private IntegerIterator iterator = new IntegerIterator();

    public synchronized Integer getNextOrNull() {

        if(iterator.hasNext())

            return iterator.next();

        return null;

    }

}

• ИЛИ еще лучше – используйте потоково-безопасные коллекции с расширенными интерфейсами.

Повышение производительности