官术网_书友最值得收藏!

Writing lock-free code

Since we have a very limited number of atomic operations, it is very hard to write lock-free code. For some common data structures, such as a double linked list, there is no lock-free implementation. Besides, it is very easy to make a mistake, and the main problem is that such code could work fine 99.9 percent of the time, which makes debugging enormously confusing.

Therefore, the best practice is to use standard implementations of such algorithms. A good place to start is by using concurrent collections from the System.Collections.Concurrent namespace that was introduced in the .NET Framework 4.0. We will review them in detail in Chapter 6, Using Concurrent Data Structures. However, now we will try to do not as advised and implement a lock-free stack and a lock-free queue from scratch.

The cornerstone of the lock-free code is the following pattern: read some data from the shared state, calculate a new value, and then write the new value back, but only if the shared state wasn't mutated by any other thread by that time. The last check and write operation must be atomic, and this is what we use Interlocked.CompareExchange for. This description looks a bit confusing, but it can be illustrated with quite an easy example. Imagine multiple threads calculating an integer sum in parallel. Consider the following line of code, for example:

_total += current;

If we use this simple code, we would get race condition here since this operation is not atomic. The easiest way to fix this is by using atomic addition with the Interlocked.Add method, but to illustrate the CompareExchange method logic, let's implement the addition like this:

int beforeValue, newValue;
do
{
  beforeValue = _total;
  newValue = beforeValue + current;
}
while (beforeValue != Interlocked.CompareExchange(ref _total, newValue, beforeValue))

First, we save the _total value in the beforeValue temporary variable. Then, we calculate a new value and store it in newValue. Finally, we're trying to save newValue in _total, but only if _total remains the same when we started the operation. If not, it means that the _total value has been changed by another thread and we have to repeat the operation with the new value of _total.

The ABA problem

Remember when we mentioned that lock-free programming is very complicated? Now, it's time to prove it. Here is another case when a seemingly right concurrent code works absolutely wrong.

Imagine that we have a lock-free stack implementation with the Interlocked.CompareExchange atomic compare-and-swap (CAS) operation. Let's assume that it contains three items: A on top, B, and C. Thread 1 calls the Pop method; it sets the old head value as A and the new head value as B. However for some reason, thread 1 gets suspended by the operating system. Meanwhile, thread 2 pops item A from the stack and saves it for later use. Then, it pushes item D on the stack. After doing this, it finally pushes item A back on top of the stack, but this time A's next item is D and our stack contains four items: A on top, D, B, and C.

Now the first thread continues to run. It compares whether the old head value and the current head value are the same, and they are! Therefore, the thread writes value B to the head of the stack. Now, the stack is corrupted and contains two items: B on the top and C.

The described process can be illustrated by the following schema:

The ABA problem

So, just having atomic CAS operations is not enough. To make this code work right, it's very important to make sure that we do not reuse references in our code or allow them to escape to our consumers. Thus, when we push item A twice, it should be different from the existing items from the stack perspective. To achieve this, it's enough to allocate a new wrapper object each time something is being pushed onto the stack.

Here is a quote from Wikipedia that describes the ABA problem very well:

Natalie is waiting in her car at a red traffic light with her children. Her children start fighting with each other while waiting, and she leans back to scold them. Once their fighting stops, Natalie checks the light again and notices that it's still red. However, while she was focusing on her children, the light had changed to green, and then back again. Natalie doesn't think the light ever changed, but the people waiting behind her are very mad and honking their horns now.

The lock-free stack

Now, we are ready to implement a lock-free stack data structure. First, we define a base abstract class for our stack implementation:

public abstract class StackBase<T>

Then we have an inner class to define an item on the stack:

private class Item
{
  private readonly T _data;
  private readonly Item _next;

  public Item(T data, Item next)
  {
    _data = data;
    _next = next;
  }

  public T Data
  {
    get { return _data; }
  }

  public Item Next
  {
    get { return _next; }
  }
}

The item class contains user data and a reference to the next element on the stack. Now, we're adding a stack top item:

private Item _head;

A property that indicates whether the stack is empty is as follows:

public bool IsEmpty
{
  get { return _head == null; }
}

Two abstract methods that store and retrieve an item from the stack:

public abstract void Push(T data);

public abstract bool TryPop(out T data);

Now we have a base for different stack implementations to compare how they perform. We start with a lock-based stack:

public class LockStack<T> : StackBase<T>

As we remember, the lock statement is translated to the Monitor class method calls by the C# compiler. The monitor class tries to avoid using OS-level locks and uses spin locks to achieve a performance boost when a lock takes a little time. We're going to illustrate this and create a stack that uses only OS-level locks with the help of the System.Threading.Mutex class, which uses the mutex synchronization primitive from the OS. We create a mutex instance:

private readonly Mutex _lock = new Mutex();

Then, implement the Push and Pop methods as follows:

public override void Push(T data)
{
  _lock.WaitOne();
  try
  {
    _head = new Item(data, _head);
  }
  finally
  {
    _lock.ReleaseMutex();
  }
}

public override bool TryPop(out T data)
{
  _lock.WaitOne();
  try
  {
    if (IsEmpty)
    {
      data = null;
      return false;
    }
    data = _head.Data;
    _head = _head.Next;
    return true;
  }
  finally
  {
    _lock.ReleaseMutex();
  }
}

This implementation puts a thread in a blocked state every time it has to wait for the lock to be released. This is the worst-case scenario, and we're going to see the test results that prove this.

Now we will implement a concurrent stack with a monitor and lock statement:

public class MonitorStack<T> : StackBase<T> where T: class
{
  private readonly object _lock = new object();

  public override void Push(T data)
  {
    lock (_lock)
      _head = new Item(data, _head);
  }

  public override bool TryPop(out T data)
  {
    lock (_lock)
    {
      if (IsEmpty)
      {
        data = null;
        return false;
      }
      data = _head.Data;
      _head = _head.Next;
      return true;
    }
  }
}

Then it's the lock-free stack implementation's turn:

public class LockFreeStack<T> where T: class

Notice that we had to add class constraint to the generic type parameter. We do this because we cannot atomically exchange values that are more than 8 bytes in size. If we look at the generic version of the Interlocked.CompareExchange method, we can make sure that its type parameter has the same class constraint.

Let's get to implementation:

public void Push(T data)
{
  Item item, oldHead;
  do
  {
    oldHead = _head;
    item = new Item(data, oldHead);
  } while (oldHead != Interlocked.CompareExchange(ref _head, item, oldHead));
}

This implementation is quite similar to a lock-free addition example. We basically do the same thing, only instead of addition, we're storing a new reference to the stack's head.

The TryPop method code is slightly more complicated:

public bool TryPop(out T data)
{
  var oldHead = _head;
  while (!IsEmpty)
  {
    if (oldHead == Interlocked.CompareExchange(ref _head, oldHead.Next, oldHead))
    {
      data = oldHead.Data;
      return true;
    }
    oldHead = _head;
  }
  data = null;
  return false;
}

Here we have to notice that the stack can be empty; in this case, we return false to indicate that we failed to retrieve a value from the stack.

Also, we would like to compare our code to the standard ConcurrentStack implementation from System.Collections.Concurrent. It is possible to use an interface to work with all collections in the same way, but in this case, it is easier to create a wrapper class that contains the source collection:

public class ConcurrentStackWrapper<T> : StackBase<T>
{
  private readonly ConcurrentStack<T> _stack;
  public ConcurrentStackWrapper()
  {
    _stack = new ConcurrentStack<T>();
  }

  public override void Push(T data)
  {
    _stack.Push(data);
  }

  public override bool TryPop(out T data)
  {
    return _stack.TryPop(out data);
  }
}

The only operation left is to compare the performances of our stack implementations:

private static long Measure(StackBase<string> stack)
{
  var threads = Enumerable
    .Range(0, _threadCount)
    .Select(
      n => new Thread(
        () =>
        {
          for (var j = 0; j < _iterations; j++)
          {
            for (var i = 0; i < _iterationDepth; i++)
            {
              stack.Push(i.ToString());
            }

            string res;

            for (var i = 0; i < _iterationDepth; i++)
            {
              stack.TryPop(out res);
            }
          }
        }))
    .ToArray();
  var sw = Stopwatch.StartNew();
  foreach (var thread in threads)
    thread.Start();
  foreach (var thread in threads)
    thread.Join();
  sw.Stop();
  if (!stack.IsEmpty)
    throw new ApplicationException("Stack must be empty!");
  return sw.ElapsedMilliseconds;
}

We run several threads and each of these threads pushes and pops items to the stack in parallel. We wait for all the threads to complete, and check whether the stack is empty, which means that the program is correct. Finally, we measure the time required for all the operations to complete.

The results can be different and greatly depend on the CPU. This one is from a 3.4 GHz quad core Intel i7-3770 CPU:

LockStack: 6718ms
LockFreeStack: 209ms
MonitorStack: 154ms
ConcurrentStack: 121ms

This one is from a hyper-v virtual machine with two CPU cores running on a 2.2 GHz quad core Intel i7-4702HQ CPU laptop with power saving mode enabled:

LockStack: 39497ms
LockFreeStack: 388ms
MonitorStack: 691ms
ConcurrentStack: 419ms

The typical results are as follows: LockStack is the slowest, the LockFreeStack and MonitorStak implementations perform about the same, and the standard ConcurrentStack shows the best results. The MonitorStack implementation works well because, in this case, operations under lock are very fast, that is, about two processor cycles, and in this situation, spin wait works very well. We'll get back to explaining these results in detail later in Chapter 6, Using Concurrent Data Structures.

The lock-free queue

Stack and queue are the simplest of basic data structures. We have implemented a lock-free stack, and we encountered several tricky problems that we had to resolve. Implementing a lock-free concurrent queue is a more sophisticated task, since we now have to perform several operations at once. For example, when we queue a new item, we must simultaneously set the old tail's next item reference to a new item and then change a tail reference that the new item is now a new tail. Unfortunately, we cannot change two objects as an atomic operation. So, we must find a way to properly synchronize access to the head and tail without locks:

public class LockFreeQueue<T>
{

We define a simple class that is going to contain data in the queue:

protected class Item
{
  public T Data;
  public Item Next;
}

We will store references to the queue's tail and head and initialize them by default:

private Item _head;
private Item _tail;

public LockFreeQueue()
{
  _head = new Item();
  _tail = _head;
}

The first challenge is to implement an Enqueue method. What we can do is set tail.Next in the CAS operation, but let the tail reference advance later, maybe by other threads. This guarantees that the linked list of queue items will always be valid, and if we see that we failed to set a new tail, just let this operation start in another thread:

public void Enqueue(T data)
{

Create a new queue item and reserve space for the local copies of the _tail and _tail.Next references:

Item item = new Item();
item.Data = data;

Item oldTail = null;
Item oldNext = null;

We repeat the queueing operation until it succeeds:

bool update = false;
while (!update) {

Copy references to local variables and acquire a full fence so that the read and write operations will not be reordered. We have to use the Next field from the local copy, because the actual _tail item may have already been changed between both the read operations:

oldTail = _tail;
oldNext = oldTail.Next;

Thread.MemoryBarrier();

The tail may remain the same as it was in the beginning of the operation:

if (_tail == oldTail)
{

In this case, the next reference was null, which means that no one changed the tail since we copied it to oldNext:

if (oldNext == null)
{

Here we can try queueing an item, and this will be the success of the whole operation:

  update = Interlocked.CompareExchange(ref _tail.Next, item, null) == null;
}
else
{

If not, it means that another thread is queueing a new item right now, so we should try to set the tail reference to point to its next node:

      Interlocked.CompareExchange(ref _tail, oldNext, oldTail);
    }
  }
}

Here we have successfully inserted a new item to the end of the queue, and now we're trying to update the tail reference. However, if we fail it is okay, since another thread will eventually do this in its Enqueue method call:

  Interlocked.CompareExchange(ref _tail, item, oldTail);
}

The main goal of dequeueing properly is to correctly work in situations when we have not yet updated the tail reference:

public bool TryDequeue(out T result)
{

We will create a loop that finishes if there is nothing to dequeue or if we have dequeued an item successfully:

result = default(T);
Item oldNext = null;
bool advanced = false;
while (!advanced)
{

We will make local copies of variables that are needed:

Item oldHead = _head;
Item oldTail = _tail;
oldNext = oldHead.Next;

Then, we will acquire a full fence to prevent read and write reordering:

Thread.MemoryBarrier();

There might be a case when the head item has not been changed yet:

if (oldHead == _head)
{

Then, we will check whether the queue is empty:

if (oldHead == oldTail)
{

In this case, this should be false. If not, it means that we have a lagging tail and we need to update it:

if (oldNext != null)

{
  Interlocked.CompareExchange(ref _tail, oldNext, oldTail);
  continue;
}

If we are here, we have an empty queue:

  result = default(T);
  return false;
}

Now we will get the dequeueing item and try to advance the head reference:

    result = oldNext.Data;
    advanced = Interlocked.CompareExchange(
    ref _head, oldNext, oldHead) == oldHead;
  }
}

We will remove any references that can prevent the garbage collector from doing its job, and then we will exit:

    oldNext.Data = default(T);
    return true;
  }

  public bool IsEmpty
  {
    get
    {
      return _head == _tail;
    }
  }
}

Then we will write the following code to unify access to queues and compare different ways to synchronize access to the queue. To write general performance measurement code, we need to write an interface:

public interface IConcurrentQueue<T>
{
  void Enqueue(T data);
  bool TryDequeue(out T data);
  bool IsEmpty { get; }
}

Both LockFreeQueue and the standard ConcurrentQueue are already implementing this interface, and all we need to do is to create a wrapper class like this:

class LockFreeQueueWrapper<T> : LockFreeQueue<T>, IConcurrentQueue<T> {}

class ConcurrentQueueWrapper<T> : ConcurrentQueue<T>, IConcurrentQueue<T> {}

We need a more advanced wrapper in the case of a non-thread-safe Queue collection:

class QueueWrapper<T> : IConcurrentQueue<T>
{
  private readonly object _syncRoot = new object();
  private readonly Queue<T> _queue = new Queue<T>();

  public void Enqueue(T data)
  {
    lock(_syncRoot)
    _queue.Enqueue(data);
  }

  public bool TryDequeue(out T data)
  {
    if (_queue.Count > 0)
    {
      lock (_syncRoot)
      {
        if (_queue.Count > 0)
        {
          data = _queue.Dequeue();
          return true;
        }
      }
    }
    data = default(T);
    return false;
  }

  public bool IsEmpty
  {
    get { return _queue.Count == 0; }
  }
}

We have used a double checked locking pattern inside the TryDequeue method. At first glance, it seems that the first if statement is not doing anything useful, and we can just remove it. If you do an experiment and run the program without the first check, it will run about 50 times slower. The goal of the first check is to see whether the queue is empty so that a lock is not acquired; the lock and other threads are allowed to access the queue. Making a lock code minimal is very important, and it is illustrated here very well.

Now we need performance measurement. We can write a generalized code and provide our different queues in a similar way:

private static long Measure(IConcurrentQueue<string> queue)
{
  var threads = Enumerable
  .Range(0, _writeThreads)
  .Select(n => new Thread(() =>
  {
    for (int i = 0; i < _iterations; i++)
    {
      queue.Enqueue(i.ToString());
      Thread.SpinWait(100);
    }
  }))
  .Concat(new[]{new Thread(() =>
  {
    var left = _iterations*_writeThreads;
    while (left > 0)
    {
      string res;
      if (queue.TryDequeue(out res))
        left--;
    }
  })
  })
  .ToArray();
  var sw = Stopwatch.StartNew();
  foreach (var thread in threads)
    thread.Start();
  foreach (var thread in threads)
    thread.Join();
  sw.Stop();
  if (!queue.IsEmpty)
    throw new ApplicationException("Queue is not empty!");
  return sw.ElapsedMilliseconds;
}

The last thing that we need is just run the program and the results are going to be like this:

private const int _iterations = 1000000;
private const int _writeThreads = 8;

public static void Main()
{
  Console.WriteLine("Queue: {0}ms", Measure(new QueueWrapper<string>()));
  Console.WriteLine("LockFreeQueue: {0}ms", Measure(new LockFreeQueueWrapper<string>()));
  Console.WriteLine("ConcurrentQueue: {0}ms", Measure(new ConcurrentQueueWrapper<string>()));
}

The output is as follows:

Queue: 3453ms
LockFreeQueue: 1868ms
ConcurrentQueue: 1162ms

These results show that our lock-free queue has an advantage over straightforward locking, but the standard ConcurrentQueue performs better. It uses complicated ways of storing data—a linked list of array segments, which allows us to organize a more optimal process of storing and reading data.

主站蜘蛛池模板: 吉水县| 黄山市| 吴忠市| 新化县| 瑞丽市| 平昌县| 三都| 邵东县| 武穴市| 平昌县| 宽甸| 临桂县| 天津市| 靖远县| 古浪县| 图们市| 嵊泗县| 枣庄市| 陵水| 沙湾县| 静宁县| 清新县| 拉萨市| 桑日县| 土默特左旗| 靖边县| 井研县| 嘉定区| 台州市| 简阳市| 如东县| 崇文区| 大同县| 阿拉善右旗| 余庆县| 小金县| 社旗县| 香格里拉县| 浦县| 安西县| 资中县|