Всем привет
Я тут в разделе
Delphi пытался совместными усилиями покопать мою реализацию lock-free очереди. Она сбоит, но не понятно где. Разные эксперименты к результату не привели. Поэтому я решил обратиться к более широкой аудитории.
Большинство "ответчиков" не пытались вникнуть в ситуацию и понять суть алгоритма. К примеру, один из форумчан упрекал меня в том, что код содержит AtomicExchange, а не цикл с AtomicCmpExchange.
Я же пробовал много разных подходов, но на тестах алгоритм иногда давал сбой
Критерий нахождения ошибки простой: нужно описать ситуацию (сценарий) при котором алгоритм сработает не корректно
Я всю голову сломал, придумать такой вариант не смог, может быть у вас получится
Суть в следующем:
1) Очередь представляет собой односвязный список от Head к Tail
2) Очередь пуста если Tail не указывает ни на что
3) Поток, изменивший Tail, важнее того, что модифицирует Head
4) Список может быть временно разорван (Next = nil, ещё не инициализирован) - в этом случае просто ждём
5) TSyncPointer - это "tagged pointer", т.е. указатель, в котором меняется счётчик при Atomic-операциях
Отвечу на любые вопросы. Надеюсь на помощь.
На всякий случай - вот
репозиторий.
+ |
TSyncQueue<T> = class(TSyncObject, ISyncQueue<T>)
private
FHead: TSyncPointer;
FTail: TSyncPointer;
type
TNode = TAllocatorNode<Pointer,TNothing,T>;
PNode = ^TNode;
function GetEmpty: Boolean; inline;
public
constructor Create;
destructor Destroy; override;
procedure Clear;
function Enqueue(const Value: T): Boolean;
function Dequeue(var Value: T): Boolean;
property Empty: Boolean read GetEmpty;
end;
function TSyncQueue<T>.Enqueue(const Value: T): Boolean;
var
Node: PNode;
Tail: TSyncPointer;
begin
// новый узел
Node := TSyncAllocator<Pointer,TNothing,T>.New;
Node.Value := Value;
Node.Next := nil;
// подменяем FTail на узел
Tail := FTail.AtomicExchange(Node);
// что если значение, которые было по FTail - уже не корректное?
// инициализируем Next предущего узла или сам FHead
if (Tail.Empty) then
begin
FHead.Value := Node; //FHead.Fill(Node);
// что если Node портит какую-то малину?
// что если не пересчёт инкремента портит чью-то логику?
end else
begin
PNode(Tail.Value).Next := Node;
end;
end;
function TSyncQueue<T>.Dequeue(var Value: T): Boolean;
var
Head, Tail: TSyncPointer;
Next: Pointer;
begin
repeat
Tail := FTail.Copy;
Head := FHead.Copy;
if (Head.Empty) then
begin
// очередь пустая если Tail пустой - выходим
// может быть задан Tail, а Head ещё не проинициализирован корректным значением
if (Tail.Empty) then
Exit(False);
// что если на момент Head равен Empty, Tail.Empty, а FTail не Empty?
end else
begin
// заменяем FHead на Next, при необходимости заменяем FTail
// Next пустой если ещё не инициализирован (ждём) или последний (обнуляем FTail)
Next := PNode(Head.Value).Next;
// что если Head.Value и в Next лежит мусорный nil???
if (Next = nil) then
begin // что если Head.Value невалидный, но равен Tail.Value?
if (Head.Value = Tail.Value) then
begin
// извлекаем единственный элемент - обнуляем FTail
// и по возможности обнуляем FHead
if (FTail.AtomicCmpExchange(nil, Tail)) then
begin
FHead.AtomicCmpExchange(nil, Head);
Break;
end;
end else
begin
// Next не инициализирован
// просто ждём
end;
end else
begin
// Next содержит валидный указатель на следующий узел - кладём его в FHead
// что если здесь происходит ABA из-за некорректного счётчика в Head?
if (FHead.AtomicCmpExchange(Next, Head)) then
Break;
end;
end;
until (False);
// результат
Value := PNode(Head.Value).Value;
TSyncAllocator<Pointer,TNothing,T>.Dispose(Head.Value);
Result := True;
end;
|