Code Monkey home page Code Monkey logo

alpakka's Introduction

Alpakka: Akka Streams Connectors

Gitter

https://alpakka.getakka.net/

This project provides a home to Akka Streams connectors to various technologies, protocols or libraries.

Contributions

Contributions are welcome, see CONTRIBUTING.md

Caveat Emptor

A component in this project does not have to obey the rule of staying binary compatible between releases. Breaking API changes may be introduced without notice as we refine and simplify based on your feedback. A module may be dropped in any release without prior deprecation.

Running the Test Suite

As of August 2020, the Alpakka repository structure and projects has been consolidated and modernized into a single solution file which affects how test projects are run. Please check the test documentation for more information.

alpakka's People

Contributors

aaronontheweb avatar alexvaluyskiy avatar andresteenbergen avatar anthony-steele-cko avatar arkatufus avatar bobanco avatar danthar avatar dependabot-preview[bot] avatar dependabot[bot] avatar eaba avatar ericphan avatar horusiath avatar igorfedchenko avatar ismaelhamed avatar marcpiechura avatar ptjhuang avatar rosskuehl avatar seanfarrow avatar to11mtm avatar vasily-kirichenko avatar vkulikov avatar yifeis7 avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

alpakka's Issues

Add debug logging to Kinesis Stages

In order to make it easier to debug connectivity issues with AWS Kinesis, we should add some debug logging in these areas:

_iterator = s.Response.NextShardIterator;
}
if (records.Count > 0)
{
foreach (var record in records)
_buffer.Enqueue(record);
_actor.Become(Ready);
_self.Tell(Pump.Instance);
}

Some pseudo code:

if(LogLevel.Debug.Enabled)
   Log.Debug("No messages found in Kinesis shard {0} retrying in {1} seconds", etc...);

Akka.Streams.Azure.EventHub.V5 batching processing not working as expected

Version Information
NuGet versions
Akka, version 1.5.0
Akka.Streams.Azure.EventHub.V5, version 1.5.0.1

Describe the bug
Batched Event Hub processing not returning the number of events as expected, as in the total number of events (e.g. running single or multi processor samples in V5 Event Hub samples).
For example, ingesting 10000 events may yield 9000 ish.

Running samples mentioned above with non-batched processing works just as expected, as does a non-Akka.NET implementation (such as this).

All in all, the BatchedEventProcessorClient does not work as expected.

Been playing around with various batch setting (i.e. primarily batch sizing through EventProcessorClientOptions.CacheEventCount but to no avail.
If I set ingest batch size to something really low (such as 4, 10, 20) seemingly works better (less events lost), but with a batch size of 100-1000 the loss rate is 10%+.

To Reproduce
Steps to reproduce the behavior:

  1. Run sample in batched mode, single or multi processor.
  2. Ingest events using batches (e.g. 1000 messages a batch) to Event Hub
  3. Observe and compare ingested data with processed data (e.g. ConcurrentDictionary and counts by Event Hub partition, persist to file, whatnot) - expectation being that number of processed events = number of ingested events.

Links to working reproductions on Github / Gitlab are very much appreciated

Expected behavior
Regardless of the exact number of messages returned by each Event Hub batch, I expect the total number of egressed events to correspond to the number of ingested events.

Actual behavior
Total egressed and processed events are significantly lower compared to the number of send events.
Absolute data loss is less if ingest rate is lowered.

Can't seem to connect to ActiveMQ

I guess there are differences between RabbitMQ and ActiveMQ. I am getting RabbitMQ.Client.Exceptions.BrokerUnreachableException I know the ActiveMQ broker is up and accepting AMQP messages.

Could this be an issue RabbitMQ is 0.9 AMQP and activeMQ is 1.0 AMQP?

Using AmqpConnectionDetails with SSL enabled always results with "localhost" endpoint

The following connection details:

let settings =
        AmqpConnectionDetails.Create("host22", 5671)
            .WithAutomaticRecoveryEnabled(true)
            .WithNetworkRecoveryInterval(TimeSpan.FromSeconds 1.0)
            .WithCredentials(AmqpCredentials.Create("foo", "1"))
            .WithSsl(SslOption("host22", enabled = true))

results with a ConnectionFactory, which's Endpoint is localhost.

A workaround: use AmqpConnectionUri.Create("amqps://foo:1@host22:5671") instead.

WindowsAzure.Storage nuget package is deprecated

We need to modernize the WindowsAzure.Storage package to the new Azure.Storage.Queues:

From the package description:
NOTE: As of version 9.4.0, this library has been split into multiple parts and replaced: See https://www.nuget.org/packages/Microsoft.Azure.Storage.Blob/, https://www.nuget.org/packages/Microsoft.Azure.Storage.File/, https://www.nuget.org/packages/Microsoft.Azure.Storage.Queue/, and https://www.nuget.org/packages/Microsoft.Azure.Storage.Common/.
For table support, see https://www.nuget.org/packages/Microsoft.Azure.CosmosDB.Table/.
This client library enables working with the Microsoft Azure storage services which include the blob and file service for storing binary and text data, the table service for storing structured non-relational data, and the queue service for storing messages that may be accessed by a client.
For this release see notes - https://github.com/Azure/azure-storage-net/blob/master/README.md and https://github.com/Azure/azure-storage-net/blob/master/changelog.txt
Microsoft Azure Storage team's blog - http://blogs.msdn.com/b/windowsazurestorage/

Akka.Streams.EventHubs uses a deprecated Azure SDK (WindowsAzure.ServiceBus 6.2.0)

Version Information
Version of Akka.NET? N/A
Which Akka.NET Modules? Akka.Streams.EventHubs

Describe the bug
It's not actually a bug but a maintenance request as the Akka.Streams.EventHubs package is using SDK WindowsAzure.ServiceBus 6.2.0. In August 2017 a new SDK version was released by Microsoft to replace this SDK: Microsoft.Azure.ServiceBus. This SDK is however also already replaced by Azure.Messaging.EventHubs since October 2020.

Expected behavior
Upgrade to the latest version of Azure.Messaging.EventHubs SDK

QueueSource does not remove processed messages from the Azure Storage Queue

Version Information
Version of Akka.NET? 1.4.40
Which Akka.NET Modules? Akka.Streams.Azure.StorageQueue

Describe the bug
QueueSource does not remove processed messages from the Azure Storage Queue.

To Reproduce
Steps to reproduce the behavior:

  1. Set up an Actor that consumes from an Azure Storage Queue Stream, like so:
protected override void PreStart()
        {
            QueueSource.Create(Queue)
                .RunWith(Sink.ActorRef<QueueMessage>(Self, Done.Instance), Context.Materializer());
        }
  1. Let the actor run. Notice how, after the default VisibilityTimeout has elapsed, the Actor receives the Same StorageQueueMessage again.

Links to working reproductions on Github / Gitlab are very much appreciated
I was able to replicate this by modifying the A_QueueSource_should_poll_for_messages_if_the_queue_is_empty test that is found here:
If you modify the QueueCreate instruction to specify a short (5s) VisibilityTimeout, like so:"
var probe = QueueSource.Create(Queue, pollInterval: TimeSpan.FromSeconds(1), options: new GetRequestOptions(TimeSpan.FromSeconds(5)))

Then extend the timeout for which the test Expects No Message, like so:

probe.Request(2)
                .ExpectNext("Test1")
                .ExpectNoMsg(TimeSpan.FromSeconds(10));

Then you see the test failing with the error that a message was found when none was expected. This is because after 5 seconds, the originally processed "Test1" message is now visible again, and then gets returned by the QueueSource again.

Expected behavior
After an Azure Storage Queue message has been delivered to the Sink, it should be removed from the Azure Storage Queue.

Actual behavior
The delivered Azure Storage Queue message remains on the Azure Queue after being delivered to the configured Stream Sink.

Screenshots
None.

Environment
Windows 11 Visual Studio 2022

Additional context
Add any other context about the problem here.

[Overhaul] Missing items in current Alpakka repo

These items are currently missing from Alpakka:

  • Amqp documentation
  • Amqp samples
  • Amqp V1 documentation
  • Amqp V1 samples
  • Akka.Streams.Azure.ServiceBus.Tests
  • Akka.Streams.Azure.EventHub.Tests
  • Akka.Streams.Azure.StorageQueue documentation
  • Csv documentation
  • Csv samples
  • Akka.Streams.SignalR.Tests
  • Xml samples

AMQP Source manual ack

Current implementation acks messages as soon as it pushed them downstream, which is not what I want. I need to be on control of acking each message manually, in some of downstream stages or outside the stream (e.g. in an independent actor):

https://github.com/AkkaNetContrib/Alpakka/blob/1a6a51c562e44143d0eb0f1cb175316494ed0d6a/Amqp/src/Akka.Streams.Amqp/AmqpSourceStage.cs#L136

Scala Alpakka was added CommittableMessage more that a year ago in akka/alpakka@d478a61, see discussion akka/alpakka#97

https://github.com/akka/alpakka/blob/453809a489b37963dc0395ab37db20fd2963a3fe/amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpSourceStage.scala#L95-L109

AMQP Sink does not recover after RabbitMQ server restart

Run the following code

let system = ActorSystem.Create("test", ConfigurationFactory.Default())
    let mat = ActorMaterializer.Create(system, ActorMaterializerSettings.Create(system).WithSupervisionStrategy(Deciders.ResumingDecider))
    
    let connSettings = 
        AmqpConnectionDetails
            .Create("localhost", 5672)
            .WithAutomaticRecoveryEnabled(true)
            .WithNetworkRecoveryInterval(TimeSpan.FromSeconds 1.0)
            
    let queueName = "my-queue"
    let queueDeclaration = QueueDeclaration.Create(queueName).WithDurable(true).WithAutoDelete(false)
    let sink = AmqpSink.CreateSimple(AmqpSinkSettings.Create(connSettings).WithRoutingKey("my-queue").WithDeclarations(queueDeclaration))
    
    Source
        .From(seq { 1..1_000_000 })
        .Select(fun x -> x |> string |> ByteString.FromString)
        .To(sink)
        .Run(mat)
    |> ignore
    
    system.WhenTerminated.Wait()

Everything is OK, messages are being queued at ~20K/second rate.
Now restart RabbitMQ server. The following error is in the log

[ERROR][17-Jan-18 19:06:48][Thread 0012][[akka://test/user/StreamSupervisor-0/Flow-0-1-AmqpSink#1922506265]] Error in stage [AmqpSink]: Unable to write data to the transport connection: An existing connection was forcibly closed by the remote host.
Cause: System.IO.IOException: Unable to write data to the transport connection: An existing connection was forcibly closed by the remote host. ---> System.Net.Sockets.SocketException: An existing connection was forcibly closed by the remote host
   at System.Net.Sockets.NetworkStream.Write(Byte[] buffer, Int32 offset, Int32 size)
   --- End of inner exception stack trace ---
   at System.Net.Sockets.NetworkStream.Write(Byte[] buffer, Int32 offset, Int32 size)
   at System.IO.BufferedStream.Flush()
   at RabbitMQ.Client.Impl.SocketFrameHandler.WriteFrameSet(IList`1 frames)
   at RabbitMQ.Client.Impl.Command.TransmitAsFrameSet(Int32 channelNumber, Connection connection)
   at RabbitMQ.Client.Impl.SessionBase.Transmit(Command cmd)
   at RabbitMQ.Client.Impl.ModelBase.ModelSend(MethodBase method, ContentHeaderBase header, Byte[] body)
   at RabbitMQ.Client.Impl.ModelBase.BasicPublish(String exchange, String routingKey, Boolean mandatory, IBasicProperties basicProperties, Byte[] body)
   at Akka.Streams.Amqp.AmqpSinkStage.AmqpSinkStageLogic.<.ctor>b__3_0()
   at Akka.Streams.Implementation.Fusing.GraphInterpreter.ProcessEvent(Connection connection)
   at Akka.Streams.Implementation.Fusing.GraphInterpreter.Execute(Int32 eventLimit)

and the messages are not queued anymore.

I thought that WithAutomaticRecoveryEnabled(true) reconnects automatically, and WithSupervisionStrategy(Deciders.ResumingDecider) resumes the failing stream stage. As a result, I expected messages to continue being queued after server restart (and a single message to be lost).

Akka.Streams.Channels ChannelSource is failing in the middle of processing with TaskCanceledException

Version Information
Version of Akka.NET? 1.4.38
Which Akka.NET Modules? Akka.Streams 1.4.38, Akka.Streams.Channels 1.0.0-beta9

Describe the bug
I noticed strange behavior during working with Akka.Streams.Channels ChannelSource. I decided to use Channels as my backing store for some objects. I created ChannelSource to read these data in batches and send them to Azure Service Bus. Everything was working pretty well if I have around 1000 items stored in Channel. In case of the number increased I noticed that ChannelSource stopped processing data after some time. After some investigation, I noticed that ChannelSource is failing with TaskCanceledException which is raised with FailStage here - ChannelReaderSource.cs

To Reproduce
Steps to reproduce the behavior:

  1. Create new channel.
  2. Create simple ChannelSource with Sink.Ignore.
  3. Start writing items into Channel (it has to be at least 50 000 items).
  4. ChannelSource is failing after some time.

Here is the link for simple repo which is on my machine failing almost every time - https://github.com/pavoldecky/AkkaStreamChannel/blob/main/AkkaStreamChannel/Program.cs

image

Expected behavior
First of all, I am not sure if ChannelSource is intended to use a huge amount of items. I try to play with Channels Reader and Writer and I was able to process much more data as it is failing with ChannelSource. So I thought that ChannelSource should be able o handle the same amount of data.

Actual behavior
So after some more investigation, I noticed that problem could be with the continuation ValueTask in OnPull method

public override void OnPull()
           {
               if (_reader.TryRead(out var element))
               {
                   Push(_outlet, element);
               }
               else
               {
                   var continuation = _reader.WaitToReadAsync();
                   if (continuation.IsCompletedSuccessfully)
                   {
                       var dataAvailable = continuation.GetAwaiter().GetResult();
                       if (dataAvailable && _reader.TryRead(out element))
                           Push(_outlet, element);
                       else
                           CompleteStage();
                   }
                   else if (!continuation.IsCompleted)
                       continuation.AsTask().ContinueWith(t =>
                       {
                           if (t.IsFaulted) _onValueReadFailure(t.Exception);
                           else if (t.IsCanceled) _onValueReadFailure(new TaskCanceledException(t));
                           else _onValueRead(t.Result);
                       });
                   else if (continuation.IsFaulted)
                       FailStage(continuation.AsTask().Exception);
                   else
                       FailStage(new TaskCanceledException(continuation.AsTask()));
               }
           }

so the issue occurs when _reader.TryRead returns false and we are calling _reader.WaitToReadAsync(). In some cases, code execution ends up in last else. But when I check the status of continuation ValueTask in this case, this value task is CompletedSuccesfully. So could be the problem that continuation is completed with some delay and because of that it is not handled by first if (continuation.IsCompletedSuccessfully) ?

I tried to do some changes and it is working for my case, but as a user with limited knowledge of ValueTasks and Akka.Streams I would expect that this is not exactly what is needed :)

Here are my changes.

public override void OnPull()
       {
           if (_reader.TryRead(out var element))
           {
               Push(_outlet, element);
           }
           else
           {
               var task = _reader.WaitToReadAsync().AsTask();

               if (task.IsCompletedSuccessfully)
               {
                   var dataAvailable = task.GetAwaiter().GetResult();
                   if (dataAvailable && _reader.TryRead(out element))
                   {
                       Push(_outlet, element);
                   }
                   else
                   {
                       CompleteStage();
                   }
               }
               else if (task.IsFaulted)
               {
                   FailStage(task.Exception);
               }
               else
               {
                   task.ContinueWith(result =>
                   {
                       if (result.IsCanceled)
                       {
                           _onValueReadFailure(new TaskCanceledException(result));
                       }
                       else if (result.IsFaulted)
                       {
                           _onValueReadFailure(result.Exception);
                       }
                       else
                       {
                           _onValueRead(result.Result);
                       }
                   });
               }
      }

Environment
Windows 11, .NET 6.0.300

SQS example is incorrect

The demo example in the README here is not correct, does not compile, beccause

_materializer is not defined, it is probably meant to be materializer.

.Run(materializer) does not return something that can be await ed.

I can't tell as a newcomer how to fix this, which is a blocker to trying to use this code.

Use Akka.net as SignalR Backplane

Is your feature request related to a problem? Please describe.
I would like to use AKKA.net as SignalR Backplane, likewise REDIS.

Describe the solution you'd like
The idea is to create a cluster of ASPNET sites and stream SignalR message from clients connected to one cluster node to all other clients connected to other nodes.

Describe alternatives you've considered
I'm using StreamConnection has SignaR Hub on each node, then I send received message to a ReceiveActor, but I cannot resend messagges back to SignalR clients from the ReceiveActor

Additional context

NullReferenceException - when using Akka.Net v1.5.12 and Akka.Streams.Amqp.RabbitMq v1.5.8

Version Information
Akka.Net v1.5.12
Akka.Hosting v1.5.12
Akka.Streams v1.5.12
Akka.Streams.Amqp.RabbitMq v1.5.8
Akka.Cluster.Hosting v1.5.12

Describe the bug
Both Producer and Consumer of the Sample are throwing NullReferenceException.

To Reproduce
Steps to reproduce the behavior:

  1. Checkout master from
    https://github.com/petabridge/akkadotnet-code-samples/tree/master/src/reliability/rabbitmq-backpressure

  2. Update NugetDependencies in the project: ReliableRabbitMQ.Shared

<PropertyGroup>
    <TargetFramework>$(NetRuntime)</TargetFramework>
    <ImplicitUsings>enable</ImplicitUsings>
    <Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
    <PackageReference Include="Akka" Version="1.5.12" />
    <PackageReference Include="Akka.Hosting" Version="1.5.12" />
    <PackageReference Include="Akka.Streams" Version="1.5.12" />
    <PackageReference Include="Akka.Streams.Amqp.RabbitMq" Version="1.5.8" />
    <PackageReference Include="Akka.Cluster.Hosting" Version="1.5.12" />
</ItemGroup>
  1. Run both ReliableRabbitMQ.Consumer and ReliableRabbitMQ.Producer

  2. See the error in console.

Expected behavior
Both ReliableRabbitMQ.Consumer and ReliableRabbitMQ.Producer are working without an error.

Actual behavior
Both ReliableRabbitMQ.Consumer and ReliableRabbitMQ.Producer console apps are throwing NullReferenceException.

-------------------------------- Producer --------------------------------
[ERROR][08. 07. 2023 12:36:44.268Z][Thread 0014][[akka://RabbitMQProducer/user/amqp-producer/StreamSupervisor-1/Flow-1-2-AmqpSink#881794866]] Error during PreStart in [AmqpSink]
Cause: RabbitMQ.Client.Exceptions.BrokerUnreachableException: None of the specified endpoints were reachable
---> System.AggregateException: One or more errors occurred. (Object reference not set to an instance of an object.)
---> System.NullReferenceException: Object reference not set to an instance of an object.
at RabbitMQ.Client.Impl.SocketFrameHandler..ctor(AmqpTcpEndpoint endpoint, Func2 socketFactory, TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout) at RabbitMQ.Client.ConnectionFactory.CreateFrameHandler(AmqpTcpEndpoint endpoint) at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func2 selector)
--- End of inner exception stack trace ---
at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func2 selector) at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.Init(IEndpointResolver endpoints) at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName) --- End of inner exception stack trace --- at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName) at Akka.Streams.Amqp.RabbitMq.AmqpConnectorLogic.PreStart() --- End of stack trace from previous location --- at Akka.Actor.PipeToSupport.PipeTo(Task taskToPipe, ICanTell recipient, Boolean useConfigureAwait, IActorRef sender, Func1 success, Func`2 failure)
at Akka.Streams.Implementation.Fusing.GraphInterpreter.Init(IMaterializer subMaterializer)

-------------------------------- Consumer --------------------------------
[ERROR][08. 07. 2023 12:36:45.507Z][Thread 0014][[akka://RabbitConsumer/user/amqp-consumer/amqp-consumer/StreamSupervisor-1/Flow-0-0-unknown-operation#844566059]] Error during PreStart in [AmqpSource]
Cause: RabbitMQ.Client.Exceptions.BrokerUnreachableException: None of the specified endpoints were reachable
---> System.AggregateException: One or more errors occurred. (Object reference not set to an instance of an object.)
---> System.NullReferenceException: Object reference not set to an instance of an object.
at RabbitMQ.Client.Impl.SocketFrameHandler..ctor(AmqpTcpEndpoint endpoint, Func2 socketFactory, TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout) at RabbitMQ.Client.ConnectionFactory.CreateFrameHandler(AmqpTcpEndpoint endpoint) at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func2 selector)
--- End of inner exception stack trace ---
at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector)
at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.Init(IEndpointResolver endpoints)
at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
--- End of inner exception stack trace ---
at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
at Akka.Streams.Amqp.RabbitMq.AmqpConnectorLogic.PreStart()
at Akka.Streams.Implementation.Fusing.GraphInterpreter.Init(IMaterializer subMaterializer)

Screenshots

Environment
Windows 10 Pro 22H2,
RabbitMQ 3.11.10 on Erlang 25.3 running in Docker,
.NET 6

Relation between Akka.NET and Petabridge

I want to make a statement here.
The Akka.NET project and organization is a separate entity from Aarons Petabridge.
I personally love what you @Aaronontheweb are doing and I am impressed with the success you have had.

But, when things like this The Petabridge subscription does not cover support for these modules sneak into the documentation and organizational repos, then the boundaries have become too much blurred.

Same rules must apply to everyone, if anyone else wants to run a business around Akka.NET, where do we draw the line whos business get exposed in the internal information?
(I have no intention on doing so, so it's not that, it's about principles, open, free, same rules for everyone)

I see no issues with linking to blogs or articles from the docs, but when someones personal support license pops up in here, then there is a problem with ownership and perception from the community.

Amqp Connector don't work with RestartSource

HI guys!
I am trying to create a Reconnectable Source with RestartSource using Akka Streams Amqp.V1 and some strange things happen. When I disconnect the network, RestartSource does not try to reconnect again and no message is printed in the logs. However, when I receive an invalid message (type incompatible) it restarts my source. I am not sure why this is so and I would like to know if you have already been there and if you could help me. Below is the Source creation code:

using System.Numerics;
using Akka.Serialization;
using Akka.Streams.Amqp.V1.Dsl;
using Akka.Streams.Dsl;
using Amqp;
using System;
using Akka.Actor;
using Address = Amqp.Address;
using Akka.Streams;
using Akka.Streams.Amqp.V1;
using System.Text;

namespace image_processor_akkanet
{
    class Program
    {
        private static Connection connection;
        private static Session session;

        static void Main(string[] args)
        {
            var sys = ActorSystem.Create("AMQP-System");
            var materializer = ActorMaterializer.Create(sys);
            var serialization = sys.Serialization;
            var serializer = serialization.FindSerializerForType(typeof(byte[]));

            try
            {
                var address = new Address("0.0.0.0", 61616, "admin", "admin", scheme: "AMQP");
                connection = new Connection(address);
                session = new Session(connection);

                var queueName = "akka.teste";
                var receiverlinkName = "amqp-conn-test-sender";

                //create source
                var amqpSource = RestartSource.OnFailuresWithBackoff(
                    () => {
                        Console.WriteLine("Start/Restart...");
                        return AmqpSource
                          .Create(new NamedQueueSourceSettings<byte[]>(session, receiverlinkName, queueName, 200, serializer));
                    },
                    TimeSpan.FromSeconds(1),
                    TimeSpan.FromSeconds(3),
                    0.2,
                    Int16.MaxValue
                );

                //run source
                var result = amqpSource
                    .Throttle(1, TimeSpan.FromSeconds(1), 10, ThrottleMode.Shaping)
                    .Select(elem => Encoding.UTF8.GetString(elem, 0, elem.Length))
                    .RunForeach(elem => Console.WriteLine(elem), materializer);

                result.Wait(TimeSpan.FromSeconds(Int16.MaxValue));
            }
            catch (Exception ex)
            {
                Console.WriteLine("Error " + ex.Message);
            }
            finally
            {
                session.Close();
                connection.Close();
            }
        }
    }
}

I tried using both RestartSource.OnFailuresWithBackoff and RestartSource.WithBackoff both to no avail

My project and dependencies:

<Project Sdk="Microsoft.NET.Sdk">

  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>netcoreapp3.1</TargetFramework>
    <RootNamespace>image_processor_akkanet</RootNamespace>
  </PropertyGroup>

  <ItemGroup>
    <PackageReference Include="Akka" Version="1.4.11" />
    <PackageReference Include="Akka.Streams" Version="1.4.11" />
    <PackageReference Include="Akka.Streams.Amqp.V1" Version="1.0.0-beta2" />
  </ItemGroup>

</Project>

I'm using ActiveMQ Artemis as a AMQP Broker.

AMQP only works in localhost?

Hello, I am trying to connect a Broker that is not available for localhost. But this example shows how to use NamedQueueSourceSettings.Create (DefaultAmqpConnection.Instance, queueName) .WithDeclarations (queueDeclaration) where DefaultAmqpConnection.Instance implies a local host with no authentication.
Can someone help me?
I would also like to know about the status of this project. Is he active?

Thanks

Add nats-net-v2 Connectors

Is your feature request related to a problem? Please describe.
I think it would be awesome if the community had a set of Alpakka connectors for nats-net-v2.

Describe the solution you'd like
That we do the needful. I've done some basic scaffolding. General idea being with a clean starting API we can then optimize with a more native stage (unless akkadotnet/akka.net#7028 stalls, in that case maybe we do up front?)

I suppose my ask is ideally "let's get some feedback on what sort of API we think is useful" but will be happy with "sure do the PR and if you take the feedback we will approve".

Describe alternatives you've considered
Per above, we could get fancier with stages....
Inversely speaking, one can see there's nothing fancy in my gist and one could just party on Unfold easily enough, however there's something to be said for providing a polite API for others.

Additional context

Per above I'm happy to own this and do the work :)

I brought this idea up to the Nats .Net folks and they were as always gracious and supportive, It sounds like they may be willing to provide feedback or other input but I will leave those comments via other channels.

Also Note, not sure if a concern: This would be a Net6.0+ only connector, as that is the baseline for the underlying library we are exposing via Alpakka.

Kafka with Azure Event Hub

I'm trying to find some example of using Akka.NET Streams to consume Kafka events from the Azure Event Hub. It seems to me that Kafka support does not support the authentication required to connect to Azure Hub, but I may be mistaken. If anyone has an example I would appreciate it very much.

I'm not actually a .Net developer by nature. In Java or Scala using Akka Streams I would configure my application like this:

akka {
  kafka.producer {
    #Akka kafka producer properties can be defined here
    client.id = "proxy-client"


    # Properties defined by org.apache.kafka.clients.producer.ProducerConfig
    # can be defined in this configuration section.
    kafka-clients {
      bootstrap.servers = "my-kafka.servicebus.windows.net:9093"
      sasl.mechanism = PLAIN
      security.protocol = SASL_SSL
      sasl.jaas.config = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://my-kafka.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey={key}\";"
    }
  }

On the Gitter channel, Aaron told me that you support these options so it must be the case that you just find them. So an example would be welcome.

Brainstorm/ Discussion: Kafka as "persistent" source

Kafka is a persistent commit log. Because of the persistent nature of Kafka we can use kafka for event sourcing. Here is a nice article about the subject https://dzone.com/articles/event-sourcing-with-kafka. This article describes RocksDb for state.

I think Kafka and Akka could have great synergy. There is just one catch, which is already mentioned in the article as well. Because Kafka uses files on disc to store the event stream it is considered not wise to use topic per entity, unless you have little entities in the system, but you would also need some administration of what entities exist (is isn't that hard, as we can get info using metadata).

Examples from other github sources actually have 1 topic per actor, java/scala example: https://github.com/krasserm/akka-persistence-kafka there is an EventTopicMapper but I can't really see if it can be used for many entities into 1 topic, or if it meant to do that altogether.

I think it is not smart to make kafka a source for a persistent actor at this moment as you will need as much consumers as number of actors which are restoring state. Kafka topics are not meant to be read from the beginning over and over again from a performance point.

To not get all to abstract here, a simple Event Sourced system could be a Users topic. We could define UserCreated, UserDisplaynameChanged, UserLoggedIn, UserLoggedOut. I could see we have 1 UsersTopic. Let's make the Key a Guid (we all love Guid don't we ;))

What we would need:

  • mapping topic to topic coordinator, in our case UserCoordinator?
  • partitions mapping, what set of user are part of a particular topic
  • trigger when an actor has caught up in the topic it is in, presuming multiple partitions some actors would be synched sooner then others.
  • I think the system should stash commands as long as the actor is still being build

I don't think we need to worry a lot about other actors producing in the the topics out subsystem is subscribing, this is up the the developer and has the same issues when someone has persistent actors with overlapping persistenceids, system behaviour could be real strange, but as long as consumers keep subcribed to the topic, it will be a lot more robust then current persistent actors, as they stop the subscription (most implementations anyway) ...

Kafka can perform at great speeds, I think creating snapshots could be done, but snapshots for multiple actors in a topic could be quite hard to do, so scope of this discussion should be events/ journal only at this time.

I have already build actors based on topics, but I have created this "issue" for a discussion for an generic solution, we could provide others in this repo, or anywhere else.

My thoughts how to do this with the simple users topic. I created this discussion because I would like to see what others think before missing important Akka features/ Kafka features etc.

  • implementation should be based on a BaseClass (KafkaTopicEntityCoordinator), it would need a way to create child actors Funct<byte[], Props> maybe?, transform key into IEquatable, I guess Func<T, IComparable> could do the trick.
  • abstract method Synced we can Become
  • message Key can be used to create a child actorname, simply to create string from byte[] (ansi encoding)
  • The BaseClass should have a mapping which Dictionary<topic, HAshSet> to tell all partition dependant actors the topic reached EOF, all actors we get trigger to Become(Synced).
  • When all topics reach the end the coordinator can cal Become(Synced)
  • When Synced all actors can tell the Producer associated with the Coordinator baseclass new events to persist.

Based on my first thoughts on this I guess we need the PartititionEOF from Kafka as an event as well in the SourceStage.

RabbitMQ docker container doesn't work in Azure Pipeline Linux container

There are several anomalies related with Akka.Streams.AMQP.V1 and the RabbitMQ docker container running under the ubuntu-18.04 Azure Pipelines image.

For the V1.0 plugin, our code could not establish any AMQP handshake with the container:

Akka.Streams.Amqp.V1.Tests.AmqpConnectorsTest.Publish_and_consume_elements_through_a_simple_queue_again_in_the_same_process [FAIL]
       System.OperationCanceledException : The transport 'TcpTransport' is closed.
       Stack Trace:
            at Amqp.Framing.Reader.ReadBuffer(ITransport transport, Byte[] buffer, Int32 offset, Int32 count)
            at Amqp.Framing.Reader.ReadHeader(ITransport transport)
            at Amqp.Sasl.SaslProfile.Open(String hostname, ITransport transport)
            at Amqp.Connection.Connect(SaslProfile saslProfile, Open open)
            at Amqp.Connection..ctor(Address address, IHandler handler)
            at Amqp.Connection..ctor(Address address)
         /home/vsts/work/1/s/src/Amqp/Akka.Streams.Amqp.Tests/V1/AmqpConnectorsTest.cs(42,0): at Akka.Streams.Amqp.V1.Tests.AmqpConnectorsTest.Publish_and_consume_elements_through_a_simple_queue_again_in_the_same_process()
         --- End of stack trace from previous location where exception was thrown ---
       Output:
         [07:51:11.297] SEND AMQP 3 1 0 0
         [DEBUG][08/11/2020 19:51:11][Thread 0016][ActorSystem(test)] System shutdown initiated
         [DEBUG][08/11/2020 19:51:11][Thread 0015][EventStream] Shutting down: StandardOutLogger started
         [DEBUG][08/11/2020 19:51:11][Thread 0015][EventStream] All default loggers stopped

It appears from the frame log that the broker immediately closes the TCP socket as soon as the client sends a SEND AMQP 3 1 0 0 to start a handshake, and refuses any connection attempts after that.

For the standard AMQP protocol, the broker seemed to suddenly drops TCP connection on specific tests:

X Akka.Streams.Amqp.Tests.AmqpConnectorsTest.Pub_sub_from_one_source_with_multiple_sinks [5s 120ms]
   Error Message:
    System.AggregateException : One or more errors occurred. (Did not get at least one element from every fanout branch)
 ---- System.Exception : Did not get at least one element from every fanout branch
   Stack Trace:
      at System.Threading.Tasks.Task`1.GetResultCore(Boolean waitCompletionNotification)
    at System.Threading.Tasks.Task`1.get_Result()
    at Akka.Streams.Amqp.Tests.AmqpConnectorsTest.Pub_sub_from_one_source_with_multiple_sinks() in /home/vsts/work/1/s/src/Amqp/Akka.Streams.Amqp.Tests/AmqpConnectorsTest.cs:line 277
 ----- Inner Stack Trace -----
   Standard Output Messages:
  [ERROR][08/11/2020 19:51:12][Thread 0015][[akka://test/user/StreamSupervisor-2/Flow-0-0-unknown-operation#2121431516]] Error during PreStart in [AmqpSource]
  Cause: RabbitMQ.Client.Exceptions.BrokerUnreachableException: None of the specified endpoints were reachable
   ---> System.IO.IOException: Unable to write data to the transport connection: Cannot access a disposed object.
  Object name: 'System.Net.Sockets.Socket'..
   ---> System.ObjectDisposedException: Cannot access a disposed object.
  Object name: 'System.Net.Sockets.Socket'.
     at System.Net.Sockets.Socket.Send(Byte[] buffer, Int32 offset, Int32 size, SocketFlags socketFlags, SocketError& errorCode)
     at System.Net.Sockets.NetworkStream.Write(Byte[] buffer, Int32 offset, Int32 size)
     --- End of inner exception stack trace ---
     at System.Net.Sockets.NetworkStream.Write(Byte[] buffer, Int32 offset, Int32 size)
     at System.IO.BufferedStream.Flush()
     at System.IO.BinaryWriter.Flush()
     at RabbitMQ.Client.Impl.SocketFrameHandler.SendHeader()
     at RabbitMQ.Client.Framing.Impl.Connection.StartAndTune()
     at RabbitMQ.Client.Framing.Impl.Connection.Open(Boolean insist)
     at RabbitMQ.Client.Framing.Impl.Connection..ctor(IConnectionFactory factory, Boolean insist, IFrameHandler frameHandler, String clientProvidedName)
     at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.Init(IFrameHandler fh)
     at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.Init(IEndpointResolver endpoints)
     at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
     --- End of inner exception stack trace ---
     at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
     at RabbitMQ.Client.ConnectionFactory.CreateConnection(IList`1 endpoints)
     at Akka.Streams.Amqp.AmqpConnector.NewConnection(IConnectionFactory factory, IAmqpConnectionSettings settings) in /home/vsts/work/1/s/src/Amqp/Akka.Streams.Amqp/AmqpConnector.cs:line 71
     at Akka.Streams.Amqp.AmqpSourceStage.AmqpSourceStageLogic.NewConnection(IConnectionFactory factory, IAmqpConnectionSettings settings) in /home/vsts/work/1/s/src/Amqp/Akka.Streams.Amqp/AmqpSourceStage.cs:line 75
     at Akka.Streams.Amqp.AmqpConnectorLogic.PreStart() in /home/vsts/work/1/s/src/Amqp/Akka.Streams.Amqp/AmqpConnector.cs:line 106
     at Akka.Streams.Implementation.Fusing.GraphInterpreter.Init(IMaterializer subMaterializer)

Please indicate that the Azure Storage Queue Stream consumer is to delete consumed messages from the Azure Queue

Please describe what you are trying to understand
In trying to build my first Actor that sets up a Stream over Azure Storage Queue, I was stuck for a few hours trying to figure out why the Stream keeps returning the same message over and over.

Which pages have you looked at?
https://alpakka.getakka.net/documentation/Azure/StorageQueue.html

What did these pages not make clear?
That the Stream provider does not delete delivered messages from the Azure Storage Queue.

How can we do it better?
The page should make it clear that deletion of consumed messages from the Azure Storage Queue is the responsibility of the message consumer.

Kafka - assignedPartitions is null for ConsumerStage when using TopicPartitionOffset

Hi there,

I am using the producer for the examples and the same code for the consumer except for 1 difference.

Instead of using Subscriptions.Topics("testtopic"), I am using Subscriptions.AssignmentWithOffset(new TopicPartitionOffset("testtopic", 0, Offset.Beginning));

This all works ok until the buffer is reached and the following code in ConsumerStage throws a an argument null exception as the assignedPartitions is null:

 private void PullQueue()
        {
            _consumer.Poll(_settings.PollTimeout);

            if (!isPaused && _buffer.Count > _settings.BufferSize)
            {
                Log.Debug($"Polling paused, buffer is full");
                _consumer.Pause(assignedPartitions);   // BREAKING CODE
                isPaused = true;
            }
        }

When I use the following code :

 static void Main(string[] args)
        {
            Config fallbackConfig = ConfigurationFactory.ParseString(@"
                    akka.suppress-json-serializer-warning=true
                    akka.loglevel = DEBUG
                ").WithFallback(ConfigurationFactory.FromResource<ConsumerSettings<object, object>>("Akka.Streams.Kafka.reference.conf"));

            var system = ActorSystem.Create("TestKafkaConsumer", fallbackConfig);
            var materializer = system.Materializer();

            var consumerSettings = ConsumerSettings<Null, string>.Create(system, null, new StringDeserializer(Encoding.UTF8))
                .WithBootstrapServers("localhost:9092")
                .WithGroupId("group1");


            //var subscription = Subscriptions.Topics("testtopic");

            var subscription = Subscriptions.AssignmentWithOffset(new TopicPartitionOffset("testtopic", 0, Offset.Beginning));

            var source = Consumer.PlainSource(consumerSettings, subscription);
            
            source.Throttle(5, TimeSpan.FromSeconds(1), 5, ThrottleMode.Shaping)
                .RunForeach(
                    result =>
                    {
                        Console.WriteLine(
                            $"Consumer: {result.Topic}/{result.Partition} {result.Offset}: {result.Value}");
                    }, materializer);

            Console.ReadLine();
        }

I'm not a Kafka expert and couldn't tell how I could fix this. It seems to be similar to this issue:

confluentinc/confluent-kafka-dotnet#434

Restarted AMQP Source does not recreates auto-delete queue

let system = ActorSystem.Create("test", ConfigurationFactory.Load())
let mat = ActorMaterializer.Create(system, ActorMaterializerSettings.Create(system).WithSupervisionStrategy(Deciders.ResumingDecider))

let connSettings = 
    AmqpConnectionDetails
        .Create("localhost", 5672)
        .WithAutomaticRecoveryEnabled(true)
        .WithNetworkRecoveryInterval(TimeSpan.FromSeconds 1.0)
        .WithTopologyRecoveryEnabled(true)
        
let taskQueue = "task-queue"
let taskQueueDecl = QueueDeclaration.Create(taskQueue).WithDurable(false).WithAutoDelete(true)

// send 10 tasks
let sink = AmqpSink.CreateSimple(AmqpSinkSettings.Create(connSettings).WithRoutingKey(taskQueue).WithDeclarations(taskQueueDecl))
Source.From([1..10]).Select(fun x -> x |> string |> ByteString.FromString).RunWith(sink, mat).Wait()

let reportQueue = QueueDeclaration.Create("report-queue").WithDurable(false).WithAutoDelete(true)

RestartSource
    .WithBackoff((fun _ ->
        AmqpSource.CommittableSource(NamedQueueSourceSettings.Create(connSettings, taskQueue), bufferSize = 100)),
        TimeSpan.FromSeconds 1.0, TimeSpan.FromSeconds 2.0, 0.2)
    .Throttle(1, TimeSpan.FromMilliseconds 1000.0, 0, ThrottleMode.Shaping)
    .Select(fun x ->
        printfn "[0] %s" (x.Message.Bytes.ToString())
        x)
    .SelectAsync(1, fun cms ->
        async {
            if cms.Message.Bytes.ToString() = "5" then
                do! cms.Nack() |> Async.AwaitTask
            do! cms.Ack() |> Async.AwaitTask // here we fail on "5" message
            return cms.Message.Bytes
        }
        |> Async.StartAsTask)
    .Select(fun x ->
        printfn "[1] %O" x
        x)
    .RunWith(
        RestartSink.WithBackoff((fun _ ->
            AmqpSink.CreateSimple(AmqpSinkSettings.Create(connSettings).WithRoutingKey(reportQueue.Name).WithDeclarations(reportQueue))),
                TimeSpan.FromSeconds 1.0, TimeSpan.FromSeconds 2.0, 0.2), mat)
|> ignore
system.WhenTerminated.Wait()
[0] 1
[1] 1
[0] 2
[1] 2
[0] 3
[1] 3
[0] 4
[1] 4
[0] 5
[1] 5
[ERROR][18-Jan-18 19:45:06][Thread 0011][Akka.Streams.Dsl.RestartWithBackoffSource`2+Logic[Akka.Streams.Amqp.Dsl.CommittableIncomingMessage,Akka.NotUsed]] Restarting graph due to failure
Cause: Akka.Streams.Amqp.ShutdownSignalException: ShutdownSignal has been received, ShutdownInitiator=Peer, Cause=, ClassId=60, MethodId=80, ReplyCode=406, ReplyText=PRECONDITION_FAILED - unknown delivery tag 5
[DEBUG][18-Jan-18 19:45:06][Thread 0011][Akka.Streams.Dsl.RestartWithBackoffSource`2+Logic[Akka.Streams.Amqp.Dsl.CommittableIncomingMessage,Akka.NotUsed]] Last restart attempt was more than 00:00:01 ago, resetting restart count
[DEBUG][18-Jan-18 19:45:06][Thread 0011][Akka.Streams.Dsl.RestartWithBackoffSource`2+Logic[Akka.Streams.Amqp.Dsl.CommittableIncomingMessage,Akka.NotUsed]] Restarting graph in 00:00:01.0657050
[0] 6
[ERROR][18-Jan-18 19:45:08][Thread 0003][[akka://test/user/StreamSupervisor-0/Flow-1-0-unknown-operation#477839087]] Error during PreStart in [AmqpSource]
Cause: RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=404, text="NOT_FOUND - no queue 'task-queue' in vhost '/'", classId=60, methodId=20, cause=
   at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
   at RabbitMQ.Client.Impl.ModelBase.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer)
   at RabbitMQ.Client.Impl.AutorecoveringModel.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer)
   at Akka.Streams.Amqp.AmqpSourceStage.AmqpSourceStageLogic.SetupNamedQueue(NamedQueueSourceSettings settings)
   at Akka.Streams.Amqp.AmqpConnectorLogic.PreStart()
   at Akka.Streams.Implementation.Fusing.GraphInterpreter.Init(IMaterializer subMaterializer)
[ERROR][18-Jan-18 19:45:08][Thread 0003][Akka.Streams.Dsl.RestartWithBackoffSource`2+Logic[Akka.Streams.Amqp.Dsl.CommittableIncomingMessage,Akka.NotUsed]] Restarting graph due to failure
Cause: RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=404, text="NOT_FOUND - no queue 'task-queue' in vhost '/'", classId=60, methodId=20, cause=
   at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
   at RabbitMQ.Client.Impl.ModelBase.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer)
   at RabbitMQ.Client.Impl.AutorecoveringModel.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer)
   at Akka.Streams.Amqp.AmqpSourceStage.AmqpSourceStageLogic.SetupNamedQueue(NamedQueueSourceSettings settings)
   at Akka.Streams.Amqp.AmqpConnectorLogic.PreStart()
   at Akka.Streams.Implementation.Fusing.GraphInterpreter.Init(IMaterializer subMaterializer)
[DEBUG][18-Jan-18 19:45:08][Thread 0003][Akka.Streams.Dsl.RestartWithBackoffSource`2+Logic[Akka.Streams.Amqp.Dsl.CommittableIncomingMessage,Akka.NotUsed]] Restarting graph in 00:00:02.3403801
[ERROR][18-Jan-18 19:45:10][Thread 0005][[akka://test/user/StreamSupervisor-0/Flow-1-0-unknown-operation#477839087]] Error during PreStart in [AmqpSource]
Cause: RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=404, text="NOT_FOUND - no queue 'task-queue' in vhost '/'", classId=60, methodId=20, cause=
   at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
   at RabbitMQ.Client.Impl.ModelBase.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer)
   at RabbitMQ.Client.Impl.AutorecoveringModel.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer)
   at Akka.Streams.Amqp.AmqpSourceStage.AmqpSourceStageLogic.SetupNamedQueue(NamedQueueSourceSettings settings)
   at Akka.Streams.Amqp.AmqpConnectorLogic.PreStart()
   at Akka.Streams.Implementation.Fusing.GraphInterpreter.Init(IMaterializer subMaterializer)
[ERROR][18-Jan-18 19:45:10][Thread 0005][Akka.Streams.Dsl.RestartWithBackoffSource`2+Logic[Akka.Streams.Amqp.Dsl.CommittableIncomingMessage,Akka.NotUsed]] Restarting graph due to failure
Cause: RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=404, text="NOT_FOUND - no queue 'task-queue' in vhost '/'", classId=60, methodId=20, cause=
   at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
   at RabbitMQ.Client.Impl.ModelBase.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer)
   at RabbitMQ.Client.Impl.AutorecoveringModel.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer)
   at Akka.Streams.Amqp.AmqpSourceStage.AmqpSourceStageLogic.SetupNamedQueue(NamedQueueSourceSettings settings)
   at Akka.Streams.Amqp.AmqpConnectorLogic.PreStart()
   at Akka.Streams.Implementation.Fusing.GraphInterpreter.Init(IMaterializer subMaterializer)
[DEBUG][18-Jan-18 19:45:10][Thread 0005][Akka.Streams.Dsl.RestartWithBackoffSource`2+Logic[Akka.Streams.Amqp.Dsl.CommittableIncomingMessage,Akka.NotUsed]] Restarting graph in 00:00:02.1493502
[ERROR][18-Jan-18 19:45:12][Thread 0008][[akka://test/user/StreamSupervisor-0/Flow-1-0-unknown-operation#477839087]] Error during PreStart in [AmqpSource]
Cause: RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=404, text="NOT_FOUND - no queue 'task-queue' in vhost '/'", classId=60, methodId=20, cause=
   at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
   at RabbitMQ.Client.Impl.ModelBase.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer)
   at RabbitMQ.Client.Impl.AutorecoveringModel.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer)
   at Akka.Streams.Amqp.AmqpSourceStage.AmqpSourceStageLogic.SetupNamedQueue(NamedQueueSourceSettings settings)
   at Akka.Streams.Amqp.AmqpConnectorLogic.PreStart()
   at Akka.Streams.Implementation.Fusing.GraphInterpreter.Init(IMaterializer subMaterializer)

It fails with "text="NOT_FOUND - no queue 'task-queue'". I don't understand why.

Akka.Streams.SignalR

I think, that good idea would be to add a source/sink support for SignalR. Ideally it should also cover a reactive streams API back to the client side - RxJs already implemented reactive-streams compliant API in form of controlled observables.

More frequent releases

For instance, the LogRotatorSink was contributed in October but it's still not available. Maybe we could force a new release at least right after every Akka.NET release?

Also related to #173

Flaky tests when transient error/delay happens

Version Information
21710e9

Describe the bug

We find a few unit tests fail undeterministically due to different reasons when the Azure Storage service is unstable:

Akka.Streams.Azure.StorageQueue.Tests.QueueSinkSpec.A_QueueSink_should_add_elements_to_the_queue
Akka.Streams.Azure.StorageQueue.Tests.QueueSinkSpec.A_QueueSink_should_retry_failing_messages_if_supervision_strategy_is_resume
Akka.Streams.Azure.StorageQueue.Tests.QueueSinkSpec.A_QueueSink_should_skip_failing_messages_if_supervision_strategy_is_restart
Akka.Streams.Azure.StorageQueue.Tests.QueueSourceSpecs.A_QueueSource_should_fail_when_an_error_occurs
Akka.Streams.Azure.StorageQueue.Tests.QueueSourceSpecs.A_QueueSource_should_not_fail_if_the_supervision_strategy_is_not_stop_when_an_error_occurs
Akka.Streams.Azure.StorageQueue.Tests.QueueSourceSpecs.A_QueueSource_should_only_poll_if_demand_is_available
Akka.Streams.Azure.StorageQueue.Tests.QueueSourceSpecs.A_QueueSource_should_poll_for_messages_if_the_queue_is_empty
Akka.Streams.Azure.StorageQueue.Tests.QueueSourceSpecs.A_QueueSource_should_push_available_messages

For example, hardcoded timeout is often used in the test code like

t.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();

which expects t finishes within 3 seconds.
If the Azure API called by t gets processed slower than usual (when the service is busy), the test will fail.

Another example is in A_QueueSink_should_skip_failing_messages_if_supervision_strategy_is_restart

probe.SendNext("2");
probe.SendComplete();
await task;
(await Queue.ReceiveMessagesAsync()).Value[0].MessageText.Should().Be("2");

If some transient error happens (like 503 server is busy) when processing the Azure Storage request to send message, the test will fail due to System.IndexOutOfRangeException : Index was outside the bounds of the array.

We are not sure whether such flaky tests (caused by transient error/delay) are desired or not.
If not, would it be better to:

  1. wait until t finishes w/o hardcoded timeout in the first example?
  2. check whether Value contains any element before calling Value[0] in the second example?
    If these are considered better practice, we are willing to send a PR for the fix.

It's impossible to send durable messages with AMQP Sink

To make messages persistent it's necessary to do two things: make the queue durable and set DeliveryMode = 2 for each message. The former is simple:

let queueDeclaration = QueueDeclaration.Create(queueName).WithDurable(true)`

But the latter is impossible with the current implementation:

    Source
        .From(seq { 1..1_000 })
        .Select(fun x -> x |> string |> ByteString.FromString) // no way to set basic properties
        .To(sink)
        .Run(mat)
    |> ignore

Sample is here https://github.com/vasily-kirichenko/AmqpStreams/blob/master/AmqpStreams1/Program.fs

Prepare Alpakka release

A number of PR's have been merged lately and we want to put out a new release for Alpakka.

The follow projects need to be updated to the latest version of akka.streams (at the very least).

  • Azure projects (Done)
  • AMQP project
  • Kafka. See #42
  • AWS
  • CSV
  • SNS
  • SignalR
  • XML

Note that these are all simple update dependencies and increment version actions.
Ill be slowly working my way through these projects over the coming weeks as i get around to doing it.

But if anyone would like to do some of the work. Make a comment here and claim a project (or 2, 3). Doing so would speed up the process.

Net Standard Support for Akka Streams Azure Storage Queue

As a developer working with NET Core / Akka / Azure Storage Queue
I would like to convert existing Azure Storage Queue project to netstandard2
so that it would be reusable with new net core projects same as currently is used with net461

I changed type of project from net461 to netstandard2 and solution is still compilable, tests are green.

[AMQP] Doesn't work with Akka.Streams v.1.3.6 and higher

Method not found: 'Void Akka.Streams.Stage.GraphStageLogic.SetHandler(Akka.Streams.Outlet, System.Action, System.Action)'.

   at Akka.Streams.Amqp.AmqpSourceStage.AmqpSourceStageLogic..ctor(AmqpSourceStage stage)
   at Akka.Streams.Amqp.AmqpSourceStage.CreateLogic(Attributes inheritedAttributes)
   at Akka.Streams.Stage.GraphStage`1.CreateLogicAndMaterializedValue(Attributes inheritedAttributes)
   at Akka.Streams.Implementation.Fusing.GraphAssembly.Materialize(Attributes inheritedAttributes, IModule[] copiedModules, IDictionary`2 materializedValues, Action`1 register)
   at Akka.Streams.Implementation.ActorMaterializerImpl.ActorMaterializerSession.MaterializeGraph(GraphModule graph, Attributes effectiveAttributes, IDictionary`2 materializedValues)
   at Akka.Streams.Implementation.ActorMaterializerImpl.ActorMaterializerSession.MaterializeAtomic(AtomicModule atomic, Attributes effectiveAttributes, IDictionary`2 materializedValues)
   at Akka.Streams.Implementation.MaterializerSession.MaterializeModule(IModule module, Attributes effectiveAttributes)
   at Akka.Streams.Implementation.MaterializerSession.Materialize()
   at Akka.Streams.Implementation.ActorMaterializerImpl.Materialize[TMat](IGraph`2 runnable, Func`2 subFlowFuser, Attributes initialAttributes)
   at Akka.Streams.Implementation.ActorMaterializerImpl.Materialize[TMat](IGraph`2 runnable)
   at Akka.Streams.Dsl.RunnableGraph`1.Run(IMaterializer materializer)
   at Akka.Streams.Dsl.Source`2.RunWith[TMat2](IGraph`2 sink, IMaterializer materializer)

It looks like you have not update the nuget package after changes:
pr 32
pr 56

.net Core Support for Azure Service Bus

Has anyone worked on a port to .net core? If not, I can put a PR together. However, one issue is that the message GetBody() no longer exists on the Message, so a direction would need to be decided.

Kafka fails stage on producer getting disconnected

When I use a long running producer without message for longer then 10 minutes the kafka idle time reaper comes along and disconnects the producer. Giving an error. I have never had issues producing after this message.

The ProducerStageLogic however exits with a FailedStage. To my knowledge I don't think this should occur like this. This error is in the logs 9092/bootstrap: Receive failed: Disconnected.

I am in doubt if this piece of code is correct, but I don't know 100%

                Log.Error(error.Reason);

                if (!KafkaExtensions.IsBrokerErrorRetriable(error) && !KafkaExtensions.IsLocalErrorRetriable(error))
                {
                    var exception = new KafkaException(error);
                    FailStage(exception);
                }

IndexOutOfRangeException happens when trying to get the queue message

Version Information
3d881af

Describe the bug

When using SendNext method to send a message to the queue based on Azure Queue service, it may encounter transient errors (e.g., 503). So the Queue SendMessageAsync called inside will fail. However, it may not fail immediately but will fail at the ReceiveMessageAsync when trying to get the message, and a System.IndexOutOfRangeException will appear.

To Reproduce

We found the test Akka.Streams.Azure.StorageQueue.Tests.QueueSinkSpec.A_QueueSink_should_skip_failing_messages_if_supervision_strategy_is_restart which exercises the SendMessageAsync and ReceiveMessageAsync could be used to reproduce. Stack trace:

...
System.IndexOutOfRangeException : Index was outside the bounds of the array.
at Akka.Streams.Azure.StorageQueue.Tests.QueueSinkSpec.A_QueueSink_should_skip_failing_messages_if_supervision_strategy_is_restart() in C:\Users\Ze\Alpakka\src\Azure\Akka.Streams.Azure.StorageQueue.Tests\QueueSinkSpec.cs:line 88
...

Environment
Windows

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.