Writing lock-free code
Since we have a very limited set of atomic operations, it is very hard to write lock-free code. For some common data structures, such as a doubly linked list, there is no practical lock-free implementation. Besides, it is very easy to make a mistake, and the main problem is that such code could work fine 99.9 percent of the time, which makes debugging enormously confusing.
Therefore, the best practice is to use standard implementations of such algorithms. A good place to start is the concurrent collections from the System.Collections.Concurrent namespace that was introduced in .NET Framework 4.0. We will review them in detail in Chapter 6, Using Concurrent Data Structures. For now, however, we will go against this advice and implement a lock-free stack and a lock-free queue from scratch.
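For example, here is a minimal usage sketch of the standard ConcurrentStack<T> from this namespace (just an illustration of the API, not part of the benchmark code below):

var stack = new ConcurrentStack<int>();
stack.Push(42);

int value;
if (stack.TryPop(out value))
{
    Console.WriteLine(value); // prints 42
}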
The cornerstone of lock-free code is the following pattern: read some data from the shared state, calculate a new value, and then write the new value back, but only if the shared state has not been mutated by another thread in the meantime. The last check-and-write operation must be atomic, and this is what we use Interlocked.CompareExchange for. This description may sound a bit confusing, but it can be illustrated with quite an easy example. Imagine multiple threads calculating an integer sum in parallel. Consider the following line of code, for example:
_total += current;
If we use this simple code, we will get a race condition, since this operation is not atomic. The easiest way to fix this is to use atomic addition with the Interlocked.Add method, but to illustrate the CompareExchange method's logic, let's implement the addition like this:
int beforeValue, newValue;
do
{
    beforeValue = _total;
    newValue = beforeValue + current;
} while (beforeValue != Interlocked.CompareExchange(ref _total, newValue, beforeValue));
First, we save the _total value in the beforeValue temporary variable. Then, we calculate a new value and store it in newValue. Finally, we try to save newValue in _total, but only if _total has remained the same since we started the operation. If it has not, it means that the _total value has been changed by another thread, and we have to repeat the operation with the new value of _total.
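For comparison, the straightforward solution with the Interlocked.Add method mentioned above is a one-liner; the addition is performed atomically and the new total is returned:

// No CAS loop is needed for a simple sum; the addition itself is atomic
Interlocked.Add(ref _total, current);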
The ABA problem
Remember when we mentioned that lock-free programming is very complicated? Now it's time to prove it. Here is another case where seemingly correct concurrent code works absolutely wrong.
Imagine that we have a lock-free stack implementation based on the Interlocked.CompareExchange atomic compare-and-swap (CAS) operation. Let's assume that it contains three items: A on top, B, and C. Thread 1 calls the Pop method; it sets the old head value as A and the new head value as B. However, for some reason, thread 1 gets suspended by the operating system. Meanwhile, thread 2 pops item A from the stack and saves it for later use. Then, it pushes item D onto the stack. After doing this, it finally pushes item A back on top of the stack, but this time A's next item is D, and our stack contains four items: A on top, D, B, and C.

Now the first thread continues to run. It compares whether the old head value and the current head value are the same, and they are! Therefore, the thread writes value B to the head of the stack. Now the stack is corrupted and contains only two items: B on top and C.
The described process can be illustrated by the following schema:

[Schema: thread 1 prepares to pop A and set the head to B; thread 2 pops A, pushes D, and pushes A back; thread 1's CAS then succeeds and sets the head to B, losing item D.]
So, just having atomic CAS operations is not enough. To make this code work correctly, it's very important to make sure that we do not reuse references in our code or allow them to escape to our consumers. Thus, when we push item A twice, it should look different from the existing items from the stack's perspective. To achieve this, it's enough to allocate a new wrapper object each time something is pushed onto the stack.
Here is a quote from Wikipedia that describes the ABA problem very well:
Natalie is waiting in her car at a red traffic light with her children. Her children start fighting with each other while waiting, and she leans back to scold them. Once their fighting stops, Natalie checks the light again and notices that it's still red. However, while she was focusing on her children, the light had changed to green, and then back again. Natalie doesn't think the light ever changed, but the people waiting behind her are very mad and honking their horns now.
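To make the hazard concrete, here is an illustrative sketch (the Node type and the PopUnsafe method are hypothetical, not the stack implementation we build below) showing exactly where the corruption happens when node objects are reused:

// Illustrative only: a naive lock-free Pop that suffers from the ABA problem
// when node objects are reused.
private Node _head;

private class Node
{
    public object Value;
    public Node Next;
}

public object PopUnsafe()
{
    Node oldHead, newHead;
    do
    {
        oldHead = _head;        // thread 1 reads head A (A.Next == B)
        if (oldHead == null)
            return null;
        newHead = oldHead.Next; // suppose thread 1 is suspended right here:
                                // thread 2 pops A, pushes D, then pushes the
                                // same A object back, so _head is A again
    } while (Interlocked.CompareExchange(ref _head, newHead, oldHead) != oldHead);
    // The CAS succeeds because _head is A again, but newHead still refers to
    // the stale B, so item D is silently lost
    return oldHead.Value;
}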
The lock-free stack
Now, we are ready to implement a lock-free stack data structure. First, we define a base abstract class for our stack implementation:
public abstract class StackBase<T>
Then we have an inner class to define an item on the stack:
protected class Item
{
    private readonly T _data;
    private readonly Item _next;

    public Item(T data, Item next)
    {
        _data = data;
        _next = next;
    }

    public T Data { get { return _data; } }
    public Item Next { get { return _next; } }
}
The Item class contains user data and a reference to the next element on the stack. Now, we add a field for the top item of the stack:

protected Item _head;
A property that indicates whether the stack is empty is as follows:
public bool IsEmpty
{
    get { return _head == null; }
}
Two abstract methods that store and retrieve an item from the stack:
public abstract void Push(T data);
public abstract bool TryPop(out T data);
Now we have a base for different stack implementations to compare how they perform. We start with a lock-based stack:
public class LockStack<T> : StackBase<T> where T : class
As we remember, the lock statement is translated into Monitor class method calls by the C# compiler. The Monitor class tries to avoid using OS-level locks and uses spin waiting to achieve a performance boost when a lock is held only for a short time. To illustrate this, we will first create a stack that uses only OS-level locks with the help of the System.Threading.Mutex class, which uses the mutex synchronization primitive from the OS. We create a mutex instance:
private readonly Mutex _lock = new Mutex();
Then, we implement the Push and TryPop methods as follows:
public override void Push(T data)
{
    _lock.WaitOne();
    try
    {
        _head = new Item(data, _head);
    }
    finally
    {
        _lock.ReleaseMutex();
    }
}

public override bool TryPop(out T data)
{
    _lock.WaitOne();
    try
    {
        if (IsEmpty)
        {
            data = null;
            return false;
        }
        data = _head.Data;
        _head = _head.Next;
        return true;
    }
    finally
    {
        _lock.ReleaseMutex();
    }
}
This implementation puts a thread in a blocked state every time it has to wait for the lock to be released. This is the worst-case scenario, and we're going to see the test results that prove this.
Now we will implement a concurrent stack with a monitor and lock statement:
public class MonitorStack<T> : StackBase<T> where T : class
{
    private readonly object _lock = new object();

    public override void Push(T data)
    {
        lock (_lock)
            _head = new Item(data, _head);
    }

    public override bool TryPop(out T data)
    {
        lock (_lock)
        {
            if (IsEmpty)
            {
                data = null;
                return false;
            }
            data = _head.Data;
            _head = _head.Next;
            return true;
        }
    }
}
Then it's the lock-free stack implementation's turn:
public class LockFreeStack<T> : StackBase<T> where T : class
Notice that we had to add a class constraint to the generic type parameter. We do this because we cannot atomically exchange values that are more than 8 bytes in size, while a reference always fits into a machine word. If we look at the generic version of the Interlocked.CompareExchange method, we can see that its type parameter has the same class constraint.
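Indeed, the generic overload on the System.Threading.Interlocked class is declared with the following signature:

public static T CompareExchange<T>(ref T location1, T value, T comparand)
    where T : class;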
Let's get to implementation:
public override void Push(T data)
{
    Item item, oldHead;
    do
    {
        oldHead = _head;
        item = new Item(data, oldHead);
    } while (oldHead != Interlocked.CompareExchange(ref _head, item, oldHead));
}
This implementation is quite similar to the lock-free addition example. We basically do the same thing, only instead of storing a new sum, we store a reference to a new head item.
The TryPop method code is slightly more complicated:
public override bool TryPop(out T data)
{
    var oldHead = _head;
    while (oldHead != null)
    {
        if (oldHead == Interlocked.CompareExchange(ref _head, oldHead.Next, oldHead))
        {
            data = oldHead.Data;
            return true;
        }
        oldHead = _head;
    }
    data = null;
    return false;
}
Here we have to notice that the stack can be empty; in this case, we return false to indicate that we failed to retrieve a value from the stack.
Also, we would like to compare our code with the standard ConcurrentStack implementation from System.Collections.Concurrent. It is possible to use an interface to work with all the collections in the same way, but in this case, it is easier to create a wrapper class that contains the source collection:
public class ConcurrentStackWrapper<T> : StackBase<T>
{
    private readonly ConcurrentStack<T> _stack;

    public ConcurrentStackWrapper()
    {
        _stack = new ConcurrentStack<T>();
    }

    public override void Push(T data)
    {
        _stack.Push(data);
    }

    public override bool TryPop(out T data)
    {
        return _stack.TryPop(out data);
    }
}
The only thing left is to compare the performance of our stack implementations:
private static long Measure(StackBase<string> stack)
{
    var threads = Enumerable
        .Range(0, _threadCount)
        .Select(n => new Thread(() =>
        {
            for (var j = 0; j < _iterations; j++)
            {
                for (var i = 0; i < _iterationDepth; i++)
                {
                    stack.Push(i.ToString());
                }
                string res;
                for (var i = 0; i < _iterationDepth; i++)
                {
                    stack.TryPop(out res);
                }
            }
        }))
        .ToArray();

    var sw = Stopwatch.StartNew();
    foreach (var thread in threads)
        thread.Start();
    foreach (var thread in threads)
        thread.Join();
    sw.Stop();

    if (!stack.IsEmpty)
        throw new ApplicationException("Stack must be empty!");

    return sw.ElapsedMilliseconds;
}
We run several threads, and each of them pushes items onto the stack and pops them in parallel. We wait for all the threads to complete and check whether the stack is empty, which means that the program is correct. Finally, we measure the time required for all the operations to complete.
The results can differ and depend greatly on the CPU. This one is from a 3.4 GHz quad-core Intel i7-3770 CPU:
LockStack: 6718ms
LockFreeStack: 209ms
MonitorStack: 154ms
ConcurrentStack: 121ms
This one is from a Hyper-V virtual machine with two CPU cores, running on a laptop with a 2.2 GHz quad-core Intel i7-4702HQ CPU and power-saving mode enabled:
LockStack: 39497ms
LockFreeStack: 388ms
MonitorStack: 691ms
ConcurrentStack: 419ms
The typical results are as follows: LockStack is the slowest, the LockFreeStack and MonitorStack implementations perform about the same, and the standard ConcurrentStack shows the best results. The MonitorStack implementation works well because, in this case, the operations under the lock are very fast (about two processor cycles), and in such a situation spin waiting works very well. We'll get back to explaining these results in detail in Chapter 6, Using Concurrent Data Structures.
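The spinning idea itself can be sketched with the SpinWait structure from System.Threading (a simplified spin lock for illustration; this is not how Monitor is actually implemented):

// A minimal spin lock sketch: spin briefly instead of blocking immediately
private int _taken; // 0 = free, 1 = taken

public void Enter()
{
    var spinner = new SpinWait();
    while (Interlocked.CompareExchange(ref _taken, 1, 0) != 0)
    {
        spinner.SpinOnce(); // spins first, then yields to other threads
    }
}

public void Exit()
{
    Interlocked.Exchange(ref _taken, 0);
}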
The lock-free queue
The stack and the queue are the simplest basic data structures. We have implemented a lock-free stack, and we encountered several tricky problems that we had to resolve. Implementing a lock-free concurrent queue is a more sophisticated task, since we now have to perform several operations at once. For example, when we enqueue a new item, we must simultaneously set the old tail's next-item reference to the new item and then update the tail reference so that it points to the new item. Unfortunately, we cannot change two objects in a single atomic operation, so we must find a way to properly synchronize access to the head and the tail without locks:
public class LockFreeQueue<T> {
We define a simple class that is going to contain data in the queue:
protected class Item
{
    public T Data;
    public Item Next;
}
We will store references to the queue's tail and head and initialize them by default:
private Item _head;
private Item _tail;

public LockFreeQueue()
{
    _head = new Item();
    _tail = _head;
}
The first challenge is to implement an Enqueue method. What we can do is set tail.Next with a CAS operation, but let the tail reference advance later, possibly from other threads. This guarantees that the linked list of queue items will always be valid, and if we see that we failed to set a new tail, we just let this operation complete in another thread:
public void Enqueue(T data) {
Create a new queue item and reserve space for the local copies of the _tail and _tail.Next references:
Item item = new Item();
item.Data = data;
Item oldTail = null;
Item oldNext = null;
We repeat the queueing operation until it succeeds:
bool update = false;
while (!update)
{
Copy the references to local variables and acquire a full fence, so that the read and write operations will not be reordered. We have to use the Next field from the local copy, because the actual _tail item may have already been changed between the two read operations:
oldTail = _tail;
oldNext = oldTail.Next;
Thread.MemoryBarrier();
The tail may remain the same as it was at the beginning of the operation:
if (_tail == oldTail) {
If, in this case, the next reference is null, it means that no one has changed the tail since we copied it to oldNext:
if (oldNext == null) {
Here, we can try to queue the item; if the CAS succeeds, the whole operation has succeeded:
update = Interlocked.CompareExchange(ref _tail.Next, item, null) == null;
}
else
{
If not, it means that another thread is queueing a new item right now, so we should try to set the tail reference to point to its next node:
Interlocked.CompareExchange(ref _tail, oldNext, oldTail);
}
}
}
Now we have successfully inserted the new item at the end of the queue, and we try to update the tail reference. However, it is okay if we fail, since another thread will eventually do this in its own Enqueue method call:

Interlocked.CompareExchange(ref _tail, item, oldTail);
}
The main challenge in dequeueing is to work correctly in situations where the tail reference has not yet been updated:
public bool TryDequeue(out T result) {
We will create a loop that finishes if there is nothing to dequeue or if we have dequeued an item successfully:
result = default(T);
Item oldNext = null;
bool advanced = false;
while (!advanced)
{
We will make local copies of variables that are needed:
Item oldHead = _head;
Item oldTail = _tail;
oldNext = oldHead.Next;
Then, we will acquire a full fence to prevent read and write reordering:
Thread.MemoryBarrier();
There might be a case when the head item has not been changed yet:
if (oldHead == _head) {
Then, we will check whether the queue is empty:
if (oldHead == oldTail) {
Usually, this condition is false. If it is true, the queue is either empty or the tail is lagging behind. If oldNext is not null, it means that we have a lagging tail, and we need to update it:
if (oldNext != null)
{
    Interlocked.CompareExchange(ref _tail, oldNext, oldTail);
    continue;
}
If we are here, we have an empty queue:
result = default(T);
return false;
}
Now, we will read the data of the item being dequeued and try to advance the head reference:
result = oldNext.Data;
advanced = Interlocked.CompareExchange(ref _head, oldNext, oldHead) == oldHead;
}
}
We will remove any references that can prevent the garbage collector from doing its job, and then we will exit:
oldNext.Data = default(T);
return true;
}

public bool IsEmpty
{
    get { return _head == _tail; }
}
}
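Before measuring anything, here is a quick sketch of how the queue we have just built can be exercised from two threads (the item count is arbitrary):

var queue = new LockFreeQueue<string>();

var producer = new Thread(() =>
{
    for (var i = 0; i < 1000; i++)
        queue.Enqueue(i.ToString());
});
var consumer = new Thread(() =>
{
    var received = 0;
    string item;
    while (received < 1000)
    {
        if (queue.TryDequeue(out item))
            received++;
    }
});

producer.Start();
consumer.Start();
producer.Join();
consumer.Join();
Console.WriteLine(queue.IsEmpty); // True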
Then, we will write code to unify access to the queues and compare the different ways of synchronizing access to them. To write general performance measurement code, we need an interface:
public interface IConcurrentQueue<T>
{
    void Enqueue(T data);
    bool TryDequeue(out T data);
    bool IsEmpty { get; }
}
Both LockFreeQueue and the standard ConcurrentQueue already provide all of these members, so all we need to do is create wrapper classes like this:
class LockFreeQueueWrapper<T> : LockFreeQueue<T>, IConcurrentQueue<T> { }
class ConcurrentQueueWrapper<T> : ConcurrentQueue<T>, IConcurrentQueue<T> { }
We need a more advanced wrapper in the case of a non-thread-safe Queue collection:
class QueueWrapper<T> : IConcurrentQueue<T>
{
    private readonly object _syncRoot = new object();
    private readonly Queue<T> _queue = new Queue<T>();

    public void Enqueue(T data)
    {
        lock (_syncRoot)
            _queue.Enqueue(data);
    }

    public bool TryDequeue(out T data)
    {
        if (_queue.Count > 0)
        {
            lock (_syncRoot)
            {
                if (_queue.Count > 0)
                {
                    data = _queue.Dequeue();
                    return true;
                }
            }
        }
        data = default(T);
        return false;
    }

    public bool IsEmpty
    {
        get { return _queue.Count == 0; }
    }
}
We have used the double-checked locking pattern inside the TryDequeue method. At first glance, it seems that the first if statement does nothing useful and can simply be removed. However, if you run the program without the first check, it will run about 50 times slower. The goal of the first check is to see whether the queue is empty without acquiring the lock, so that other threads are allowed to access the queue in the meantime. Keeping the code under a lock to a minimum is very important, and this example illustrates it very well.
Now we need the performance measurement code. We can write it in a generalized way and feed it our different queues:
private static long Measure(IConcurrentQueue<string> queue)
{
    var threads = Enumerable
        .Range(0, _writeThreads)
        .Select(n => new Thread(() =>
        {
            for (int i = 0; i < _iterations; i++)
            {
                queue.Enqueue(i.ToString());
                Thread.SpinWait(100);
            }
        }))
        .Concat(new[]
        {
            new Thread(() =>
            {
                var left = _iterations * _writeThreads;
                while (left > 0)
                {
                    string res;
                    if (queue.TryDequeue(out res))
                        left--;
                }
            })
        })
        .ToArray();

    var sw = Stopwatch.StartNew();
    foreach (var thread in threads)
        thread.Start();
    foreach (var thread in threads)
        thread.Join();
    sw.Stop();

    if (!queue.IsEmpty)
        throw new ApplicationException("Queue is not empty!");

    return sw.ElapsedMilliseconds;
}
The last thing we need to do is run the program:
private const int _iterations = 1000000;
private const int _writeThreads = 8;

public static void Main()
{
    Console.WriteLine("Queue: {0}ms",
        Measure(new QueueWrapper<string>()));
    Console.WriteLine("LockFreeQueue: {0}ms",
        Measure(new LockFreeQueueWrapper<string>()));
    Console.WriteLine("ConcurrentQueue: {0}ms",
        Measure(new ConcurrentQueueWrapper<string>()));
}
The output is as follows:
Queue: 3453ms
LockFreeQueue: 1868ms
ConcurrentQueue: 1162ms
These results show that our lock-free queue has an advantage over straightforward locking, but the standard ConcurrentQueue performs even better. It uses a more sophisticated way of storing data, a linked list of array segments, which allows for a more optimal process of storing and reading data.