akkadotnet / alpakka
Akka Streams Connectors - Alpakka
Home Page: https://alpakka.getakka.net/
License: Apache License 2.0
Version Information
21710e9
We found that a few unit tests fail nondeterministically, for various reasons, when the Azure Storage service is unstable:
Akka.Streams.Azure.StorageQueue.Tests.QueueSinkSpec.A_QueueSink_should_add_elements_to_the_queue
Akka.Streams.Azure.StorageQueue.Tests.QueueSinkSpec.A_QueueSink_should_retry_failing_messages_if_supervision_strategy_is_resume
Akka.Streams.Azure.StorageQueue.Tests.QueueSinkSpec.A_QueueSink_should_skip_failing_messages_if_supervision_strategy_is_restart
Akka.Streams.Azure.StorageQueue.Tests.QueueSourceSpecs.A_QueueSource_should_fail_when_an_error_occurs
Akka.Streams.Azure.StorageQueue.Tests.QueueSourceSpecs.A_QueueSource_should_not_fail_if_the_supervision_strategy_is_not_stop_when_an_error_occurs
Akka.Streams.Azure.StorageQueue.Tests.QueueSourceSpecs.A_QueueSource_should_only_poll_if_demand_is_available
Akka.Streams.Azure.StorageQueue.Tests.QueueSourceSpecs.A_QueueSource_should_poll_for_messages_if_the_queue_is_empty
Akka.Streams.Azure.StorageQueue.Tests.QueueSourceSpecs.A_QueueSource_should_push_available_messages
For example, a hardcoded timeout is often used in the test code, like
t.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
which expects t to finish within 3 seconds.
If the Azure API called by t is processed more slowly than usual (when the service is busy), the test will fail.
Another example is in A_QueueSink_should_skip_failing_messages_if_supervision_strategy_is_restart
probe.SendNext("2");
probe.SendComplete();
await task;
(await Queue.ReceiveMessagesAsync()).Value[0].MessageText.Should().Be("2");
If a transient error (such as 503 Server Busy) occurs while the Azure Storage request that sends the message is being processed, the queue stays empty and the test fails with System.IndexOutOfRangeException : Index was outside the bounds of the array.
We are not sure whether such flaky tests (caused by transient error/delay) are desired or not.
If not, would it be better to:
wait until t finishes, without a hardcoded timeout, in the first example?
check that Value contains at least one element before calling Value[0] in the second example?
Is your feature request related to a problem? Please describe.
I would like to use Akka.NET as a SignalR backplane, in the same way Redis can be used.
Describe the solution you'd like
The idea is to create a cluster of ASP.NET sites and stream SignalR messages from clients connected to one cluster node to all other clients connected to the other nodes.
Describe alternatives you've considered
I'm using StreamConnection as the SignalR hub on each node and sending each received message to a ReceiveActor, but I cannot send messages back to the SignalR clients from the ReceiveActor.
Additional context
I'm trying to find an example of using Akka.NET Streams to consume Kafka events from Azure Event Hubs. It seems to me that the Kafka connector does not support the authentication required to connect to Event Hubs, but I may be mistaken. If anyone has an example, I would appreciate it very much.
I'm not actually a .Net developer by nature. In Java or Scala using Akka Streams I would configure my application like this:
akka {
  kafka.producer {
    # Akka Kafka producer properties can be defined here
    client.id = "proxy-client"
    # Properties defined by org.apache.kafka.clients.producer.ProducerConfig
    # can be defined in this configuration section.
    kafka-clients {
      bootstrap.servers = "my-kafka.servicebus.windows.net:9093"
      sasl.mechanism = PLAIN
      security.protocol = SASL_SSL
      sasl.jaas.config = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://my-kafka.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey={key}\";"
    }
  }
}
On the Gitter channel, Aaron told me that you support these options, so it must be that I just cannot find them. An example would be welcome.
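For what it is worth, the JAAS line in the configuration above is specific to the Java client. .NET Kafka clients built on librdkafka (Confluent.Kafka, which Akka.Streams.Kafka wraps) normally take the same credentials as separate sasl.username and sasl.password properties. Below is a minimal sketch of that property set. EventHubKafkaProperties.Build is a hypothetical helper, the host and connection string are placeholders, and how the pairs are handed to the connector (for example through a WithProperty-style method on the producer or consumer settings) is an assumption to check against the Akka.Streams.Kafka version in use.

```csharp
using System;
using System.Collections.Generic;

public static class EventHubKafkaProperties
{
    // Hypothetical helper: builds librdkafka-style client properties for the
    // Kafka endpoint of an Azure Event Hubs namespace.
    public static IDictionary<string, string> Build(string namespaceHost, string connectionString)
    {
        if (string.IsNullOrWhiteSpace(namespaceHost))
            throw new ArgumentException("Namespace host is required.", nameof(namespaceHost));
        if (string.IsNullOrWhiteSpace(connectionString))
            throw new ArgumentException("Connection string is required.", nameof(connectionString));

        return new Dictionary<string, string>
        {
            // Event Hubs exposes its Kafka endpoint on port 9093.
            ["bootstrap.servers"] = namespaceHost + ":9093",
            ["security.protocol"] = "SASL_SSL",
            ["sasl.mechanism"] = "PLAIN",
            // The user name is the literal string "$ConnectionString";
            // the password is the Event Hubs connection string itself.
            ["sasl.username"] = "$ConnectionString",
            ["sasl.password"] = connectionString
        };
    }
}
```

Each pair returned by Build would then be applied to the client settings one by one; the JAAS login module string has no counterpart on this side.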
The following connection details:
let settings =
    AmqpConnectionDetails.Create("host22", 5671)
        .WithAutomaticRecoveryEnabled(true)
        .WithNetworkRecoveryInterval(TimeSpan.FromSeconds 1.0)
        .WithCredentials(AmqpCredentials.Create("foo", "1"))
        .WithSsl(SslOption("host22", enabled = true))
result in a ConnectionFactory whose Endpoint is localhost.
A workaround: use AmqpConnectionUri.Create("amqps://foo:1@host22:5671") instead.
The demo example in the README here is not correct and does not compile, because:
_materializer is not defined; it is probably meant to be materializer.
.Run(materializer) does not return something that can be awaited.
As a newcomer I can't tell how to fix this, which is a blocker to trying to use this code.
These items are currently missing from Alpakka:
I think it would be a good idea to add source/sink support for SignalR. Ideally it should also cover a reactive streams API back to the client side: RxJs has already implemented a reactive-streams-compliant API in the form of controlled observables.
So users can get access to nightlies.
This will be done in the future.
Akka.Streams.Azure.EventHub
with Akka.Streams.Azure.EventHub.V5
Akka.Streams.Azure.EventHub.V5
NuGet package.
Right now, all of the Docker images used in unit testing are pushed to my private Docker Hub account; we need to move them to the account owned by akkadotnet.
https://www.nuget.org/packages/Microsoft.AspNet.SignalR - this is what we should be targeting inside, rather than:
Version Information
3d881af
When the SendNext method is used to send a message to a queue backed by the Azure Queue service, transient errors (e.g., 503) may occur, so the SendMessageAsync call made internally fails. However, the failure may not surface immediately; it surfaces later, at ReceiveMessageAsync, when the test tries to get the message and a System.IndexOutOfRangeException appears.
We found that the test Akka.Streams.Azure.StorageQueue.Tests.QueueSinkSpec.A_QueueSink_should_skip_failing_messages_if_supervision_strategy_is_restart, which exercises SendMessageAsync and ReceiveMessageAsync, can be used to reproduce this. Stack trace:
...
System.IndexOutOfRangeException : Index was outside the bounds of the array.
at Akka.Streams.Azure.StorageQueue.Tests.QueueSinkSpec.A_QueueSink_should_skip_failing_messages_if_supervision_strategy_is_restart() in C:\Users\Ze\Alpakka\src\Azure\Akka.Streams.Azure.StorageQueue.Tests\QueueSinkSpec.cs:line 88
...
Environment
Windows
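One possible way to keep such a test from failing on an empty result is to poll until at least one message is visible, bounded by a deadline, and only then index into the result. The sketch below uses only the base class library. QueueTestHelpers.ReceiveUntilNonEmptyAsync is a hypothetical helper, not part of Alpakka or the Azure SDK, and the timeout and interval values are left to the caller.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class QueueTestHelpers
{
    // Hypothetical helper: calls `receive` repeatedly until it returns a
    // non-empty batch, or throws TimeoutException once the deadline has passed.
    public static async Task<IReadOnlyList<T>> ReceiveUntilNonEmptyAsync<T>(
        Func<Task<IReadOnlyList<T>>> receive,
        TimeSpan timeout,
        TimeSpan interval)
    {
        var deadline = DateTime.UtcNow + timeout;
        while (true)
        {
            var batch = await receive();
            if (batch.Count > 0)
                return batch; // safe to index batch[0] from here on

            if (DateTime.UtcNow >= deadline)
                throw new TimeoutException("No message became visible before the deadline.");

            await Task.Delay(interval);
        }
    }
}
```

In the test, the receive delegate would wrap the existing Queue.ReceiveMessagesAsync() call and return its Value. A timeout then reports that the message never arrived instead of an IndexOutOfRangeException, which makes the transient cause easier to spot.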
Run the following code
let system = ActorSystem.Create("test", ConfigurationFactory.Default())
let mat = ActorMaterializer.Create(system, ActorMaterializerSettings.Create(system).WithSupervisionStrategy(Deciders.ResumingDecider))
let connSettings =
    AmqpConnectionDetails
        .Create("localhost", 5672)
        .WithAutomaticRecoveryEnabled(true)
        .WithNetworkRecoveryInterval(TimeSpan.FromSeconds 1.0)
let queueName = "my-queue"
let queueDeclaration = QueueDeclaration.Create(queueName).WithDurable(true).WithAutoDelete(false)
let sink = AmqpSink.CreateSimple(AmqpSinkSettings.Create(connSettings).WithRoutingKey("my-queue").WithDeclarations(queueDeclaration))
Source
    .From(seq { 1..1_000_000 })
    .Select(fun x -> x |> string |> ByteString.FromString)
    .To(sink)
    .Run(mat)
|> ignore
system.WhenTerminated.Wait()
Everything is OK, messages are being queued at ~20K/second rate.
Now restart the RabbitMQ server. The following error appears in the log:
[ERROR][17-Jan-18 19:06:48][Thread 0012][[akka://test/user/StreamSupervisor-0/Flow-0-1-AmqpSink#1922506265]] Error in stage [AmqpSink]: Unable to write data to the transport connection: An existing connection was forcibly closed by the remote host.
Cause: System.IO.IOException: Unable to write data to the transport connection: An existing connection was forcibly closed by the remote host. ---> System.Net.Sockets.SocketException: An existing connection was forcibly closed by the remote host
at System.Net.Sockets.NetworkStream.Write(Byte[] buffer, Int32 offset, Int32 size)
--- End of inner exception stack trace ---
at System.Net.Sockets.NetworkStream.Write(Byte[] buffer, Int32 offset, Int32 size)
at System.IO.BufferedStream.Flush()
at RabbitMQ.Client.Impl.SocketFrameHandler.WriteFrameSet(IList`1 frames)
at RabbitMQ.Client.Impl.Command.TransmitAsFrameSet(Int32 channelNumber, Connection connection)
at RabbitMQ.Client.Impl.SessionBase.Transmit(Command cmd)
at RabbitMQ.Client.Impl.ModelBase.ModelSend(MethodBase method, ContentHeaderBase header, Byte[] body)
at RabbitMQ.Client.Impl.ModelBase.BasicPublish(String exchange, String routingKey, Boolean mandatory, IBasicProperties basicProperties, Byte[] body)
at Akka.Streams.Amqp.AmqpSinkStage.AmqpSinkStageLogic.<.ctor>b__3_0()
at Akka.Streams.Implementation.Fusing.GraphInterpreter.ProcessEvent(Connection connection)
at Akka.Streams.Implementation.Fusing.GraphInterpreter.Execute(Int32 eventLimit)
and the messages are not queued anymore.
I thought that WithAutomaticRecoveryEnabled(true) reconnects automatically, and that WithSupervisionStrategy(Deciders.ResumingDecider) resumes the failing stream stage. As a result, I expected messages to continue being queued after the server restart (with a single message lost).
This test is very racy:
ChannelSourceSpec.ChannelSource_must_read_incoming_events()
We still don't know whether it is because something changed in the underlying Task or ValueTask code, because of our own async handling code, or a combination of both.
I think that an example of that would be beneficial for a lot of people, including me.
It is possible to use the scala version as a source
https://github.com/akka/alpakka/tree/master/amqp/src
This nuget package could be used: https://www.nuget.org/packages/RabbitMQ.Client
To make messages persistent it's necessary to do two things: make the queue durable and set DeliveryMode = 2 for each message. The former is simple:
let queueDeclaration = QueueDeclaration.Create(queueName).WithDurable(true)
But the latter is impossible with the current implementation:
Source
    .From(seq { 1..1_000 })
    .Select(fun x -> x |> string |> ByteString.FromString) // no way to set basic properties
    .To(sink)
    .Run(mat)
|> ignore
Sample is here https://github.com/vasily-kirichenko/AmqpStreams/blob/master/AmqpStreams1/Program.fs
A number of PRs have been merged lately and we want to put out a new release of Alpakka.
The following projects need to be updated to the latest version of Akka.Streams (at the very least).
Note that these are all simple "update dependencies and increment version" actions.
I'll be slowly working my way through these projects over the coming weeks as I get around to it.
But if anyone would like to do some of the work, make a comment here and claim a project (or two, or three). Doing so would speed up the process.
Need an Azure EventHub source that supports manual commits to support atomic operation on events.
Version Information
Version of Akka.NET? 1.4.38
Which Akka.NET Modules? Akka.Streams 1.4.38, Akka.Streams.Channels 1.0.0-beta9
Describe the bug
I noticed strange behavior while working with the Akka.Streams.Channels ChannelSource. I decided to use Channels as the backing store for some objects, and I created a ChannelSource to read this data in batches and send it to Azure Service Bus. Everything worked well with around 1000 items stored in the Channel. When the number increased, I noticed that the ChannelSource stopped processing data after some time. After some investigation, I found that the ChannelSource fails with a TaskCanceledException, which is raised via FailStage here - ChannelReaderSource.cs
To Reproduce
Steps to reproduce the behavior:
Here is the link to a simple repro, which fails almost every time on my machine - https://github.com/pavoldecky/AkkaStreamChannel/blob/main/AkkaStreamChannel/Program.cs
Expected behavior
First of all, I am not sure whether ChannelSource is intended to be used with a huge number of items. I experimented with the Channel reader and writer directly and was able to process much more data than the amount at which ChannelSource fails. So I thought that ChannelSource should be able to handle the same amount of data.
Actual behavior
So after some more investigation, I noticed that problem could be with the continuation ValueTask in OnPull method
public override void OnPull()
{
    if (_reader.TryRead(out var element))
    {
        Push(_outlet, element);
    }
    else
    {
        var continuation = _reader.WaitToReadAsync();
        if (continuation.IsCompletedSuccessfully)
        {
            var dataAvailable = continuation.GetAwaiter().GetResult();
            if (dataAvailable && _reader.TryRead(out element))
                Push(_outlet, element);
            else
                CompleteStage();
        }
        else if (!continuation.IsCompleted)
            continuation.AsTask().ContinueWith(t =>
            {
                if (t.IsFaulted) _onValueReadFailure(t.Exception);
                else if (t.IsCanceled) _onValueReadFailure(new TaskCanceledException(t));
                else _onValueRead(t.Result);
            });
        else if (continuation.IsFaulted)
            FailStage(continuation.AsTask().Exception);
        else
            FailStage(new TaskCanceledException(continuation.AsTask()));
    }
}
So the issue occurs when _reader.TryRead returns false and we call _reader.WaitToReadAsync(). In some cases, execution ends up in the last else. But when I check the status of the continuation ValueTask at that point, it is IsCompletedSuccessfully. So could the problem be that the continuation completes with a slight delay, after the first if (continuation.IsCompletedSuccessfully) check has already been evaluated as false but before !continuation.IsCompleted is evaluated, so that neither branch handles it?
I tried to do some changes and it is working for my case, but as a user with limited knowledge of ValueTasks and Akka.Streams I would expect that this is not exactly what is needed :)
Here are my changes.
public override void OnPull()
{
    if (_reader.TryRead(out var element))
    {
        Push(_outlet, element);
    }
    else
    {
        var task = _reader.WaitToReadAsync().AsTask();
        if (task.IsCompletedSuccessfully)
        {
            var dataAvailable = task.GetAwaiter().GetResult();
            if (dataAvailable && _reader.TryRead(out element))
            {
                Push(_outlet, element);
            }
            else
            {
                CompleteStage();
            }
        }
        else if (task.IsFaulted)
        {
            FailStage(task.Exception);
        }
        else
        {
            task.ContinueWith(result =>
            {
                if (result.IsCanceled)
                {
                    _onValueReadFailure(new TaskCanceledException(result));
                }
                else if (result.IsFaulted)
                {
                    _onValueReadFailure(result.Exception);
                }
                else
                {
                    _onValueRead(result.Result);
                }
            });
        }
    }
}
Environment
Windows 11, .NET 6.0.300
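To illustrate the suspected race outside of Akka.Streams: the original OnPull reads the state of the pending read several times in a row, so a read that completes between two of those checks can be false for IsCompletedSuccessfully, then false for !IsCompleted, then false for IsFaulted, and land in the cancellation branch even though it succeeded. A minimal sketch of one way to avoid that is to take a single snapshot of the status and attach a continuation only when the snapshot says the read is still pending. ReadSignal, Classify and Dispatch are hypothetical names, and the two callbacks stand in for the stage's own handlers.

```csharp
using System;
using System.Threading.Tasks;

public static class ReadSignal
{
    public enum Outcome { Pending, Succeeded, Faulted, Canceled }

    // Reads the task status exactly once, so every branch that follows
    // works from the same snapshot.
    public static Outcome Classify(Task task)
    {
        switch (task.Status)
        {
            case TaskStatus.RanToCompletion: return Outcome.Succeeded;
            case TaskStatus.Faulted: return Outcome.Faulted;
            case TaskStatus.Canceled: return Outcome.Canceled;
            default: return Outcome.Pending;
        }
    }

    // Dispatches on the snapshot. For a pending task a continuation is
    // attached; it also runs if the task completes right after the snapshot,
    // so no outcome is lost or misreported.
    public static void Dispatch(Task<bool> task, Action<bool> onValue, Action<Exception> onFailure)
    {
        switch (Classify(task))
        {
            case Outcome.Succeeded:
                onValue(task.Result);
                break;
            case Outcome.Faulted:
                onFailure(task.Exception);
                break;
            case Outcome.Canceled:
                onFailure(new TaskCanceledException(task));
                break;
            default:
                task.ContinueWith(
                    t => Dispatch(t, onValue, onFailure),
                    TaskContinuationOptions.ExecuteSynchronously);
                break;
        }
    }
}
```

Inside a real stage, the callbacks used from the continuation would still have to be the stage's async callbacks, as in the original code, because the continuation may run on a different thread than the stage.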
As a developer working with .NET Core / Akka / Azure Storage Queue
I would like to convert the existing Azure Storage Queue project to netstandard2
so that it can be reused in new .NET Core projects in the same way it is currently used with net461.
I changed the project type from net461 to netstandard2; the solution still compiles and the tests are green.
Running Akka.Streams 1.3.10 against the current CloudQueue integration causes a MissingMethod exception on the SetHandler method call in its QueueSource.Logic class.
It's probably some overload that changed or something.
If possible we should port all available modules from the JVM.
Alpakka/src/AWS/Kinesis/Akka.Streams.Kinesis/KinesisSourceStage.cs
Lines 100 to 108 in 26a9e78
StartingSequenceNumber must be copied when ShardIteratorType is set to AFTER_SEQUENCE_NUMBER or AT_SEQUENCE_NUMBER
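The rule can be sketched as a small mapping function. The two classes below are hypothetical stand-ins written for illustration only; they are not the types used in KinesisSourceStage.cs or in the AWS SDK, and only the two iterator type names come from the issue text.

```csharp
using System;

// Hypothetical stand-in for the stage's shard settings.
public sealed class ShardSettingsSketch
{
    public string ShardIteratorType { get; set; }
    public string StartingSequenceNumber { get; set; }
}

// Hypothetical stand-in for the shard iterator request sent to Kinesis.
public sealed class IteratorRequestSketch
{
    public string ShardIteratorType { get; set; }
    public string StartingSequenceNumber { get; set; }
}

public static class ShardIteratorRequests
{
    public static IteratorRequestSketch From(ShardSettingsSketch settings)
    {
        var request = new IteratorRequestSketch
        {
            ShardIteratorType = settings.ShardIteratorType
        };

        // Both sequence-number-based iterator types need the sequence number
        // copied onto the request; other types leave it unset.
        var needsSequenceNumber =
            settings.ShardIteratorType == "AT_SEQUENCE_NUMBER" ||
            settings.ShardIteratorType == "AFTER_SEQUENCE_NUMBER";

        if (needsSequenceNumber)
        {
            if (string.IsNullOrEmpty(settings.StartingSequenceNumber))
                throw new ArgumentException("StartingSequenceNumber is required for this iterator type.");
            request.StartingSequenceNumber = settings.StartingSequenceNumber;
        }

        return request;
    }
}
```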
I want to make a statement here.
The Akka.NET project and organization is a separate entity from Aaron's Petabridge.
I personally love what you @Aaronontheweb are doing and I am impressed with the success you have had.
But when things like "The Petabridge subscription does not cover support for these modules" sneak into the documentation and organizational repos, the boundaries have become too blurred.
The same rules must apply to everyone. If anyone else wants to run a business around Akka.NET, where do we draw the line on whose business gets exposed in the internal information?
(I have no intention of doing so, so it's not that; it's about principles: open, free, same rules for everyone.)
I see no issue with linking to blogs or articles from the docs, but when someone's personal support license pops up in here, there is a problem with ownership and with perception from the community.
Kafka is a persistent commit log. Because of the persistent nature of Kafka we can use kafka for event sourcing. Here is a nice article about the subject https://dzone.com/articles/event-sourcing-with-kafka. This article describes RocksDb for state.
I think Kafka and Akka could have great synergy. There is just one catch, which is already mentioned in the article as well. Because Kafka uses files on disk to store the event stream, it is considered unwise to use a topic per entity unless you have few entities in the system. You would also need some administration of which entities exist (that isn't hard, as we can get the info using metadata).
Examples from other GitHub sources actually have 1 topic per actor. A Java/Scala example: https://github.com/krasserm/akka-persistence-kafka. There is an EventTopicMapper, but I can't really see whether it can be used to map many entities into 1 topic, or whether it is meant to do that at all.
I think it is not smart to make Kafka a source for a persistent actor at this moment, as you would need as many consumers as there are actors restoring state. From a performance point of view, Kafka topics are not meant to be read from the beginning over and over again.
To not get too abstract here, a simple event-sourced system could be a Users topic. We could define UserCreated, UserDisplaynameChanged, UserLoggedIn, UserLoggedOut. I could see us having 1 UsersTopic. Let's make the key a Guid (we all love Guids, don't we ;))
What we would need:
I don't think we need to worry a lot about other actors producing into the topics our subsystem is subscribed to. This is up to the developer and has the same issues as when someone has persistent actors with overlapping persistence IDs: system behaviour could be really strange. But as long as consumers stay subscribed to the topic, it will be a lot more robust than current persistent actors, as they stop the subscription (most implementations anyway) ...
Kafka can perform at great speeds. I think creating snapshots could be done, but snapshots for multiple actors in one topic could be quite hard to do, so the scope of this discussion should be events/journal only at this time.
I have already built actors based on topics, but I created this "issue" as a discussion about a generic solution that we could provide to others in this repo, or anywhere else.
These are my thoughts on how to do this with the simple users topic. I created this discussion because I would like to see what others think before missing important Akka or Kafka features.
Based on my first thoughts on this, I guess we need the PartitionEOF from Kafka as an event in the SourceStage as well.
The current implementation acks messages as soon as it has pushed them downstream, which is not what I want. I need to be in control of acking each message manually, in one of the downstream stages or outside the stream (e.g. in an independent actor):
Scala Alpakka added CommittableMessage more than a year ago in akka/alpakka@d478a61; see the discussion in akka/alpakka#97
Hi there,
I am using the producer for the examples and the same code for the consumer except for 1 difference.
Instead of using Subscriptions.Topics("testtopic"), I am using Subscriptions.AssignmentWithOffset(new TopicPartitionOffset("testtopic", 0, Offset.Beginning));
This all works OK until the buffer limit is reached and the following code in ConsumerStage throws an ArgumentNullException because assignedPartitions is null:
private void PullQueue()
{
    _consumer.Poll(_settings.PollTimeout);
    if (!isPaused && _buffer.Count > _settings.BufferSize)
    {
        Log.Debug($"Polling paused, buffer is full");
        _consumer.Pause(assignedPartitions); // BREAKING CODE
        isPaused = true;
    }
}
When I use the following code :
static void Main(string[] args)
{
    Config fallbackConfig = ConfigurationFactory.ParseString(@"
        akka.suppress-json-serializer-warning=true
        akka.loglevel = DEBUG
    ").WithFallback(ConfigurationFactory.FromResource<ConsumerSettings<object, object>>("Akka.Streams.Kafka.reference.conf"));
    var system = ActorSystem.Create("TestKafkaConsumer", fallbackConfig);
    var materializer = system.Materializer();
    var consumerSettings = ConsumerSettings<Null, string>.Create(system, null, new StringDeserializer(Encoding.UTF8))
        .WithBootstrapServers("localhost:9092")
        .WithGroupId("group1");
    //var subscription = Subscriptions.Topics("testtopic");
    var subscription = Subscriptions.AssignmentWithOffset(new TopicPartitionOffset("testtopic", 0, Offset.Beginning));
    var source = Consumer.PlainSource(consumerSettings, subscription);
    source.Throttle(5, TimeSpan.FromSeconds(1), 5, ThrottleMode.Shaping)
        .RunForeach(
            result =>
            {
                Console.WriteLine(
                    $"Consumer: {result.Topic}/{result.Partition} {result.Offset}: {result.Value}");
            }, materializer);
    Console.ReadLine();
}
I'm not a Kafka expert and couldn't tell how I could fix this. It seems to be similar to this issue:
Version Information
Akka.Net v1.5.12
Akka.Hosting v1.5.12
Akka.Streams v1.5.12
Akka.Streams.Amqp.RabbitMq v1.5.8
Akka.Cluster.Hosting v1.5.12
Describe the bug
Both Producer and Consumer of the Sample are throwing NullReferenceException.
To Reproduce
Steps to reproduce the behavior:
Checkout master from
https://github.com/petabridge/akkadotnet-code-samples/tree/master/src/reliability/rabbitmq-backpressure
Update NugetDependencies in the project: ReliableRabbitMQ.Shared
<PropertyGroup>
<TargetFramework>$(NetRuntime)</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Akka" Version="1.5.12" />
<PackageReference Include="Akka.Hosting" Version="1.5.12" />
<PackageReference Include="Akka.Streams" Version="1.5.12" />
<PackageReference Include="Akka.Streams.Amqp.RabbitMq" Version="1.5.8" />
<PackageReference Include="Akka.Cluster.Hosting" Version="1.5.12" />
</ItemGroup>
Run both ReliableRabbitMQ.Consumer and ReliableRabbitMQ.Producer
See the error in console.
Expected behavior
Both ReliableRabbitMQ.Consumer and ReliableRabbitMQ.Producer are working without an error.
Actual behavior
Both ReliableRabbitMQ.Consumer and ReliableRabbitMQ.Producer console apps are throwing NullReferenceException.
-------------------------------- Producer --------------------------------
[ERROR][08. 07. 2023 12:36:44.268Z][Thread 0014][[akka://RabbitMQProducer/user/amqp-producer/StreamSupervisor-1/Flow-1-2-AmqpSink#881794866]] Error during PreStart in [AmqpSink]
Cause: RabbitMQ.Client.Exceptions.BrokerUnreachableException: None of the specified endpoints were reachable
---> System.AggregateException: One or more errors occurred. (Object reference not set to an instance of an object.)
---> System.NullReferenceException: Object reference not set to an instance of an object.
at RabbitMQ.Client.Impl.SocketFrameHandler..ctor(AmqpTcpEndpoint endpoint, Func`2 socketFactory, TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout)
at RabbitMQ.Client.ConnectionFactory.CreateFrameHandler(AmqpTcpEndpoint endpoint)
at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector)
--- End of inner exception stack trace ---
at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector)
at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.Init(IEndpointResolver endpoints)
at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
--- End of inner exception stack trace ---
at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
at Akka.Streams.Amqp.RabbitMq.AmqpConnectorLogic.PreStart()
--- End of stack trace from previous location ---
at Akka.Actor.PipeToSupport.PipeTo(Task taskToPipe, ICanTell recipient, Boolean useConfigureAwait, IActorRef sender, Func`1 success, Func`2 failure)
at Akka.Streams.Implementation.Fusing.GraphInterpreter.Init(IMaterializer subMaterializer)
-------------------------------- Consumer --------------------------------
[ERROR][08. 07. 2023 12:36:45.507Z][Thread 0014][[akka://RabbitConsumer/user/amqp-consumer/amqp-consumer/StreamSupervisor-1/Flow-0-0-unknown-operation#844566059]] Error during PreStart in [AmqpSource]
Cause: RabbitMQ.Client.Exceptions.BrokerUnreachableException: None of the specified endpoints were reachable
---> System.AggregateException: One or more errors occurred. (Object reference not set to an instance of an object.)
---> System.NullReferenceException: Object reference not set to an instance of an object.
at RabbitMQ.Client.Impl.SocketFrameHandler..ctor(AmqpTcpEndpoint endpoint, Func`2 socketFactory, TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout)
at RabbitMQ.Client.ConnectionFactory.CreateFrameHandler(AmqpTcpEndpoint endpoint)
at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector)
--- End of inner exception stack trace ---
at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector)
at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.Init(IEndpointResolver endpoints)
at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
--- End of inner exception stack trace ---
at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
at Akka.Streams.Amqp.RabbitMq.AmqpConnectorLogic.PreStart()
at Akka.Streams.Implementation.Fusing.GraphInterpreter.Init(IMaterializer subMaterializer)
Screenshots
Environment
Windows 10 Pro 22H2,
RabbitMQ 3.11.10 on Erlang 25.3 running in Docker,
.NET 6
These were all incorporated into Akka.Streams core very late 2022: akkadotnet/akka.net#6268
We should remove those stages from this project and mark their respective NuGet packages as deprecated.
For instance, the LogRotatorSink was contributed in October but it's still not available. Maybe we could force a new release at least right after every Akka.NET release?
Also related to #173
let system = ActorSystem.Create("test", ConfigurationFactory.Load())
let mat = ActorMaterializer.Create(system, ActorMaterializerSettings.Create(system).WithSupervisionStrategy(Deciders.ResumingDecider))
let connSettings =
    AmqpConnectionDetails
        .Create("localhost", 5672)
        .WithAutomaticRecoveryEnabled(true)
        .WithNetworkRecoveryInterval(TimeSpan.FromSeconds 1.0)
        .WithTopologyRecoveryEnabled(true)
let taskQueue = "task-queue"
let taskQueueDecl = QueueDeclaration.Create(taskQueue).WithDurable(false).WithAutoDelete(true)
// send 10 tasks
let sink = AmqpSink.CreateSimple(AmqpSinkSettings.Create(connSettings).WithRoutingKey(taskQueue).WithDeclarations(taskQueueDecl))
Source.From([1..10]).Select(fun x -> x |> string |> ByteString.FromString).RunWith(sink, mat).Wait()
let reportQueue = QueueDeclaration.Create("report-queue").WithDurable(false).WithAutoDelete(true)
RestartSource
.WithBackoff((fun _ ->
AmqpSource.CommittableSource(NamedQueueSourceSettings.Create(connSettings, taskQueue), bufferSize = 100)),
TimeSpan.FromSeconds 1.0, TimeSpan.FromSeconds 2.0, 0.2)
.Throttle(1, TimeSpan.FromMilliseconds 1000.0, 0, ThrottleMode.Shaping)
.Select(fun x ->
printfn "[0] %s" (x.Message.Bytes.ToString())
x)
.SelectAsync(1, fun cms ->
async {
if cms.Message.Bytes.ToString() = "5" then
do! cms.Nack() |> Async.AwaitTask
do! cms.Ack() |> Async.AwaitTask // here we fail on "5" message
return cms.Message.Bytes
}
|> Async.StartAsTask)
.Select(fun x ->
printfn "[1] %O" x
x)
.RunWith(
RestartSink.WithBackoff((fun _ ->
AmqpSink.CreateSimple(AmqpSinkSettings.Create(connSettings).WithRoutingKey(reportQueue.Name).WithDeclarations(reportQueue))),
TimeSpan.FromSeconds 1.0, TimeSpan.FromSeconds 2.0, 0.2), mat)
|> ignore
system.WhenTerminated.Wait()
[0] 1
[1] 1
[0] 2
[1] 2
[0] 3
[1] 3
[0] 4
[1] 4
[0] 5
[1] 5
[ERROR][18-Jan-18 19:45:06][Thread 0011][Akka.Streams.Dsl.RestartWithBackoffSource`2+Logic[Akka.Streams.Amqp.Dsl.CommittableIncomingMessage,Akka.NotUsed]] Restarting graph due to failure
Cause: Akka.Streams.Amqp.ShutdownSignalException: ShutdownSignal has been received, ShutdownInitiator=Peer, Cause=, ClassId=60, MethodId=80, ReplyCode=406, ReplyText=PRECONDITION_FAILED - unknown delivery tag 5
[DEBUG][18-Jan-18 19:45:06][Thread 0011][Akka.Streams.Dsl.RestartWithBackoffSource`2+Logic[Akka.Streams.Amqp.Dsl.CommittableIncomingMessage,Akka.NotUsed]] Last restart attempt was more than 00:00:01 ago, resetting restart count
[DEBUG][18-Jan-18 19:45:06][Thread 0011][Akka.Streams.Dsl.RestartWithBackoffSource`2+Logic[Akka.Streams.Amqp.Dsl.CommittableIncomingMessage,Akka.NotUsed]] Restarting graph in 00:00:01.0657050
[0] 6
[ERROR][18-Jan-18 19:45:08][Thread 0003][[akka://test/user/StreamSupervisor-0/Flow-1-0-unknown-operation#477839087]] Error during PreStart in [AmqpSource]
Cause: RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=404, text="NOT_FOUND - no queue 'task-queue' in vhost '/'", classId=60, methodId=20, cause=
at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
at RabbitMQ.Client.Impl.ModelBase.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer)
at RabbitMQ.Client.Impl.AutorecoveringModel.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer)
at Akka.Streams.Amqp.AmqpSourceStage.AmqpSourceStageLogic.SetupNamedQueue(NamedQueueSourceSettings settings)
at Akka.Streams.Amqp.AmqpConnectorLogic.PreStart()
at Akka.Streams.Implementation.Fusing.GraphInterpreter.Init(IMaterializer subMaterializer)
[ERROR][18-Jan-18 19:45:08][Thread 0003][Akka.Streams.Dsl.RestartWithBackoffSource`2+Logic[Akka.Streams.Amqp.Dsl.CommittableIncomingMessage,Akka.NotUsed]] Restarting graph due to failure
Cause: RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=404, text="NOT_FOUND - no queue 'task-queue' in vhost '/'", classId=60, methodId=20, cause=
at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
at RabbitMQ.Client.Impl.ModelBase.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer)
at RabbitMQ.Client.Impl.AutorecoveringModel.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer)
at Akka.Streams.Amqp.AmqpSourceStage.AmqpSourceStageLogic.SetupNamedQueue(NamedQueueSourceSettings settings)
at Akka.Streams.Amqp.AmqpConnectorLogic.PreStart()
at Akka.Streams.Implementation.Fusing.GraphInterpreter.Init(IMaterializer subMaterializer)
[DEBUG][18-Jan-18 19:45:08][Thread 0003][Akka.Streams.Dsl.RestartWithBackoffSource`2+Logic[Akka.Streams.Amqp.Dsl.CommittableIncomingMessage,Akka.NotUsed]] Restarting graph in 00:00:02.3403801
[ERROR][18-Jan-18 19:45:10][Thread 0005][[akka://test/user/StreamSupervisor-0/Flow-1-0-unknown-operation#477839087]] Error during PreStart in [AmqpSource]
Cause: RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=404, text="NOT_FOUND - no queue 'task-queue' in vhost '/'", classId=60, methodId=20, cause=
at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
at RabbitMQ.Client.Impl.ModelBase.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer)
at RabbitMQ.Client.Impl.AutorecoveringModel.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer)
at Akka.Streams.Amqp.AmqpSourceStage.AmqpSourceStageLogic.SetupNamedQueue(NamedQueueSourceSettings settings)
at Akka.Streams.Amqp.AmqpConnectorLogic.PreStart()
at Akka.Streams.Implementation.Fusing.GraphInterpreter.Init(IMaterializer subMaterializer)
[ERROR][18-Jan-18 19:45:10][Thread 0005][Akka.Streams.Dsl.RestartWithBackoffSource`2+Logic[Akka.Streams.Amqp.Dsl.CommittableIncomingMessage,Akka.NotUsed]] Restarting graph due to failure
Cause: RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=404, text="NOT_FOUND - no queue 'task-queue' in vhost '/'", classId=60, methodId=20, cause=
at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
at RabbitMQ.Client.Impl.ModelBase.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer)
at RabbitMQ.Client.Impl.AutorecoveringModel.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer)
at Akka.Streams.Amqp.AmqpSourceStage.AmqpSourceStageLogic.SetupNamedQueue(NamedQueueSourceSettings settings)
at Akka.Streams.Amqp.AmqpConnectorLogic.PreStart()
at Akka.Streams.Implementation.Fusing.GraphInterpreter.Init(IMaterializer subMaterializer)
[DEBUG][18-Jan-18 19:45:10][Thread 0005][Akka.Streams.Dsl.RestartWithBackoffSource`2+Logic[Akka.Streams.Amqp.Dsl.CommittableIncomingMessage,Akka.NotUsed]] Restarting graph in 00:00:02.1493502
[ERROR][18-Jan-18 19:45:12][Thread 0008][[akka://test/user/StreamSupervisor-0/Flow-1-0-unknown-operation#477839087]] Error during PreStart in [AmqpSource]
Cause: RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=404, text="NOT_FOUND - no queue 'task-queue' in vhost '/'", classId=60, methodId=20, cause=
at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
at RabbitMQ.Client.Impl.ModelBase.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer)
at RabbitMQ.Client.Impl.AutorecoveringModel.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer)
at Akka.Streams.Amqp.AmqpSourceStage.AmqpSourceStageLogic.SetupNamedQueue(NamedQueueSourceSettings settings)
at Akka.Streams.Amqp.AmqpConnectorLogic.PreStart()
at Akka.Streams.Implementation.Fusing.GraphInterpreter.Init(IMaterializer subMaterializer)
It fails with text="NOT_FOUND - no queue 'task-queue' in vhost '/'". I don't understand why.
Method not found: 'Void Akka.Streams.Stage.GraphStageLogic.SetHandler(Akka.Streams.Outlet, System.Action, System.Action)'.
at Akka.Streams.Amqp.AmqpSourceStage.AmqpSourceStageLogic..ctor(AmqpSourceStage stage)
at Akka.Streams.Amqp.AmqpSourceStage.CreateLogic(Attributes inheritedAttributes)
at Akka.Streams.Stage.GraphStage`1.CreateLogicAndMaterializedValue(Attributes inheritedAttributes)
at Akka.Streams.Implementation.Fusing.GraphAssembly.Materialize(Attributes inheritedAttributes, IModule[] copiedModules, IDictionary`2 materializedValues, Action`1 register)
at Akka.Streams.Implementation.ActorMaterializerImpl.ActorMaterializerSession.MaterializeGraph(GraphModule graph, Attributes effectiveAttributes, IDictionary`2 materializedValues)
at Akka.Streams.Implementation.ActorMaterializerImpl.ActorMaterializerSession.MaterializeAtomic(AtomicModule atomic, Attributes effectiveAttributes, IDictionary`2 materializedValues)
at Akka.Streams.Implementation.MaterializerSession.MaterializeModule(IModule module, Attributes effectiveAttributes)
at Akka.Streams.Implementation.MaterializerSession.Materialize()
at Akka.Streams.Implementation.ActorMaterializerImpl.Materialize[TMat](IGraph`2 runnable, Func`2 subFlowFuser, Attributes initialAttributes)
at Akka.Streams.Implementation.ActorMaterializerImpl.Materialize[TMat](IGraph`2 runnable)
at Akka.Streams.Dsl.RunnableGraph`1.Run(IMaterializer materializer)
at Akka.Streams.Dsl.Source`2.RunWith[TMat2](IGraph`2 sink, IMaterializer materializer)
It looks like you have not updated the NuGet package after these changes:
pr 32
pr 56
Hello, I am trying to connect to a broker that is not available on localhost. However, the example only shows how to use NamedQueueSourceSettings.Create(DefaultAmqpConnection.Instance, queueName).WithDeclarations(queueDeclaration), where DefaultAmqpConnection.Instance implies a local host with no authentication.
Can someone help me?
I would also like to know about the status of this project. Is it active?
Thanks
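For readers with the same question, here is a minimal sketch of pointing the RabbitMQ connector at a remote broker with credentials. It assumes the Akka.Streams.Amqp and Akka.Streams.Amqp.Dsl namespaces seen in the stack traces above, and that AmqpConnectionDetails, AmqpCredentials, and AmqpSource.AtMostOnceSource are available in your package version; newer releases may use different namespaces. The host name, port, and credentials are placeholders. Passing a queue declaration should also avoid the "NOT_FOUND - no queue" error reported earlier, since the stage declares the queue before consuming from it.

```csharp
using System;
using System.Text;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Amqp;
using Akka.Streams.Amqp.Dsl;
using Akka.Streams.Dsl;

public static class RemoteBrokerExample
{
    public static void Main()
    {
        var system = ActorSystem.Create("amqp-example");
        var materializer = system.Materializer();

        // Hypothetical host and credentials; replace with your broker's values.
        var connection = AmqpConnectionDetails
            .Create("broker.example.com", 5672)
            .WithCredentials(AmqpCredentials.Create("user", "password"));

        const string queueName = "task-queue";

        // Declaring the queue asks the stage to create it if it does not exist yet.
        var queueDeclaration = QueueDeclaration.Create(queueName);

        var settings = NamedQueueSourceSettings
            .Create(connection, queueName)
            .WithDeclarations(queueDeclaration);

        // Print each message body as UTF-8 text.
        AmqpSource.AtMostOnceSource(settings, bufferSize: 10)
            .RunForeach(m => Console.WriteLine(m.Bytes.ToString(Encoding.UTF8)), materializer);

        Console.ReadLine();
        system.Terminate().Wait();
    }
}
```

The only change from the documented example is that DefaultAmqpConnection.Instance is swapped for an explicit connection settings object.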
This Scala module could be used as a source: https://github.com/akka/reactive-kafka
This Kafka client is the best for .NET: https://www.nuget.org/packages/Confluent.Kafka/
Please describe what you are trying to understand
In trying to build my first Actor that sets up a Stream over Azure Storage Queue, I was stuck for a few hours trying to figure out why the Stream keeps returning the same message over and over.
Which pages have you looked at?
https://alpakka.getakka.net/documentation/Azure/StorageQueue.html
What did these pages not make clear?
That the Stream provider does not delete delivered messages from the Azure Storage Queue.
How can we do it better?
The page should make it clear that deletion of consumed messages from the Azure Storage Queue is the responsibility of the message consumer.
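A minimal sketch of what consumer-side deletion could look like, assuming the connector version built on Azure.Storage.Queues (where QueueSource.Create accepts a QueueClient and emits QueueMessage). The connection string, queue name, and the Console.WriteLine "processing" step are placeholders. Deletion uses the SDK's QueueClient.DeleteMessageAsync with the message ID and pop receipt.

```csharp
using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Azure.StorageQueue;
using Akka.Streams.Dsl;
using Azure.Storage.Queues;
using Azure.Storage.Queues.Models;

public static class DeleteAfterProcessing
{
    public static async Task Main()
    {
        var system = ActorSystem.Create("queue-example");
        var materializer = system.Materializer();

        // Hypothetical connection string (local emulator) and queue name.
        var queue = new QueueClient("UseDevelopmentStorage=true", "example-queue");
        await queue.CreateIfNotExistsAsync();

        // The source polls indefinitely, so this await only returns if the stream fails.
        await QueueSource.Create(queue)
            .SelectAsync(1, async (QueueMessage msg) =>
            {
                // Placeholder for real processing.
                Console.WriteLine(msg.MessageText);

                // Delete only after processing succeeded; otherwise the message
                // becomes visible again once its visibility timeout expires.
                await queue.DeleteMessageAsync(msg.MessageId, msg.PopReceipt);
                return msg.MessageId;
            })
            .RunWith(Sink.Ignore<string>(), materializer);
    }
}
```

Deleting after processing gives at-least-once behavior: if processing throws, the message is not deleted and is redelivered later.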
Please, publish the AMQP package.
Please describe what you are trying to understand
It's not clear to me, a RabbitMQ / AMQP novice, which of these two libraries I should be using and why.
Version Information
Version of Akka.NET? 1.4.40
Which Akka.NET Modules? Akka.Streams.Azure.StorageQueue
Describe the bug
QueueSource does not remove processed messages from the Azure Storage Queue.
To Reproduce
Steps to reproduce the behavior:
protected override void PreStart()
{
    QueueSource.Create(Queue)
        .RunWith(Sink.ActorRef<QueueMessage>(Self, Done.Instance), Context.Materializer());
}
Links to working reproductions on Github / Gitlab are very much appreciated
I was able to replicate this by modifying the A_QueueSource_should_poll_for_messages_if_the_queue_is_empty test that is found here:
If you modify the QueueSource.Create call to specify a short (5s) VisibilityTimeout, like so:
var probe = QueueSource.Create(Queue, pollInterval: TimeSpan.FromSeconds(1), options: new GetRequestOptions(TimeSpan.FromSeconds(5)))
Then extend the timeout for which the test Expects No Message, like so:
probe.Request(2)
.ExpectNext("Test1")
.ExpectNoMsg(TimeSpan.FromSeconds(10));
Then you see the test failing with the error that a message was found when none was expected. This is because after 5 seconds, the originally processed "Test1" message is now visible again, and then gets returned by the QueueSource again.
Expected behavior
After an Azure Storage Queue message has been delivered to the Sink, it should be removed from the Azure Storage Queue.
Actual behavior
The delivered Azure Storage Queue message remains on the Azure Queue after being delivered to the configured Stream Sink.
Screenshots
None.
Environment
Windows 11 Visual Studio 2022
There are several anomalies related to Akka.Streams.AMQP.V1 and the RabbitMQ docker container running under the ubuntu-18.04 Azure Pipelines image.
For the V1.0 plugin, our code could not establish any AMQP handshake with the container:
Akka.Streams.Amqp.V1.Tests.AmqpConnectorsTest.Publish_and_consume_elements_through_a_simple_queue_again_in_the_same_process [FAIL]
System.OperationCanceledException : The transport 'TcpTransport' is closed.
Stack Trace:
at Amqp.Framing.Reader.ReadBuffer(ITransport transport, Byte[] buffer, Int32 offset, Int32 count)
at Amqp.Framing.Reader.ReadHeader(ITransport transport)
at Amqp.Sasl.SaslProfile.Open(String hostname, ITransport transport)
at Amqp.Connection.Connect(SaslProfile saslProfile, Open open)
at Amqp.Connection..ctor(Address address, IHandler handler)
at Amqp.Connection..ctor(Address address)
/home/vsts/work/1/s/src/Amqp/Akka.Streams.Amqp.Tests/V1/AmqpConnectorsTest.cs(42,0): at Akka.Streams.Amqp.V1.Tests.AmqpConnectorsTest.Publish_and_consume_elements_through_a_simple_queue_again_in_the_same_process()
--- End of stack trace from previous location where exception was thrown ---
Output:
[07:51:11.297] SEND AMQP 3 1 0 0
[DEBUG][08/11/2020 19:51:11][Thread 0016][ActorSystem(test)] System shutdown initiated
[DEBUG][08/11/2020 19:51:11][Thread 0015][EventStream] Shutting down: StandardOutLogger started
[DEBUG][08/11/2020 19:51:11][Thread 0015][EventStream] All default loggers stopped
It appears from the frame log that the broker closes the TCP socket as soon as the client sends SEND AMQP 3 1 0 0 to start a handshake, and refuses any connection attempts after that.
For the standard AMQP protocol, the broker seems to suddenly drop the TCP connection on specific tests:
X Akka.Streams.Amqp.Tests.AmqpConnectorsTest.Pub_sub_from_one_source_with_multiple_sinks [5s 120ms]
Error Message:
System.AggregateException : One or more errors occurred. (Did not get at least one element from every fanout branch)
---- System.Exception : Did not get at least one element from every fanout branch
Stack Trace:
at System.Threading.Tasks.Task`1.GetResultCore(Boolean waitCompletionNotification)
at System.Threading.Tasks.Task`1.get_Result()
at Akka.Streams.Amqp.Tests.AmqpConnectorsTest.Pub_sub_from_one_source_with_multiple_sinks() in /home/vsts/work/1/s/src/Amqp/Akka.Streams.Amqp.Tests/AmqpConnectorsTest.cs:line 277
----- Inner Stack Trace -----
Standard Output Messages:
[ERROR][08/11/2020 19:51:12][Thread 0015][[akka://test/user/StreamSupervisor-2/Flow-0-0-unknown-operation#2121431516]] Error during PreStart in [AmqpSource]
Cause: RabbitMQ.Client.Exceptions.BrokerUnreachableException: None of the specified endpoints were reachable
---> System.IO.IOException: Unable to write data to the transport connection: Cannot access a disposed object.
Object name: 'System.Net.Sockets.Socket'..
---> System.ObjectDisposedException: Cannot access a disposed object.
Object name: 'System.Net.Sockets.Socket'.
at System.Net.Sockets.Socket.Send(Byte[] buffer, Int32 offset, Int32 size, SocketFlags socketFlags, SocketError& errorCode)
at System.Net.Sockets.NetworkStream.Write(Byte[] buffer, Int32 offset, Int32 size)
--- End of inner exception stack trace ---
at System.Net.Sockets.NetworkStream.Write(Byte[] buffer, Int32 offset, Int32 size)
at System.IO.BufferedStream.Flush()
at System.IO.BinaryWriter.Flush()
at RabbitMQ.Client.Impl.SocketFrameHandler.SendHeader()
at RabbitMQ.Client.Framing.Impl.Connection.StartAndTune()
at RabbitMQ.Client.Framing.Impl.Connection.Open(Boolean insist)
at RabbitMQ.Client.Framing.Impl.Connection..ctor(IConnectionFactory factory, Boolean insist, IFrameHandler frameHandler, String clientProvidedName)
at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.Init(IFrameHandler fh)
at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.Init(IEndpointResolver endpoints)
at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
--- End of inner exception stack trace ---
at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
at RabbitMQ.Client.ConnectionFactory.CreateConnection(IList`1 endpoints)
at Akka.Streams.Amqp.AmqpConnector.NewConnection(IConnectionFactory factory, IAmqpConnectionSettings settings) in /home/vsts/work/1/s/src/Amqp/Akka.Streams.Amqp/AmqpConnector.cs:line 71
at Akka.Streams.Amqp.AmqpSourceStage.AmqpSourceStageLogic.NewConnection(IConnectionFactory factory, IAmqpConnectionSettings settings) in /home/vsts/work/1/s/src/Amqp/Akka.Streams.Amqp/AmqpSourceStage.cs:line 75
at Akka.Streams.Amqp.AmqpConnectorLogic.PreStart() in /home/vsts/work/1/s/src/Amqp/Akka.Streams.Amqp/AmqpConnector.cs:line 106
at Akka.Streams.Implementation.Fusing.GraphInterpreter.Init(IMaterializer subMaterializer)
We need to modernize the WindowsAzure.Storage package to the new Azure.Storage.Queues:
From the package description:
NOTE: As of version 9.4.0, this library has been split into multiple parts and replaced: See https://www.nuget.org/packages/Microsoft.Azure.Storage.Blob/, https://www.nuget.org/packages/Microsoft.Azure.Storage.File/, https://www.nuget.org/packages/Microsoft.Azure.Storage.Queue/, and https://www.nuget.org/packages/Microsoft.Azure.Storage.Common/.
For table support, see https://www.nuget.org/packages/Microsoft.Azure.CosmosDB.Table/.
This client library enables working with the Microsoft Azure storage services which include the blob and file service for storing binary and text data, the table service for storing structured non-relational data, and the queue service for storing messages that may be accessed by a client.
For this release see notes - https://github.com/Azure/azure-storage-net/blob/master/README.md and https://github.com/Azure/azure-storage-net/blob/master/changelog.txt
Microsoft Azure Storage team's blog - http://blogs.msdn.com/b/windowsazurestorage/
I guess there are differences between RabbitMQ and ActiveMQ. I am getting RabbitMQ.Client.Exceptions.BrokerUnreachableException, even though I know the ActiveMQ broker is up and accepting AMQP messages.
Could the issue be that RabbitMQ speaks AMQP 0.9 while ActiveMQ speaks AMQP 1.0?
Has anyone worked on a port to .NET Core? If not, I can put a PR together. However, one issue is that GetBody() no longer exists on the Message, so a direction would need to be decided.
Is your feature request related to a problem? Please describe.
I think it would be awesome if the community had a set of Alpakka connectors for nats-net-v2.
Describe the solution you'd like
That we do the needful. I've done some basic scaffolding. General idea being with a clean starting API we can then optimize with a more native stage (unless akkadotnet/akka.net#7028 stalls, in that case maybe we do up front?)
I suppose my ask is ideally "let's get some feedback on what sort of API we think is useful" but will be happy with "sure do the PR and if you take the feedback we will approve".
Describe alternatives you've considered
Per above, we could get fancier with stages....
Inversely speaking, one can see there's nothing fancy in my gist and one could just party on Unfold easily enough, however there's something to be said for providing a polite API for others.
Additional context
Per above I'm happy to own this and do the work :)
I brought this idea up to the NATS .NET folks and they were, as always, gracious and supportive. It sounds like they may be willing to provide feedback or other input, but I will leave those comments to other channels.
Also note, not sure if it is a concern: this would be a .NET 6.0+ only connector, as that is the baseline for the underlying library we are exposing via Alpakka.
In order to make it easier to debug connectivity issues with AWS Kinesis, we should add some debug logging in these areas:
Alpakka/src/AWS/Kinesis/Akka.Streams.Kinesis/KinesisSourceStage.cs
Lines 158 to 168 in c0ba451
Some pseudo code:
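// shardId and retrySeconds are placeholder names, not fields from KinesisSourceStage.
if (Log.IsDebugEnabled)
    Log.Debug("No messages found in Kinesis shard {0}, retrying in {1} seconds", shardId, retrySeconds);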
When I use a long-running producer that sends no messages for longer than 10 minutes, the Kafka idle-connection reaper comes along and disconnects the producer, which raises an error. I have never had issues producing after this message.
The ProducerStageLogic, however, exits with a FailedStage. To my knowledge this should not happen. This error is in the logs: 9092/bootstrap: Receive failed: Disconnected.
I doubt that this piece of code is correct, but I am not 100% sure:
Log.Error(error.Reason);
if (!KafkaExtensions.IsBrokerErrorRetriable(error) && !KafkaExtensions.IsLocalErrorRetriable(error))
{
    var exception = new KafkaException(error);
    FailStage(exception);
}
Hi guys!
I am trying to create a Reconnectable Source with RestartSource using Akka Streams Amqp.V1 and some strange things happen. When I disconnect the network, RestartSource does not try to reconnect again and no message is printed in the logs. However, when I receive an invalid message (type incompatible) it restarts my source. I am not sure why this is so and I would like to know if you have already been there and if you could help me. Below is the Source creation code:
using System.Numerics;
using Akka.Serialization;
using Akka.Streams.Amqp.V1.Dsl;
using Akka.Streams.Dsl;
using Amqp;
using System;
using Akka.Actor;
using Address = Amqp.Address;
using Akka.Streams;
using Akka.Streams.Amqp.V1;
using System.Text;

namespace image_processor_akkanet
{
    class Program
    {
        private static Connection connection;
        private static Session session;

        static void Main(string[] args)
        {
            var sys = ActorSystem.Create("AMQP-System");
            var materializer = ActorMaterializer.Create(sys);
            var serialization = sys.Serialization;
            var serializer = serialization.FindSerializerForType(typeof(byte[]));
            try
            {
                var address = new Address("0.0.0.0", 61616, "admin", "admin", scheme: "AMQP");
                connection = new Connection(address);
                session = new Session(connection);
                var queueName = "akka.teste";
                var receiverlinkName = "amqp-conn-test-sender";

                //create source
                var amqpSource = RestartSource.OnFailuresWithBackoff(
                    () => {
                        Console.WriteLine("Start/Restart...");
                        return AmqpSource
                            .Create(new NamedQueueSourceSettings<byte[]>(session, receiverlinkName, queueName, 200, serializer));
                    },
                    TimeSpan.FromSeconds(1),
                    TimeSpan.FromSeconds(3),
                    0.2,
                    Int16.MaxValue
                );

                //run source
                var result = amqpSource
                    .Throttle(1, TimeSpan.FromSeconds(1), 10, ThrottleMode.Shaping)
                    .Select(elem => Encoding.UTF8.GetString(elem, 0, elem.Length))
                    .RunForeach(elem => Console.WriteLine(elem), materializer);
                result.Wait(TimeSpan.FromSeconds(Int16.MaxValue));
            }
            catch (Exception ex)
            {
                Console.WriteLine("Error " + ex.Message);
            }
            finally
            {
                session.Close();
                connection.Close();
            }
        }
    }
}
I tried both RestartSource.OnFailuresWithBackoff and RestartSource.WithBackoff, to no avail.
My project and dependencies:
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
<RootNamespace>image_processor_akkanet</RootNamespace>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Akka" Version="1.4.11" />
<PackageReference Include="Akka.Streams" Version="1.4.11" />
<PackageReference Include="Akka.Streams.Amqp.V1" Version="1.0.0-beta2" />
</ItemGroup>
</Project>
I'm using ActiveMQ Artemis as an AMQP broker.
Change DockerClient.Images.ListImagesAsync() in the docker fixture to use Filters instead of MatchName.
Version Information
NuGet versions
Akka, version 1.5.0
Akka.Streams.Azure.EventHub.V5, version 1.5.0.1
Describe the bug
Batched Event Hub processing does not return the expected total number of events (e.g. when running the single- or multi-processor samples in the V5 Event Hub samples).
For example, ingesting 10000 events may yield only around 9000.
Running samples mentioned above with non-batched processing works just as expected, as does a non-Akka.NET implementation (such as this).
All in all, the BatchedEventProcessorClient does not work as expected.
I have been playing around with various batch settings (primarily batch sizing through EventProcessorClientOptions.CacheEventCount), but to no avail.
If I set the ingest batch size to something really low (such as 4, 10, or 20), it seemingly works better (fewer events lost), but with a batch size of 100-1000 the loss rate is 10%+.
To Reproduce
Steps to reproduce the behavior:
Links to working reproductions on Github / Gitlab are very much appreciated
Expected behavior
Regardless of the exact number of messages returned by each Event Hub batch, I expect the total number of egressed events to correspond to the number of ingested events.
Actual behavior
The total number of egressed and processed events is significantly lower than the number of sent events.
Absolute data loss is lower if the ingest rate is lowered.
Version Information
Version of Akka.NET? N/A
Which Akka.NET Modules? Akka.Streams.EventHubs
Describe the bug
It's not actually a bug but a maintenance request: the Akka.Streams.EventHubs package uses the WindowsAzure.ServiceBus 6.2.0 SDK. In August 2017 Microsoft released a new SDK, Microsoft.Azure.ServiceBus, to replace it. That SDK has in turn been superseded by Azure.Messaging.EventHubs since October 2020.
Expected behavior
Upgrade to the latest version of Azure.Messaging.EventHubs SDK