Структура передачи последнего стейта потоку

2019.10.29

Intro и постановка задачи

Возникла у меня задача передавать данные между потоками со следующими условиями:

  • Любой пакет не зависит от предыдущего.
  • Новый пакет отменяет значение предыдущего.
  • Пакет может быть сформирован лишь в определенный момент времени (не по запросу потребителя).

Другими словами есть некоторый снепшот состояния подсистемы, который уникален для каждого отрезка времени. Другой подсистеме нужно получать последнее доступное состояние (в любой момент времени). Решение получилось достаточно интересным, чем и решил поделиться с общественностью.

Реализация

Основная проблема в том, что “производитель” не может быть приостановлен или опрошен (у него просто нет механизмов для коммуникации типа event-loop). Усугубляется дело тем, что пакеты он генерирует с очень высокой частотой (порядка сотни в секунду).

“Потребитель” работает в N раз медленнее, но, при необходимости, должен получать последнее состояние и обработать его (вычитка, преобразование, сжатие) т.е. обработка полученных данных может занять больше времени, чем их генерация “производителем”.

Для решения потребовалось 3 буфера, один std::atomic<Buffer*> (SyncPtr) и две операции std::atomic_exchange.

Сценарий работы следующий:

  1. SyncPtr - доступен всем потокам (двум, в данном случае).
  2. “Производитель” берет несинхронизированный указатель на буфер (специально для него) и пишет туда данные.
  3. Атомарно обменивает свой (сырой указатель) с SyncPtr, делая его видимым для “потребителя” и скрывая текущий видимый буфер. И отправляется считать следующий пакет (или по любым его личным делам).
  4. В потоке “получателя” запрашивается текущий видимый буфер путем атомарного обмена указателей со своим буфером т.е. он выводит видимый буфер из обращения.
  5. Обрабатывает его сколько его душе угодно.

Пункты 2-3 - цикл первого потока, 4-5 - второго.

Теперь тоже самое в коде:

class buffer {
    std::array<data, 3> reserved;
    data* reader_ = &reserved[0];
    data* writer_ = &reserved[1];
    std::atomic<data*> visible_{&reserved[2]};

public:
    data* writer() noexcept { return writer_; }

    void publish(data* writer_buffer) noexcept {
        assert(writer_buffer == writer_);
        writer_ = std::atomic_exchange(
            &visible_,
            writer_,
            std::memory_order_acq_rel);
    }

    data* reader() noexcept {
        return std::atomic_exchange(
            &visible_,
            reader_,
            std::memory_order_acq_rel);
    }
};


// Поток "производителя"
while (true) {
    const auto result = calculate(...);
    data* b = buffer.writer();
    result.serialize(b);
    buffer.publish(b); // atomic operation
}

// Поток "потребителя"
while (true) {
    data* b = buffer.reader(); // atomic operation
    send(compress(deserialize(b)));
}

В итоге мы имеем всего две точки синхронизации - запрос буфера и его публикация. Причина, почему я пишу все это, в том, что понимание необходимости хранения буфера читателя заняло у меня некоторое время. Я думал хранить nullptr в buffer::visible_ как сигнал, что обновлений нет, но это лишь переусложняло алгоритм (и делало его медленнее). Для определения наличия обновлений (если “производитель” где-то сильно задумался), можно очищать data, если это какой-то контейнер, или явно завести в нем bool new_frame, выставляя его при публикации и сбрасывая, после обработки.

Эпилог

В общем получился достаточно компактный и инкапсулированный в класс wait-free алгоритм. Возможно, я открыл Америку, но я не нашел в интернете ничего похожего (возможно от незнания ключевого слова для такой структуры или в силу ее тривиальности). Надеюсь, кто-нибудь найдет эту статью полезной.