Lock-free очередь

SOFT FOR YOU
Дата: 10.11.2017 20:26:43
Всем привет
Я тут в разделе Delphi пытался совместными усилиями покопать мою реализацию lock-free очереди. Она сбоит, но не понятно где. Разные эксперименты к результату не привели. Поэтому я решил обратиться к более широкой аудитории.

Большинство "ответчиков" не пытались вникнуть в ситуацию и понять суть алгоритма. К примеру, один из форумчан упрекал меня в том, что код содержит AtomicExchange, а не цикл с AtomicCmpExchange.

Я же пробовал много разных подходов, но на тестах алгоритм иногда давал сбой
Критерий нахождения ошибки простой: нужно описать ситуацию (сценарий) при котором алгоритм сработает не корректно
Я всю голову сломал, придумать такой вариант не смог, может быть у вас получится

Суть в следующем:
1) Очередь представляет собой односвязный список от Head к Tail
2) Очередь пуста если Tail не указывает ни на что
3) Поток, изменивший Tail, важнее того, что модифицирует Head
4) Список может быть временно разорван (Next = nil, ещё не инициализирован) - в этом случае просто ждём
5) TSyncPointer - это "tagged pointer", т.е. указатель, в котором меняется счётчик при Atomic-операциях

Отвечу на любые вопросы. Надеюсь на помощь.
На всякий случай - вот репозиторий.
+
  TSyncQueue<T> = class(TSyncObject, ISyncQueue<T>)
  private
    FHead: TSyncPointer;
    FTail: TSyncPointer;

    type
      TNode = TAllocatorNode<Pointer,TNothing,T>;
      PNode = ^TNode;

    function GetEmpty: Boolean; inline;
  public
    constructor Create;
    destructor Destroy; override;
    procedure Clear;

    function Enqueue(const Value: T): Boolean;
    function Dequeue(var Value: T): Boolean;

    property Empty: Boolean read GetEmpty;
  end;

function TSyncQueue<T>.Enqueue(const Value: T): Boolean;
var
  Node: PNode;
  Tail: TSyncPointer;
begin
  // новый узел
  Node := TSyncAllocator<Pointer,TNothing,T>.New;
  Node.Value := Value;
  Node.Next := nil;

  // подменяем FTail на узел
  Tail := FTail.AtomicExchange(Node);
  // что если значение, которые было по FTail - уже не корректное?

  // инициализируем Next предущего узла или сам FHead
  if (Tail.Empty) then
  begin
    FHead.Value := Node; //FHead.Fill(Node);
    // что если Node портит какую-то малину?
    // что если не пересчёт инкремента портит чью-то логику?
  end else
  begin
    PNode(Tail.Value).Next := Node;
  end;
end;

function TSyncQueue<T>.Dequeue(var Value: T): Boolean;
var
  Head, Tail: TSyncPointer;
  Next: Pointer;
begin
  repeat
    Tail := FTail.Copy;
    Head := FHead.Copy;

    if (Head.Empty) then
    begin
      // очередь пустая если Tail пустой - выходим
      // может быть задан Tail, а Head ещё не проинициализирован корректным значением
      if (Tail.Empty) then
        Exit(False);

      // что если на момент Head равен Empty, Tail.Empty, а FTail не Empty?
    end else
    begin
      // заменяем FHead на Next, при необходимости заменяем FTail
      // Next пустой если ещё не инициализирован (ждём) или последний (обнуляем FTail)
      Next := PNode(Head.Value).Next;
      // что если Head.Value и в Next лежит мусорный nil???
      if (Next = nil) then
      begin     // что если Head.Value невалидный, но равен Tail.Value?
        if (Head.Value = Tail.Value) then
        begin
          // извлекаем единственный элемент - обнуляем FTail
          // и по возможности обнуляем FHead
          if (FTail.AtomicCmpExchange(nil, Tail)) then
          begin
            FHead.AtomicCmpExchange(nil, Head);
            Break;
          end;
        end else
        begin
          // Next не инициализирован
          // просто ждём
        end;
      end else
      begin
        // Next содержит валидный указатель на следующий узел - кладём его в FHead
        // что если здесь происходит ABA из-за некорректного счётчика в Head?
        if (FHead.AtomicCmpExchange(Next, Head)) then
          Break;
      end;
    end;
  until (False);

  // результат
  Value := PNode(Head.Value).Value;
  TSyncAllocator<Pointer,TNothing,T>.Dispose(Head.Value);
  Result := True;
end;
SOFT FOR YOU
Дата: 10.11.2017 20:28:18
Сори, вот код с родной разметкой

+
  TSyncQueue<T> = class(TSyncObject, ISyncQueue<T>)
  private
    FHead: TSyncPointer;
    FTail: TSyncPointer;

    type
      TNode = TAllocatorNode<Pointer,TNothing,T>;
      PNode = ^TNode;

    function GetEmpty: Boolean; inline;
  public
    constructor Create;
    destructor Destroy; override;
    procedure Clear;

    function Enqueue(const Value: T): Boolean;
    function Dequeue(var Value: T): Boolean;

    property Empty: Boolean read GetEmpty;
  end;

function TSyncQueue<T>.Enqueue(const Value: T): Boolean;
var
  Node: PNode;
  Tail: TSyncPointer;
begin
  // новый узел
  Node := TSyncAllocator<Pointer,TNothing,T>.New;
  Node.Value := Value;
  Node.Next := nil;

  // подменяем FTail на узел
  Tail := FTail.AtomicExchange(Node);
  // что если значение, которые было по FTail - уже не корректное?

  // инициализируем Next предущего узла или сам FHead
  if (Tail.Empty) then
  begin
    FHead.Value := Node; //FHead.Fill(Node);
    // что если Node портит какую-то малину?
    // что если не пересчёт инкремента портит чью-то логику?
  end else
  begin
    PNode(Tail.Value).Next := Node;
  end;
end;

function TSyncQueue<T>.Dequeue(var Value: T): Boolean;
var
  Head, Tail: TSyncPointer;
  Next: Pointer;
begin
  repeat
    Tail := FTail.Copy;
    Head := FHead.Copy;

    if (Head.Empty) then
    begin
      // очередь пустая если Tail пустой - выходим
      // может быть задан Tail, а Head ещё не проинициализирован корректным значением
      if (Tail.Empty) then
        Exit(False);

      // что если на момент Head равен Empty, Tail.Empty, а FTail не Empty?
    end else
    begin
      // заменяем FHead на Next, при необходимости заменяем FTail
      // Next пустой если ещё не инициализирован (ждём) или последний (обнуляем FTail)
      Next := PNode(Head.Value).Next;
      // что если Head.Value и в Next лежит мусорный nil???
      if (Next = nil) then
      begin     // что если Head.Value невалидный, но равен Tail.Value?
        if (Head.Value = Tail.Value) then
        begin
          // извлекаем единственный элемент - обнуляем FTail
          // и по возможности обнуляем FHead
          if (FTail.AtomicCmpExchange(nil, Tail)) then
          begin
            FHead.AtomicCmpExchange(nil, Head);
            Break;
          end;
        end else
        begin
          // Next не инициализирован
          // просто ждём
        end;
      end else
      begin
        // Next содержит валидный указатель на следующий узел - кладём его в FHead
        // что если здесь происходит ABA из-за некорректного счётчика в Head?
        if (FHead.AtomicCmpExchange(Next, Head)) then
          Break;
      end;
    end;
  until (False);

  // результат
  Value := PNode(Head.Value).Value;
  TSyncAllocator<Pointer,TNothing,T>.Dispose(Head.Value);
  Result := True;
end;
mikron
Дата: 10.11.2017 20:47:17
SOFT FOR YOU,

Опиши ошибку. Что значит - сбой?
Dima T
Дата: 10.11.2017 20:55:30
Ты взялся за очень сложную задачу. Ее либо решают, либо не решают. Помощи просить бесполезно.

Почитай посты этого чела, он достаточно подробно расписывает почему не стоит пытаться писать свое lock-free. Он написал, но также описывает все встреченные им грабли, по которым пришлось пройти.
mikron
Дата: 10.11.2017 20:59:53
Dima T,

Интересно, кто их тогда пишет, инопланетяне?
mikron
Дата: 10.11.2017 21:23:42
SOFT FOR YOU,

PNode(Tail.Value).Next := Node;
Эта операция атомарна?
SOFT FOR YOU
Дата: 10.11.2017 21:59:13
mikron
SOFT FOR YOU,

Опиши ошибку. Что значит - сбой?


Есть тестовое приложение. Там два пишущих потока и два читающих.
Числа (вроде бы 10 млн) последовательно закидываются в очередь и тут же читаются
А потом происходит анализ прочитанного.
Сбой происходит если одно число извлечено несколько раз или наоборот ни разу

Есть второй тест
Там 5 потоков, каждый делает одно и то же: сначала записывает число, потом тут же его извлекает
В этом тесте сбой - когда запись произошла, а прочитать не удалось (очередь считается пустой)


mikron
SOFT FOR YOU,

PNode(Tail.Value).Next := Node;
Эта операция атомарна?


Да, Next это обычный указатель
Когда узел кладётся в FTail - его Next равен nil
Когда кладётся ещё один узел - тогда пишется Next предыдущего

Здесь идёт обычное присвоение [volatile] указателя
mayton
Дата: 10.11.2017 23:53:31
SOFT FOR YOU, я "ответчик" который сходу, нечитал то что ты написал.

Но вброшу несколько поинтов.

1) Dephi-разработка в большинстве случаев ориентирована на Windows-среду исполнения. Я готов принять тот факт что
существуют всякие там Лазарусы но все таки основной сегмент потребления это OS от Microsoft. Поэтому нас наверное
будет интересовать тюнинг этой структуры данных под Windows и ее "родной" API. Если вопрос тюнинга для нас не стоит
то тогда не нужен и lock-free и прочее.

2) Совершенно нет никакого смысла делать "классический связный" список. Нам достаточно массива.
Завёрнутого в "бублик". Ну и два поинтера которые будут указывать на голову и хвост.
Рискну предположить что это оптимально и экономно по месту и когерентно по кешу.

3) До кучи можно добавить эвристику которая будет "растягивать" бублик как только есть потребность.
Растягивать экспоненциально. И эвристику которая будет "уплотнять" бублик если он давно не использовался.
Над правилами можно подумать. Во времени или в циклах.

Вот как-то так.
SOFT FOR YOU
Дата: 11.11.2017 00:06:25
Atomic-функции кроссплатформенные. Под Windows, Mac, Linux, iOS или Android
Есть реализации очереди в ограниченном массиве. Есть определённые плюсы в такой реализации
Но какой смысл переписывать реализацию, если даже в коде выше мы не можем найти ошибку
mikron
Дата: 11.11.2017 00:23:01
SOFT FOR YOU,

Тесты хорошие. Я не знаком с Делфи и наверняка глупость, но проверь
Tail := FTail.Copy;
Head := FHead.Copy;

Тип у Tail и Head почему не Node?