Code Monkey home page Code Monkey logo

open.channelextensions's People

Contributors

dependabot-preview[bot] avatar electricessence avatar jkamsker avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

open.channelextensions's Issues

Ambiguous ReadAllConcurrentlyAsync signatures, when using named arguments

Hi there,

please consider following snippet:

channel.ReadAllConcurrentlyAsync(maxConcurency: ?,
                                 receiver: ?,
                                 cancellationToken: ?)

This call will fail with CS0121.

Referenced methods:

Additionally, TaskReadAllConcurrentlyAsync is also affected.

Multi channel multi transformation - how to do it using ChannelExtensions

hi,

I didn't see any discussion options. Hence opening it as an issue. Primarily looking for suggestion on how to do this using this ChannelExtensions.

Currently I am using TPL with multiple bufferblock and the code was written almost 7 years back. So now I am trying to convert this to using channels and I cam across this library which seemed to simplify the task. But I am not very clear on how to use it as I have need multiple intermediate channels to process the whole pipeline.

  1. I have a datasource that I initially split into ranges. Range could be 1K, 2K, 5K etc depending on config.
  2. Then for each range, I extract the data which could run into 10K - 100K.
  3. I then want to push the above extract into smaller chunks (say 500 or 1000 or 2000) into another channel.
  4. These smaller chunks can be picked in batch and then be transformed before being written to final channel
  5. From the final channel, I may want to either write to a file or another target table or make another API call to push the data into different system.

The main thing that I cannot figure out is that 1 entry on 1st channel can generate 10K-100K records for 2nd channel. So when I use PipeAsync, how do I code for it as it seems to assume 1 input on source channel transformed to 1 output item. I dont want to write 100K rows as a single item on 2nd channel. So primary question is how do I achieve this? Any suggestions would be very helpful.

I was referring to examples from this repo and below link.
https://blog.maartenballiauw.be/post/2020/08/26/producer-consumer-pipelines-with-system-threading-channels.html

Thanks!

Batching with time limit to force data through on an interval

I'm trying to determine if this project can help me construct a channel that does the following:

  1. Batch item reading using the standard .Batch() method, but...
  2. If nothing is written to the channel after X ms, the batch is pushed thru anyway even if it doesn't meet the size requirement.

That way the consumer efficiently processes batches of data, but is not permitted to get stale if the flow slows down.

Or simply batch item reading based on a time interval alone, without any regard for size?
(i.e. read batches of items from the channel every 1 second assuming there is at least 1 item to read)

Any advice would be appreciated! Thanks

Note, I found this article that does exactly what I need, however it relies on an exception to pulse the read which is not great for tight intervals like 1 second or less.
[https://stackoverflow.com/questions/63881607/how-to-read-remaining-items-in-channel-less-than-batch-size-if-there-is-no-new]

Supporting .Net 6 and up

Is there a plan to target .Net 6 and remove the dependency on System.Threading.Channels 6.0.0, since frameworks .Net 6 and up ship with System.Threading.Channels?

Channels created by `Source` or `ToChannel` do not reach completion when canceled.

In the following code, reader.Completion task is never completed, resulting in an infinite await. I would expect the completion task to be canceled. Looking at the source code for Channels, calling TryComplete without reading the remaining items off the queue will not complete this task.

	var cts = new CancellationTokenSource();
	var reader = Enumerable.Range(0, 10_000).ToChannel(10, true, cts.Token);

	try
	{
		cts.Cancel();
		await reader.ReadAll(_ => {}, cts.Token);
	}
	catch (Exception)
	{
		// Catches the OperationCanceledException
	}
	finally
	{
		// Will await forever here
		await reader.Completion;
	}

Consider flush batch reader by timeout as well

Hello!
I found your extensions library quite useful and expressive!
I also think that BatchingChannelReader is lacking of feature to flush its buffer by timeout as well.
Could you please add it?

Support disposable messages

Hey,
My producer creates disposable messages that are consumed by readallasync.

Is it possible for you to dispose it automatically without needing me to do it manually?

Thanks!

IntelliSense not showing in VisualStudio

IntelliSense does not show up in VisualStudio (VS2022)
image

When renaming Documentation.xml to Open.ChannelExtensions.xml
in C:\Users\<account>\.nuget\packages\open.channelextensions\6.0.1\lib\netstandard2.1
it shows up
image

Question: Exception in Pipe breaks the channel?

I would like to clarify if following behaviour is by design or not.
Imagine that channel is created from AsyncEnumerable. If DoB() throws an exception, then all next produced values will only be handled in PipeAsync that calls DoA(). They will never reach PipeAsync with DoB() or ReadAllAsync()

await .....
.ToChannel()
.PipeAsync(async x =>
{
   return await DoA();
})
.PipeAsync(async x =>
{
   return await DoB();
})
.ReadAllAsync(x => 
{
   await Finish();
});

Am i supposed to try/catch all exceptions in Pipe methods and then return some faulty code that is handled by Filter method?

Conditional package reference to System.Threading.Channels NuGet

Since System.Threading.Channels is part of the framework since .Net Core 3, the NuGet reference can be made conditional to netstandard 2.0 and 2.1

<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0' or '$(TargetFramework)' == 'netstandard2.1'">
    <PackageReference Include="System.Threading.Channels" Version="8.*" />
</ItemGroup>

This way .net 6 and .net 8 versions of Open.ChannelExtensions won't have any dependencies.

As per Microsoft:
The System.Threading.Channels namespace provides a set of synchronization data structures for passing data between producers and consumers asynchronously. The library targets .NET Standard and works on all .NET implementations. This library is available in the System.Threading.Channels NuGet package. However, if you're using .NET Core 3.0 or later, the package is included as part of the framework.

And Steven Toub recommends using System.Threading.Channels that ships with framework, over the NuGet when possible, since the native version is optimized and performs better (paraphrasing). Direct quote below:

System.Threading.Channels is part of the .NET Core shared framework, meaning a .NET Core app can start using it without installing anything additional. It’s also available as a separate NuGet package, though the separate implementation doesn’t have all of the optimizations that built-in implementation has, in large part because the built-in implementation is able to take advantage of additional runtime and library support in .NET Core.
Source: https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/

Test involving a BatchingChannelReader with 'WithTimeout' integration keeps failing

I do not understand why this test involving a Channel Reader keeps reaching case 1: in the switch case and still has two elements in the batch. Any ideas ? Every time I reach batch.Should().HaveCount(1), the batch still has 2 items left. Same result whether I do Run Test or Debug Test so I am assuming it's not a timing issue ? I don't understand this (intern so quite new to all of these libraries).

Thanks.

[Fact]
public async Task BatchTimeoutIsReached()
{
    var c = Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleReader = false, SingleWriter = false });
    _ = Task.Run(async () =>
    {
        c.Writer.TryWrite(1);
        c.Writer.TryWrite(2);
        await Task.Delay(200);
        c.Writer.TryWrite(3);
        await Task.Delay(200);
        c.Writer.TryWrite(4);
        c.Writer.TryWrite(5);
        c.Writer.TryWrite(6);
    });

    using var tokenSource = new CancellationTokenSource();
    BatchingChannelReader<int, System.Collections.Generic.List<int>> batchReaderWithTimeout = c.Reader.Batch(2).WithTimeout(TimeSpan.FromMilliseconds(100));
    await batchReaderWithTimeout.ReadAllAsync(async (batch, i) =>
        {
            switch (i)
            {
                case 0:
                    batch.Should().HaveCount(2);
                    Assert.Equal(1, batch[0]);
                    Assert.Equal(2, batch[1]);
                    break;
                case 1:
                    batch.Should().HaveCount(1);
                    Assert.Equal(3, batch[0]);
                    break;
                case 2:
                    batch.Should().HaveCount(2);
                    Assert.Equal(4, batch[0]);
                    Assert.Equal(5, batch[1]);
                    break;
                case 3:
                    batch.Should().HaveCount(1);
                    Assert.Equal(6, batch[0]);
                    c.Writer.Complete();
                    break;
                default:
                    throw new Exception("Shouldn't arrive here.");
            }

            await Task.Delay(500);
        });
}

image

Why there is prefetching in ReadUntilCancelledAsync?

Current implementation of ReadUntilCancelledAsync which is widely used for implementing other methods like (PipeAsync etc) has some prefetching mechanism. It acquires next item before waiting for the current item to be completed. In cases than items aren't similiar in terms of processing complexity this ends up in blocking at the end of channel. There are free processors, but they do nothing as one of processor have two items active and just taken.

Current code:

		do
		{
			var next = new ValueTask();
			while (
				!cancellationToken.IsCancellationRequested
				&& reader.TryRead(out T? item))
			{
				await next.ConfigureAwait(false);
				next = receiver(item, index++);
			}
			await next.ConfigureAwait(false);
		}
		while (
			!cancellationToken.IsCancellationRequested
			&& await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false));

I had simplified it to:

		do
		{
			while (
				!cancellationToken.IsCancellationRequested
				&& reader.TryRead(out T? item))
			{
				await receiver(item, index++).ConfigureAwait(false);
			}
		}
		while (
			!cancellationToken.IsCancellationRequested
			&& await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false));

Which solves tail blocking, and all tests are running perfectly as well. But I wonder if there are cases which this prefetching was designed to solve? (because it had introduced intentional complexity in the code)

BatchingChannelReader Timer ObjectDisposedException

Today I encountered the following exception:

System.ObjectDisposedException: Cannot access a disposed object.
   at System.Threading.TimerQueueTimer.Change(UInt32 dueTime, UInt32 period)
   at System.Threading.Timer.Change(Int64 dueTime, Int64 period)
   at Open.ChannelExtensions.BatchingChannelReader`1.TryPipeItems(Boolean flush)
   at Open.ChannelExtensions.BufferingChannelReader`2.TryRead(TOut& item)

I think this can occur because it's possible TryPipeItems retrieves the _timer value on thread A, the last item is pushed to the channel on thread B which disposes and nulls _timer, but then TryPipeItems on thread A still tries to use the value it previously loaded which has now been disposed.

ReadAllConcurrently completes before last task completes

var sourceFileChannel = ConsumeFilesToChannel(metaJson.Files);

var dlChannel = Channel.CreateBounded<FileMetaDataDto>(10);

var checkTask = sourceFileChannel.ReadAllConcurrently(maxConcurrency: 30, async file =>
{
    var filePath = Path.Combine(rootDirectory, file.Path);
    if (File.Exists(filePath))
    {
        var existingMd5 = await CalculateMd5Async(filePath);
        if (existingMd5 == file.Md5)
        {
            //Console.WriteLine($"Skipping {file.Path} because it already exists and has the same MD5");
            return;
        }
        else
        {
            // Write to DL channel
            await dlChannel.Writer.WriteAsync(file);
        }
    }
    else
    {
        // Write to to DL channel
        await dlChannel.Writer.WriteAsync(file);
    }
}).ContinueWith(async x =>
{

    // This is required since the ReadAllConcurrently completes before the last task completed
    await dlChannel.Writer.WaitToWriteAsync();
    await Task.Delay(TimeSpan.FromMilliseconds(100));
    // End
    dlChannel.Writer.Complete();
});

awaiting ReadAllConcurrently is appearantly not enougth to ensure all tasks it executed, completed.
Not doing this hack will result in the dlChannel complaining about already being closed when trying to write to it.

Unbatch / Split

Hello and first of all, awesome library. Making my life a lot easier already. But I seem to missing something. I batched results to perform database lookups efficiently (I don't want 500 db looksup, I want 5 queries that fetch 100 rows each). Now I want to split them back out. But I can't seem to find how to do this.

Am I missing or overlooking something? Is there a pattern to do it (return IAsyncEnumerable<T>)? Or is there no extension to easily do this?

Exception handling

I like the library a lot and I would like to use it in my projects. However I do have a small issue. When there is an unhandled exception inside .Transform method then the execution simply hangs. For example:

var range = Enumerable.Range(0, 10000);
var pipe = range.ToChannel().Transform(i =>
{
    if ((i + 1) % 100 == 0) throw new Exception();
    return i.ToString();
});

var result = pipe.ReadAll(i => { });

Now I do realize that exception should be handled with try/catch inside the .Transform however I do believe it should not hang if something unexpected happens. But perhaps I'm doing something wrong.

Different/Incorrect Behaviour in .NET Framework

The below example works "as expected" on netcoreapp2.1, net5.0 etc but hangs on .NET Framework (tested net472, net48).

"as expected" means items are streamed through the pipeline as soon as items are available i.e. StartProcessingTask2(...) returns immediately and the int values from queue go through ReadAll(IncrementCount2) as soon as the first int is available and before all ints have been added to queue.

When your nuget package is referenced by .NET Framework projects, StartProcessingTask2(...) does not return because queue.CompleteAdding() has not been called, your library is waiting for the IEnumerable given to .Source(source, true) to complete. If it's body is wrapped in a Task so that StartProcessingTask2(...) may return and subsequently queue can be populated, IncrementCount2(int c) is not called until all items from queue have been read completely into some intermediate buffer, only then will ReadAll(IncrementCount2) execute IncrementCount2.

When I cloned your code and referenced it as a project, it worked "as expected". Therefore I believe there is a bug/incompatibility with how msbuild builds this library for netstandard2.0, netstandard2.1, those dlls do not function correctly with .NET Framework.

I suggest/ask that you include .NET Framework specific dlls in the nuget package e.g. add dlls built specifically for 4.6.2, 4.7, 4.7.1, 4.7.2, 4.8.

Great ideas executed in this library, thank you!

Open.ChannelExtensions.Tests.zip

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using NUnit.Framework;

namespace Open.ChannelExtensions.Tests
{
        [Test]
        public void Example()
        {
            var queue = new BlockingCollection<int>();
            var processingTask = StartProcessingTask2(queue.GetConsumingEnumerable());

            for (var i = 0; i < 100000000; i++)
                queue.Add(i);

            queue.CompleteAdding();

            processingTask.Wait();
        }

        private int count_;

        private Task StartProcessingTask2(IEnumerable<int> source)
        {
            return Channel.CreateUnbounded<int>()
                .Source(source, true)
                .ReadAll(IncrementCount2)
                .AsTask();
        }

        private void IncrementCount2(int c)
        {
            Interlocked.Increment(ref count_);
        }
    }
}

Exception handling in pipeline

Im having issues with proper handling of exceptions thrown from pipeline steps.
In some scenarios exception is being swallowed instead of being propagated to caller.

From my observations, it seems that it's somehow related to the .Batch() step, also moment of throwing exceptions may have some meaning.

Am I doing something wrong? How it should be properly handled to propagate exception up?

using System.Threading.Channels;
using Open.ChannelExtensions;

var test = new Test();
try
{
    //await test.Scenario1();	//exception catched
    //await test.Scenario2();	//exception swallowed
    //await test.Scenario3();	//exception catched
    //await test.Scenario4();	//exception sometimes catched (~25% chance)
}
catch (Exception)
{
	Console.WriteLine("Got exception");
}


class Test
{
public async Task Scenario1()
{
	var channel = Channel.CreateBounded<int>(10000);

	for (int i = 0; i < 100; i++)
	{
		await channel.Writer.WriteAsync(i);
	}

	var task = channel.Reader.Pipe(1, (element) =>
		{
			throw new Exception();
			Console.WriteLine(element);
			return 1;
		})
		.Pipe(2, (evt) =>
		{
			Console.WriteLine("\t" + evt);
			return evt * 2;
		})
		//.Batch(20)
		.PipeAsync(1, async (evt) =>
		{
			Console.WriteLine("\t\t" + evt);
			return Task.FromResult(evt);

		})
		.ReadAll(task =>
		{
		});

	channel.Writer.TryComplete();
	await task;

	Console.WriteLine("end");
}

public async Task Scenario2()
{
	var channel = Channel.CreateBounded<int>(10000);

	for (int i = 0; i < 100; i++)
	{
		await channel.Writer.WriteAsync(i);
	}

	var task = channel.Reader.Pipe(1, (element) =>
		{
			throw new Exception();
			Console.WriteLine(element);
			return 1;
		})
		.Pipe(2, (evt) =>
		{
			Console.WriteLine("\t" + evt);
			return evt * 2;
		})
		.Batch(20)
		.PipeAsync(1, async (evt) =>
		{
			Console.WriteLine("\t\t" + evt);
			return Task.FromResult(evt);

		})
		.ReadAll(task =>
		{
		});

	channel.Writer.TryComplete();
	await task;
}

public async Task Scenario3()
{
    var channel = Channel.CreateBounded<int>(10000);

    for (int i = 0; i < 100; i++)
    {
	    await channel.Writer.WriteAsync(i);
    }

    var task = channel.Reader.Pipe(1, (element) =>
	    {
			if(element == 20)
		    throw new Exception();
		    Console.WriteLine(element);
		    return 1;
	    })
	    .Pipe(2, (evt) =>
	    {
		    Console.WriteLine("\t" + evt);
		    return evt * 2;
	    })
		//.Batch(20)
		.PipeAsync(1, async (evt) =>
		{
			Console.WriteLine("\t\t" + evt);
			return Task.FromResult(evt);

		})
		.ReadAll(task =>
	    {
	    });

	channel.Writer.TryComplete();
	await task;
}

public async Task Scenario4()
{
	var channel = Channel.CreateBounded<int>(10000);

	for (int i = 0; i < 100; i++)
	{
		await channel.Writer.WriteAsync(i);
	}

	var task = channel.Reader.Pipe(1, (element) =>
		{
			if (element == 20)
				throw new Exception();
			Console.WriteLine(element);
			return 1;
		})
		.Pipe(2, (evt) =>
		{
			Console.WriteLine("\t" + evt);
			return evt * 2;
		})
		.Batch(20)
		.PipeAsync(1, async (evt) =>
		{
			Console.WriteLine("\t\t" + evt);
			return Task.FromResult(evt);

		})
		.ReadAll(task =>
		{
		});

	channel.Writer.TryComplete();
	await task;
}
}

IAsyncEnumerable ToChannel Extension not visible in .NET 8 projects

Hi guys,

I have a problem upgrading from 6.x to 8.x, it appears that ToChannel extension is not visible anymore for IAsyncEnumerable<>

I downloaded the project and added a test method as I remarked that the test project is targeting .NET 8 to see whether the code would build or not

Result : I observe the same behavior, the following code does not build

image

Here is the code I added to BasicTests.cs

        [Theory]
	[InlineData(testSize1)]
	[InlineData(testSize2)]
	[SuppressMessage("Reliability", "CA2012:Use ValueTasks correctly", Justification = "Testing only.")]
	[SuppressMessage("CodeQuality", "IDE0079:Remove unnecessary suppression", Justification = "<Pending>")]
	public static async Task ToChannelTest(int testSize)
	{
		var range = CreateAsyncEnumerableRange(testSize);
		var result = new List<int>(testSize);

		var startedAt = TimeProvider.System.GetTimestamp();
		ChannelReader<int> reader = range
			.ToChannel(singleReader: true, deferredExecution: true);

		_ = reader.ReadAll(i => result.Add(i), true);

		await reader.Completion;
		var elapsed = TimeProvider.System.GetElapsedTime(startedAt);
	}

	static async IAsyncEnumerable<int> CreateAsyncEnumerableRange(int testSize)
	{
		foreach (var i in Enumerable.Range(0, testSize))
			yield return i;
		await Task.CompletedTask;
	}

ForceBatch method doesn't complete batch immediately

I got the same behaviour described here with the .ForceBatch():
it doesn't complete batch immediately when called, but when the next item is written.
I couldn't find tests of this method to refer how it is supposed to be used, so I decided it is wrong.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.