Thread-safe структуры данных .NET 4 (ч.2)

Автор: Topol Суббота, Май 5th, 2012 Нет комментариев

Рубрика: Операционные системы

BlockingCollection<T> — эта потокобезопасная коллекция называется блокирующей, поскольку действует по следующему принципу:

  • Если коллекция пустая, и некоторый код пытается извлечь элемент, то поток, в котором он выполняется, блокируется до появления хотя бы одного элемента;
  • Если коллекция уже содержит максимальное число элементов, то поток, в котором пытаются добавить новый элемент, блокируется до тех пор, пока место не будет освобождено.


Разумеется, это далеко не полное описание её возможностей. Давайте с ними ознакомимся. Начнём с базовых вещей. Изменение содержимого коллекции производится при помощи специальных методов:

Блокирующие методы Неблокирующие методы
Добавление Add(T);
Add(T, CancellationToken);
TryAdd(T)
TryAdd(T, Int32)
TryAdd(T, TimeSpan)
TryAdd(T, Int32, CancellationToken)
Извлечение Take()
Take(CancellationToken)
TryTake(out T)
TryTake(out T, Int32)
TryTake(out T, TimeSpan)
TryTake(out T, Int32, CancellationToken)

Как видно, для добавления и извлечения элементов есть в т.ч. и неблокирующие методы (те, что начинаются с префикса»Try»). Т.е. возможен неблокирующий доступ к коллекции. При этом Try* метод вернет false, если элемент невозможно добавить или извлечь, и true в противном случае. Для обоих типов методов есть перегруженные версии с маркером отмены, с помощью которого можно асинхронно прекратить ожидание.

BlockingCollection<T> имеет механизм «завершения». Коллекция считается «завершенной», когда известно, что данные в неё добавляться больше не будут. После «завершения» все попытки добавить данные приведут к генерации исключения InvalidOperationException. Если попытаться извлечь данные из пустой «завершенной» коллекции, то также будет сгенерировано исключение. Сразу же после создания коллекция является «незавершенной», а её «завершение» выполняется посредством вызова метода CompleteAdding(). Таким образом, изменение состояния коллекции выполняется вручную. Зачем был придуман такой механизм? С его помощью обеспечивается синхронизация работы производителя и потребителя, т.е. участков кода, добавляющих и извлекающих данные из коллекции. Производитель может известить о том, что новые элементы он добавлять больше не будет. При этом потребитель будет знать, что не стоит ожидать пополнения коллекции. Ниже приведен пример, демонстрирующий подобный подход.

Давайте рассмотрим немного кода, работающего с BlockingCollection<T>. Сначала создадим экземпляр коллекции, причем в конструкторе укажем её максимальную ёмкость — в данном случае это 10 элементов. Впрочем, размер можно явно не задавать, вызвав конструктор без параметров — в таком случае коллекция будет расти «неограниченно»:

Код:
// создаём коллекцию
BlockingCollection<int> collection = new BlockingCollection<int>(10);

Далее в коллекцию добавим несколько элементов, и на консоль выведем свойства коллекции (о которых чуть позже):

Код:
collection.Add(200);collection.Add(300);collection.Add(400);collection.Add(500); Console.WriteLine(«Count: » + collection.Count);Console.WriteLine(«BoundedCapacity: » + collection.BoundedCapacity);Console.WriteLine(«IsCompleted: » + collection.IsCompleted);Console.WriteLine(«IsAddingCompleted: » + collection.IsAddingCompleted);Console.WriteLine();

После этого запустим таймер, в делегате callback которого происходит добавление элементов и «завершение» коллекции:

Код:
Timer timer = new Timer(delegate
{
Console.ForegroundColor = ConsoleColor.Red;    Console.WriteLine(DateTime.Now.ToLongTimeString() + » Добавление » + » 600″);    collection.Add(600);    Thread.SpinWait(300000000);     Console.ForegroundColor = ConsoleColor.Red;    Console.WriteLine(DateTime.Now.ToLongTimeString() + » Добавление » + » 700″);    collection.Add(700);     collection.CompleteAdding();
},
null, 3000, Timeout.Infinite);

Делегат, указанный при создании таймера, будет вызван в потоке, отличном от того, в котором работает метод Main() — это позволит промоделировать доступ к коллекции со стороны нескольких потоков. Дальше в цикле начинаем перебор элементов (чтобы отличить вывод от разных потоков, используется различный цвет текста):

Код:
foreach (var item in collection.GetConsumingEnumerable())
{
Console.ForegroundColor = ConsoleColor.Gray;
Console.WriteLine(DateTime.Now.ToLongTimeString() + » Получение » + item);
}

Затем снова выведем свойства коллекции на консоль. Вот скриншот с результатами работы программы:

Работает этот пример следующим образом. Создаётся коллекция и в неё помещается 4 элемента. После чего запускается таймер, и начинают одновременно работают 2 потока:

  • основной, где выполняется метод Main(). Здесь элементы изымаются из коллекции в цикле foreach;
  • поток таймера, где выполняется делегат callback таймера. Здесь элементы добавляются;

Те четыре элемента, которые были добавлены в самом начале, извлекаются за время менее 1 секунды. После этого основной поток блокируется, ожидая добавления новых элементов. Таймер запускается с задержкой в 3 секунды — и это видно на скриншоте: элемент 500 был извлечен в момент 06 сек., а элемент 600 был добавлен в 09 сек. В ту же 09 секунду цикл в основном потоке разблокируется, и выбирает только что добавленный элемент 600. В это время в потоке таймера создаётся искусственная задержка за счёт вызова SpinWait(). По её истечении в коллекцию добавляется элемент 700, который тут же извлекается в цикле основного потока. В заключении в потоке таймера происходит вызов collection.CompleteAdding(), который запрещает дальнейшее добавление элементов, т.е. «завершает» коллекцию. Поскольку к тому моменту коллекция пуста, и ожидание новых элементов становится бессмысленным, в основном потоке блокировка снимается и происходит выход из цикла foreach. Если закомментировать строчку с вызовом метода CompleteAdding(), то основной поток программы блокируется навсегда, ведь он будет ожидать поступления новых элементов, а поток таймера тем временем завершит работу. Т.о. выполнение «завершения» позволяет синхронизировать два параллельно работающих потока, и предотвратить подобное развитие событий.

Что касается свойств коллекции, то здесь наиболее интересны следующие:

  • IsAddingCompleted — возвращает true, если коллекция помечена как завершенная, т.е. дальнейшее добавление элементов запрещено, false в противном случае;
  • IsCompleted — возвращает true, если коллекция завершенная и все элементы выбраны, иначе false;
  • BoundedCapacity — указанная при создании емкость. Возвращает -1, если коллекция «безразмерная»;

Ещё раз взглянув на скриншот, можно заметить, что в начале, когда в коллекцию добавили несколько элементов, оба логических свойства равны false, что вполне ожидаемо, ведь коллекция не «завершена». В конце работы, напротив, оба они равны true, поскольку в потоке таймера мы объявили коллекцию «завершенной» (т.е. IsAddingCompleted = true), а в основном потоке выбрали из неё все элементы (т.е. IsCompleted = true).

Возникает законный вопрос — а что с порядком элементов? Почему элементы извлекаются по принципу FIFO — это что, принцип работы коллекции? Или можно как-то на это повлиять? Ответ на эти вопросы даст более подробное рассмотрение перегруженного конструктора коллекции. В одной из его модификаций есть возможность указать ссылку на хранилище элементов — экземпляр класса, реализующего IProducerConsumerCollection<T>. Среди имеющихся в арсенале .NET 4 beta 2 средств это следующие структуры данных:

  • ConcurrentStack<T> — потокобезопасный стек;
  • ConcurrentQueue<T> — потокобезопасная очередь;
  • ConcurrentBag<T> — потокобезопасная неупорядоченная коллекция, допускающая дубликаты;

Т.е. структура данных BlockingCollection<T> реализует не хранилище, а только способ доступа с блокировкой и «завершением». Причем по умолчанию для хранения элементов используется очередь ConcurrentQueue<T>, отсюда и соблюдение принципа FIFO при работе с элементами. В рассмотренном выше примере мы можем заменить вызов конструктора следующим:

Код:
BlockingCollection<int> collection =
new BlockingCollection<int>(new ConcurrentStack<int>(), 10);

т.е. использовать стек. Тогда получим такой результат:

Очевидно, что порядок извлечения элементов поменялся, и теперь он соответствует LIFO, что характерно для стека. Т.о. есть возможность реализовать хранилище данных, наиболее подходящее в данном конкретном случае, и использовать его в BlockingCollection<T>. Для этого достаточно наследовать интерфейс IProducerConsumerCollection<T>:

Код:
public interface IProducerConsumerCollection<T> : IEnumerable<T>, ICollection, IEnumerable
{
void CopyTo(T[] array, int index);
T[] ToArray();
bool TryAdd(T item);
bool TryTake(out T item);
}

В заключение хочу отметить несколько любопытных статических методов класса BlockingCollection<T>. С их помощью можно добавлять или извлекать элементы, взаимодействуя с любой из коллекций, переданных в качестве параметра. Традиционно, есть 2 версии этих методов:

  • Блокирующие:
    int BlockingCollection<T>.AddToAny(BlockingCollection<T>[], T);
    int BlockingCollection<T>.TakeFromAny(BlockingCollection<T>[], out T);
  • Неблокирующие:
    int BlockingCollection<T>.TryAddToAny(BlockingCollection<T>[], T);
    int BlockingCollection<T>.TryTakeFromAny(BlockingCollection<T>[], out T);

Как видим, они принимают массив коллекций, и параметр — добавляемый элемент или out-параметр — извлекаемый. Возвращают эти методы индекс коллекции в массиве, в которую добавили данные, или, соответственно, извлекли. При этом блокирующие методы ждут появления свободного места в коллекции или нового элемента, блокируя при этом поток, в котором выполняются. Неблокирующие, в зависимости от вызванной перегруженной версии, сразу возвращают управление, либо «пытают счастья» указанный таймаут. Разумеется, в списке выше приведены не все сигнатуры методов — есть, например, принимающие маркер отмены, CancellationToken, подробнее лучше посмотреть MSDN.

Комментарии к 1-й части этого обзора побудили меня внимательнее отнестись к рассмотрению возвращаемых методами значений, и не зря. Изучая статические методы, я подумал — когда, например, TakeAnyFrom() вернёт -1? И, честно говоря, так и не смог придумать сценария. Если все коллекции массива-параметра пусты — он будет вечно ждать их пополнения или «завершения». Однако, если хотя бы одну из них завершить, получим ArgumentException. Если же положить в хотя бы одну коллекцию данные, метод вернёт её индекс и в out-параметре те самые данные. Рефлектор подсказал, что на самом деле все статические методы BlockingCollection<T> вызывают внутри один и тот же метод. Но я так и не понял, как при задании бесконечного таймаута неблокирующий метод может вернуть -1, поэтому переадресовал вопрос на форум MSDN. Для тех, кто заинтересовался, топик здесь, а вот тут соответствующая тема на MS Connect. Пока сошлись на том, что это просто ошибка в документации — копипаст с описания метода TryTakeAnyFrom().

Другие примеры использования BlockingCollection<T> можно найти на сайте MSDN Code Gallery. Там имеется страничка с исходными кодами, на которых демонстрируется использование тех или иных средств распараллеливания кода, входящих в .NET 4. На момент публикации примеры есть только для .NET 4 beta 1, но скоро опубликуют комплект и для beta 2. В составе этих примеров есть, например, набор методов-расширений, позволяющий «преобразовать» данную коллекцию в объект типа IProducerConsumerCollection<T>.

Хочу обратить внимание, что наиболее актуальная реализация рассматриваемой коллекции находится в составе .NET 4 beta 2, который стал доступен на днях. Если возникнет желание опробовать её в деле, рекомендую установить Visual Studio 2010 beta 2. По сравнению с прошлогодней июньской CTP-версией, работающей под .NET 3.5, некоторые методы были переименованы. Впрочем, это касается всей библиотеки Parallel Extensions. Полное описание BlockingCollection<T> можно найти в соответствующем разделе MSDN.

Источник: thevista.ru

Оставить комментарий

Чтобы оставлять комментарии Вы должны быть авторизованы.

Похожие посты