[пост получился достаточно длинный, поэтому сразу скажу о бонусе — это простой и эффективный алгоритм очереди на основе буфера фиксированного размера — нетерпеливые могут сразу перемещаться к концу поста ]

Последние дни на RSDN шла своего рода распределенная игра по написанию lock-free контейнеров. Вначале vf запостил lock-free стек для .Net:
http://www.rsdn.ru/forum/dotnet/3719883.aspx
Автор: vf
Дата: 27.02.10

Я указал, что в стеке содержится достаточно изощренная ошибка:
http://www.rsdn.ru/forum/dotnet/3721436.1.aspx
Автор: remark
Дата: 01.03.10


Потом Caracrist запостил lock-free очередь для С++:
http://www.rsdn.ru/forum/src/3725942.1.aspx
Автор: Caracrist
Дата: 05.03.10

Я указал на ряд ошибок:
http://www.rsdn.ru/forum/src/3726327.1.aspx
Автор: remark
Дата: 05.03.10


Потом дали ссылку на блог некого товарища из Яндекса с описанием lock-free стека для С++:
http://users.livejournal.com/_foreseer/34284.html
Я тоже указал на ошибку:
http://www.rsdn.ru/forum/dotnet/3721636.1.aspx
Автор: remark
Дата: 01.03.10


К чему я это? Что не надо писать lock-free контейнеры? Это смотрите сами, мне всё равно. Это пусть говорят другие, те которые сами не умеют писать lock-free алгоритмы В целом lock-free алгоритмы писать вполне реалистично, единственное надо немного сноровки, ревью коллег, и соответствующие инструменты.
Кстати, по поводу инструментов, тут я не могу не упомянуть в качестве рекламы свой Relacy Race Detector. Если последние 2 ошибки удалось найти при просмотре кода, то первую смог вскрыть только Relacy Race Detector (кстати, если кто следит за такими тулзами, то CHESS от Microsoft не смог её найти — ограничение планировщика потоков).

Я должен отметить, что Caracrist в итоге запостил корректный алгоритм очереди, ну по-крайней мере я не вижу ошибок:
http://www.rsdn.ru/forum/src/3727286.1.aspx
Автор: Caracrist
Дата: 07.03.10

Однако алгоритм получился достаточно сложный — 6 независимых переменных состояния, плюс достаточно не эффективный — 6 Interlocked инструкций на операцию. Для сравнения аналогичный алгоритм с применением спин-мьютекса будет исполнять по 1 Interlocked инструкций на операцию, и соотв. Будет в 6 раз быстрее в случае низкой конкуренции.

В чём сложность создания таких алгоритмов? В том, что последовательность атомарных операций сама по себе не является атомарной операций, это по-прежнему разрозненная последовательность (в отличие от алгоритмов, основанных на мьютексах). Наибольшую сложность представляют промежуточные состояния структуры данных, которые возникают в процессе выполнения последовательности. Каждое такое состояние должно быть полностью целостным. Целостным в том плане, что если придёт другой поток, то он должен посмотреть на структуру; определить, что это за состояние; понять, что ему делать дальше; и при этом не порушить планы первого потока. Даже хуже — прерываний потоков может быть несколько, т.е. первый поток перевёл структуру в какое промежуточное состояние; потом пришёл второй поток, и тоже выполнил часть операции; потом пришёл третий поток, выполнил ещё несколько незаконченных действий, и т.д. И после всего этого структура всё ещё должна оставаться в понятном консистентном состоянии.
В идеале поток целиком выполняет операцию за одну атомарную операцию, т.е. считывает состояние структуры, проверяет его, вычисляет новое состояние, и пытается атомарно перевести структуру из исходного состояния в новое с помощью CAS (InterlockedCompareExchange). Так работает известный алгоритм lock-free стека:
http://www.rsdn.ru/forum/dotnet/3721740.1.aspx
Автор: remark
Дата: 01.03.10

В таком алгоритме нет промежуточных (читай — потенциально проблематичных) состояний, структура либо не модифицирована вообще, либо операция полностью выполнена.
Однако это применимо только для простейших случаев, в более сложных случаях невозможно выполнить операцию за одну атомарную операцию — как в случае с очередью на основе фиксированного буфера, тут надо и записать данные и сместить позицию записи.

Кстати, к вопросу о сложных операциях с множеством промежуточных состояний, есть такая интересная презентация Cliff Click (сейчас он работает в Azul Systems, ранее в Sun – и там и там он был ключевой фигурой в разработке JVM) о реализации полностью lock-free хэш-мап (код доступен в сети):
http://www.azulsystems.com/events/javaone_2007/2007_LockFreeHash.pdf
Представьте себе хэш-мап, в котором несколько потоков одновременно добавляют один и тот же элемент, несколько пытаются его считать, несколько удалить, и плюс одновременно с этим происходит увеличение размера таблицы. Каково? Клиф подошёл к вопросу фундаментально — он нарисовал в явном виде стейт-машину для ячейки таблицы; для каждого состояния описал как его можно определить (в смысле понять, что ячейка сейчас находится именно в этом состоянии); для каждой пары (состояние, тип операции (чтение, добавление, удаление)) описал, что потоку делать дальше; и потом реализовал все переходы между состояниями с помощью атомарных CAS.
Это если поверхностно, в реальности там много деталей и есть некоторые негативные моменты, которые он сам же внёс дабы реализовать хэш полностью без блокировок. Например, элементы никогда полностью не удаляются из таблицы, они просто помечаются как удалённые. Как следствие, даже если количество элементов в таблице остаётся постоянным, но происходят вставки и удаления элементов, ему приходится делать периодические фиктивные ресайзы таблицы, что бы «подчистить мусор». Ещё там могут быть проблематичными рекурсивные ресайзы таблицы... ну ладно, это я уже удаляюсь от темы.

Теперь обещанный бонус — алгоритм bounded multi-producer/multi-consumer очереди без блокировок. Контейнер не использует динамического выделения/управления памятью в процессе работы (за исключением изначального выделения фиксированного буфера).
Каждая операция содержит всего по 1 Interlocked инструкции, и как следствие будет достаточно быстрой.
Тут ситуация существенно попроще, чем у Клифа с его таблицей (и это к лучшему). Каждая операция (чтение/запись) состоит из 2 атомарных действий. (1) поток проверяет и резервирует элемент для чтения/записи, и (2) после выполнения фактического чтения/записи, помечает элемент как доступный для следующей операции (записи/чтения соответственно). Тут есть только одно промежуточное состояние — между (1) и (2) — которое должным образом обрабатывается.
Далее собственно код с комментариями:
template<typename T>
class mpmc_bounded_queue
{
public:
    mpmc_bounded_queue(size_t buffer_size)
        : buffer_(new cell_t [buffer_size])
        , buffer_mask_(buffer_size - 1)
    {
        typedef char assert_nothrow [__has_nothrow_assign(T) || __has_trivial_assign(T) || !__is_class(T) ? 1 : -1];
        assert((buffer_size >= 2) && ((buffer_size & (buffer_size - 1)) == 0));
        for (size_t i = 0; i != buffer_size; i += 1)
            buffer_[i].sequence_.store(i, std::memory_order_relaxed);
        enqueue_pos_.store(0, std::memory_order_relaxed);
        dequeue_pos_.store(0, std::memory_order_relaxed);
    }

    ~mpmc_bounded_queue()
    {
        delete [] buffer_;
    }

    bool enqueue(T const& data)
    {
        cell_t* cell;
        // загружаем текущую позицию для добавления в очередь
        size_t pos = enqueue_pos_.load(std::memory_order_relaxed);
        for (;;)
        {
            // находим текущий элемент
            cell = &buffer_[pos & buffer_mask_];
            // загружаем статус (sequence) текущего элемента
            size_t seq = cell->sequence_.load(std::memory_order_acquire);
            intptr_t dif = (intptr_t)seq - (intptr_t)pos;
            // элемент готов для записи
            if (dif == 0)
            {
                // пытаемся сдвинуть позицию для добавления
                if (enqueue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
                    break;
                // если не получилось, то начинаем сначала
            }
            // элемент ещё не готов для записи (очередь полна или типа того)
            else if (dif < 0)
                return false;
            // нас кто-то опередил
            // перезагружаем текущий элемент и начинаем сначала
            else /* if (dif > 0) */
                pos = enqueue_pos_.load(std::memory_order_relaxed);
        }

        // в данной точке мы зарезервировали элемент для записи

        // пишем данные
        cell->data_ = data;
        // помечаем элемент как готовый для потребления
        cell->sequence_.store(pos + 1, std::memory_order_release);

        return true;
    }

    bool dequeue(T& data)
    {
        cell_t* cell;
        // загружаем текущую позицию для извлечения из очереди
        size_t pos = dequeue_pos_.load(std::memory_order_relaxed);
        for (;;)
        {
            // находим текущий элемент
            cell = &buffer_[pos & buffer_mask_];
            // загружаем статус (sequence) текущего элемента
            size_t seq = cell->sequence_.load(std::memory_order_acquire);
            intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
            // элемент готов для извлечения
            if (dif == 0)
            {
                // пытаемся сдвинуть позицию для извлечения
                if (dequeue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
                    break;
                // если не получилось, то начинаем сначала
            }
            // элемент ещё не готов для потребления (очередь пуста или типа того)
            else if (dif < 0)
                return false;
            // нас кто-то опередил
            // перезагружаем текущий элемент и начинаем сначала
            else /* if (dif > 0) */
                pos = dequeue_pos_.load(std::memory_order_relaxed);
        }

        // в данной точке мы зарезервировали элемент для чтения

        // читаем данные
        data = cell->data_;
        // помечаем элемент как готовый для следующей записи
        cell->sequence_.store(pos + buffer_mask_ + 1, std::memory_order_release);

        return true;
    }

private:
    struct cell_t
    {
        std::atomic<size_t>     sequence_;
        T                       data_;
    };

    static size_t const         cacheline_size = 64;
    typedef char                cacheline_pad_t [cacheline_size];

    cacheline_pad_t             pad0_;
    cell_t* const               buffer_;
    size_t const                buffer_mask_;
    cacheline_pad_t             pad1_;
    std::atomic<size_t>         enqueue_pos_;
    cacheline_pad_t             pad2_;
    std::atomic<size_t>         dequeue_pos_;
    cacheline_pad_t             pad3_;

    mpmc_bounded_queue(mpmc_bounded_queue const&);
    void operator = (mpmc_bounded_queue const&);
};


И вот реализация подмножества C++0x std::atomic, необходимого для компиляции очереди (MSVC, x86-32):
#include <intrin.h>

enum memory_order
{
    memory_order_relaxed,
    memory_order_consume,
    memory_order_acquire,
    memory_order_release,
    memory_order_acq_rel,
    memory_order_seq_cst,
};

class atomic_uint
{
public:
    unsigned load(memory_order mo) const volatile
    {
        (void)mo;
        assert(mo == memory_order_relaxed
            || mo == memory_order_consume
            || mo == memory_order_acquire
            || mo == memory_order_seq_cst);
        unsigned v = val_;
        _ReadWriteBarrier();
        return v;
    }

    void store(unsigned v, memory_order mo) volatile
    {
        assert(mo == memory_order_relaxed
            || mo == memory_order_release
            || mo == memory_order_seq_cst);

        if (mo == memory_order_seq_cst)
        {
            _InterlockedExchange((long volatile*)&val_, (long)v);
        }
        else
        {
            val_ = v;
            _ReadWriteBarrier();
        }
    }

    bool compare_exchange_weak(unsigned& cmp, unsigned xchg, memory_order mo) volatile
    {
        unsigned prev = (unsigned)_InterlockedCompareExchange((long volatile*)&val_, (long)xchg, (long)cmp);
        if (prev == cmp)
            return true;
        cmp = prev;
        return false;
    }

private:
    unsigned volatile           val_;
};

template<typename T>
class atomic;

template<>
class atomic<unsigned> : public atomic_uint
{
};


Автор: remark    Оценить