Skip to content

BlockingCollection.TakeFromAny suggestion #76128

@NikolyKozemiakin

Description

@NikolyKozemiakin

Description

BlockingCollecction.TakeFromAny do not check state of cancellationToken (3rd argument).

inside of this method :

    // Check if the collection is not completed, and potentially has at least one element by checking the semaphore count
    if (!collections[i].IsCompleted && collections[i]._occupiedNodes.CurrentCount > 0 && **collections[i].TryTake(out item))**

in call of TryTake(out item)) do not used cancellationToken from method call and inside of TryTake(out item)) used call with empty CancellationToken:

      public bool TryTake([MaybeNullWhen(false)] out T item)
        {
            return TryTake(out item, 0, **CancellationToken.None**);
        }

I propose use here :
// Check if the collection is not completed, and potentially has at least one element by checking the semaphore count
if (!collections[i].IsCompleted && collections[i]._occupiedNodes.CurrentCount > 0 && collections[i].TryTake(out item, cancellationToken ))

Reproduction Steps

    internal class Program
    {
        static void Main(string[] args)
        {
            int value = 0;
            CancellationTokenSource cts = new CancellationTokenSource(5000);
            BlockingCollection<int> collection = new BlockingCollection<int>(new ConcurrentStack<int>(), 10);
            BlockingCollection<int> priority = new BlockingCollection<int>(new ConcurrentStack<int>(), 3);
            BlockingCollection<int>[] list = new BlockingCollection<int>[] { priority, collection };
            var producer = Task.Run(() =>
            {
                try
                {
                    while (value < 20)
                    {
                        collection.Add(value++, cts.Token);
                        Console.WriteLine("+++added.");
                        Task.Delay(500).Wait();
                    }
                }
                catch(OperationCanceledException)
                {
                    Console.WriteLine("Canceled");
                }
                
            });
            var producer2 = Task.Run(() =>
            {
                Task.Delay(2500).Wait();
                try
                {
                    while (value < 20)
                    {
                        priority.Add(value++ + 100, cts.Token);
                        Console.WriteLine("+++added pr.");
                        Task.Delay(500).Wait();
                    }
                }
                catch (OperationCanceledException)
                {
                    Console.WriteLine("Canceled");
                }

            });
            var consumer = Task.Run(() =>
            {
                try
                {
                    while (true)
                    {
                        //cts.Token.ThrowIfCancellationRequested();   recomment here for immediate cancel
                        _ = BlockingCollection<int>.TakeFromAny(list, out int val, cts.Token);
                        Console.WriteLine("---taked " + val.ToString());
                        Task.Delay(1000).Wait();
                    }
                }
                catch (OperationCanceledException)
                {
                    Console.WriteLine("Canceled");
                }
            });
            Console.ReadLine();
    }
}

Expected behavior

Take operation canceled if cancellationToken canceled and any of collection not empty

Actual behavior

Now Take operation do not canceled if cancellationToken canceled and any of collection not empty

Regression?

No response

Known Workarounds

No response

Configuration

No response

Other information

No response

Metadata

Metadata

Assignees

Labels

area-System.Collectionsbugin-prThere is an active PR which will close this issue when it is merged

Type

No type

Projects

No projects

Relationships

None yet

Development

No branches or pull requests

Issue actions