rebus.amazonsqs's People

Contributors

ajacksms, dietermijle, jcmdsbr, jonathanyong81, knepe, mookid8000, mvandevy, theworstprogrammerever, thomaseg, xhafan

rebus.amazonsqs's Issues

Support for FIFO queues

Hi,

When using a queue of type "FIFO", the request fails with "The request must contain the parameter MessageGroupId." I have tried setting a "MessageGroupId" header, but that makes no difference (it is not a header, but a request parameter).

I think this library should support this, as FIFO queues are a great AWS feature.
Any thoughts?
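For reference, this is what the raw AWS SDK expects: MessageGroupId is a property on SendMessageRequest rather than a message attribute, which is why setting a Rebus header has no effect (the queue URL and group id below are illustrative):

using System;
using Amazon.SQS;
using Amazon.SQS.Model;

var client = new AmazonSQSClient();

await client.SendMessageAsync(new SendMessageRequest
{
    QueueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue.fifo", // hypothetical
    MessageBody = "...",
    MessageGroupId = "my-group",                       // required for FIFO queues
    MessageDeduplicationId = Guid.NewGuid().ToString() // or enable content-based deduplication
});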

deferCount is not increasing

Hi,

When we set up the second-level retry configuration, the deferCount header only ever increases to 1; after that it stops increasing.

Sometimes, once deferCount reaches 1, messages are duplicated.

Rebus.AmazonSqs: 6.2.0
.NET Framework: 4.7.1

public Task Handle(TestMessage message)
{
    throw new Exception("whatever"); // fail on purpose to trigger second-level retry
}

public Task Handle(IFailed<TestMessage> message)
{
    // context is the injected IMessageContext, bus the injected IBus
    var deferCount = Convert.ToInt32(context.Headers.GetValueOrDefault(Headers.DeferCount));

    if (deferCount >= 5)
    {
        return bus.Advanced.TransportMessage.Deadletter($"Failed after {deferCount} attempts\n\n{message.ErrorDescription}");
    }

    var defer = 3 * (deferCount + 1);
    Console.WriteLine($"DEFER: {deferCount}, MINUTES: {defer}");
    return bus.Advanced.TransportMessage.Defer(TimeSpan.FromMinutes(defer));
}

Our workaround is to use the CorrelationSequence header instead, which works as expected.

Problem with deferred messages

If I defer a message, it appears in the sender's input queue with defer headers, but it never gets delivered to the destination queue.

Below is the documentation from the wiki. I haven't configured anything regarding the timeout manager.


Transports WITH native support for deferred messages
When the transport supports delayed delivery, the message is usually deferred simply by setting a timestamp on the message when sending it, causing it to stay invisible in its destination queue until that time comes.

This works out-of-the-box with the Azure Service Bus transport, the Azure Storage Queues transport, and the Amazon SQS transport. Curiously, it also works with the SQL Server transport, as all messages are provided with a visible DateTime2 value, which is used by the transport when querying for messages to receive.
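For SQS specifically, the native mechanism is the DelaySeconds parameter on the send request, which SQS caps at 900 seconds (15 minutes). Roughly, with the raw SDK (client, queueUrl, and body assumed in scope):

await client.SendMessageAsync(new SendMessageRequest
{
    QueueUrl = queueUrl,
    MessageBody = body,
    DelaySeconds = 300 // message stays invisible in the destination queue for 5 minutes
});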

Change visibility time on abort

Hello!

Can I control the visibility time? I want to add a pause after an error, for example 2 minutes, before the message is processed again.

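For what it's worth, one way to approximate a pause after a failure, without touching SQS visibility timeouts directly, is Rebus's second-level retries: let the IFailed<> handler defer the message. A rough sketch (queue, handler, and message names are illustrative):

Configure.With(activator)
    .Transport(t => t.UseAmazonSQS(credentials, sqsConfig, "my-queue"))
    .Options(o => o.SimpleRetryStrategy(secondLevelRetriesEnabled: true))
    .Start();

// in a handler for the failed message, defer redelivery by two minutes
public Task Handle(IFailed<MyMessage> failed)
    => _bus.Advanced.TransportMessage.Defer(TimeSpan.FromMinutes(2));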

Dependency package versions

Hi! We are using this package in our software, and my colleagues Dieter Mijle and Rob van Pamel have already contributed here.

We are working on introducing feature flags/toggles into our project. By using the AWSSDK.AppConfig package, we have added a dependency on the AWSSDK.Core package version >= 3.5.2.10.
This project references AWSSDK.SQS 3.3.103.21 and AWSSDK.SecurityToken 3.3.105.38, both of which also depend on AWSSDK.Core but are limited to versions <= 3.4.0, which is why we would like the dependencies here to be updated to newer versions of the Core package.

v3 and v4 compatibility

It seems the way messages are created changed between versions 3 and 4.

In v3, the entire message is base64-encoded and sent as the SQS message body, with the headers sent as message attributes; in v4, the headers and body are both sent in the SQS body, with only the body itself being base64-encoded.

This causes incompatibilities and forces an upgrade between the versions, which isn't an easy thing to do in all circumstances.

It would seem this could be worked around with a small change to the ExtractTransportMessageFrom method:

TransportMessage ExtractTransportMessageFrom(Message message)
{
    var sqsMessage = _serializer.Deserialize(message.Body);
    return new TransportMessage(sqsMessage.Headers, GetBodyBytes(sqsMessage.Body));
}

If the deserialization fails, try to base64-decode the message, then attempt the deserialization again.
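A rough sketch of that fallback, reusing the _serializer and GetBodyBytes from the snippet above (illustrative only, not a tested patch):

TransportMessage ExtractTransportMessageFrom(Message message)
{
    try
    {
        // v4 format: JSON envelope with headers and a base64-encoded body
        var sqsMessage = _serializer.Deserialize(message.Body);
        return new TransportMessage(sqsMessage.Headers, GetBodyBytes(sqsMessage.Body));
    }
    catch (Exception)
    {
        // v3 format: the entire message is base64-encoded - decode first, then retry
        var decoded = Encoding.UTF8.GetString(Convert.FromBase64String(message.Body));
        var sqsMessage = _serializer.Deserialize(decoded);
        return new TransportMessage(sqsMessage.Headers, GetBodyBytes(sqsMessage.Body));
    }
}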

Any chance you would be able to include something like this?

Receiving SQS Messages from EventBridge

I'm having the same issue as the person who asked #41. I have an EventBridge filter sending a native SQS message that I would like to receive with Rebus.

My question is this: how do I access the raw message body of an SQS message in Rebus? Because the raw message isn't in the shape of the Rebus SQS message envelope, the Body property is always empty.

Rebus not subscribing on AWS

Hi! I'm facing a problem that took my entire afternoon without a solution:
I have two separate projects: one just publishes and one just subscribes. I can publish normally to a queue called reports, but when I try to subscribe I receive THIS response.

Can you help me?
Additionally, I couldn't figure out the right way to set this up. Do I need to create two different queues, one for publishing and one for subscribing? What should I pass to the UseAmazonSQS method?
Here is some code I have:

The publisher uses SimpleInjector:

_container.ConfigureRebus(c => c
    .Logging(l => l.Console())
    .Transport(t => t.UseAmazonSQSAsOneWayClient(new AmazonSQSTransportOptions
    {
        ClientFactory = () => new Amazon.SQS.AmazonSQSClient(
            _appSettings.QueueSettings.AwsAccessKey,
            _appSettings.QueueSettings.AwsSecretKey,
            Amazon.RegionEndpoint.USEast2),
        CreateQueues = true,
        ReceiveWaitTimeSeconds = 20
    }))
    .Routing(r => r.TypeBased()
        .Map<ReportExportEnvelope>("reports"))
    .Subscriptions(s => s.StoreInSqlServer(Configuration.GetConnectionString("Default"), "Subscriptions", isCentralized: true))
    .Start());

The subscriber uses the default .NET IoC container:

services.AddSingleton<Processor>();
services.AutoRegisterHandlersFromAssemblyOf<Processor>();
services.AddRebus(configure => configure
    .Logging(l => l.Console())
    .Transport(t => t.UseAmazonSQS(
        "reports_subscriber",
        new AmazonSQSTransportOptions
        {
            ClientFactory = () => new Amazon.SQS.AmazonSQSClient(
                appSettings.QueueSettings.AwsAccessKey,
                appSettings.QueueSettings.AwsSecretKey,
                Amazon.RegionEndpoint.USEast2),
            CreateQueues = true,
            ReceiveWaitTimeSeconds = 20
        }))
    .Routing(r => r.TypeBased()
        .Map<ReportExportEnvelope>("reports"))
    .Subscriptions(s => s.StoreInSqlServer(hostBuilder.Configuration.GetConnectionString("Default"), "Subscriptions", isCentralized: true)));

using (var provider = services.BuildServiceProvider())
{
    provider.UseRebus(async bus => await bus.Subscribe<ReportExportEnvelope>());
}

Thank you!

A message received from SQS just after bus disposal is made invisible in SQS

When a bus is disposed, message receiving from SQS is cancelled, but when a message is sent by SQS quickly enough before the socket is closed, the message is marked by SQS as invisible and will only be handled after the message visibility timeout.

The scenario above seems similar to what the NServiceBus team reported for the AWS SDK in aws/aws-sdk-net#796 (comment): "It looks like there's a race condition when the request gets canceled. When the CancellationSource gets triggered, control returns to the calling code, but the socket isn't closed yet. So even though a message is sent after the previous ReceiveMessage request has been canceled, the socket from the previous receive is still open. SQS replies over the old socket with the new message, considers it received, and subjects the message to the visibility timeout."

The suggested workaround of not passing cancellationToken into sqsClient.ReceiveMessageAsync(), passing CancellationToken.None instead, works for me.
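Roughly, the change at the transport's receive call looks like this (field and variable names are illustrative):

var response = await _client.ReceiveMessageAsync(request, CancellationToken.None);
// instead of: var response = await _client.ReceiveMessageAsync(request, cancellationToken);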
The downside is that bus disposal is no longer immediate with this workaround; it can take up to 20s.

The AWS SDK issue was closed in 2018, and @vellozi mentioned "We're still looking into how the CancellationToken works within the HTTP client, and possible solutions to that." But I don't think they've resolved it, so I assume we will have to live with this workaround and bus disposal taking up to 20s.

Original SO question: https://stackoverflow.com/questions/67656986/a-message-received-from-sqs-by-a-stopped-aws-ecs-instance-is-handled-by-rebus-on

Use of Rebus with "native" AWS messages?

We use Rebus extensively and it works great! But one thing annoys me about our current solution. We have a few cases where we get messages from AWS that don't conform to the format Rebus needs, so we need an AWS Lambda function to transform all "native" AWS messages like S3.ObjectCreated into "Rebus-friendly" messages. This seems like a bad workaround at best, and it is hard to maintain...

An example of a message would be the S3EventNotification which looks like this:

{
	"Records": [{
		"eventVersion": "2.1",
		"eventSource": "aws:s3",
		"awsRegion": "eu-central-1",
		"eventTime": "2022-06-30T08:50:15.572Z",
		"eventName": "ObjectCreated:Put",
		"userIdentity": {
			"principalId": "AWS:xxxxx"
		},
		"requestParameters": {
			"sourceIPAddress": "111.111.111.111"
		},
		"responseElements": {
			"x-amz-request-id": "XFF3WE66ZXXXXXXP",
			"x-amz-id-2": "7HgI9ICJHuazq8nB10RFqdDdeidxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxyajiV9FpQIw4Yhtjx"
		},
		"s3": {
			"s3SchemaVersion": "1.0",
			"configurationId": "xxxxxxxxxxx",
			"bucket": {
				"name": "myBucketName",
				"ownerIdentity": {
					"principalId": "A3AXXXXXXXXXX"
				},
				"arn": "arn:aws:s3:::myBucketName"
			},
			"object": {
				"key": "somefile.txt",
				"size": 123,
				"eTag": "786d8a1xxxxxxxxxxxxxxxef2",
				"sequencer": "00XXXXXXXXXXXXXX7"
			}
		}
	}]
}

I tried looking into hooking into the pipeline, as discussed somewhere on Stack Overflow, but I was getting in over my head and thought I couldn't be the only person with this issue, so I started to think I must be missing something obvious...

Is there any (good) way of getting these messages through Rebus?
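One hedged sketch of the pipeline-hook approach: an incoming step that detects a native message (no rbs2-msg-id header) and supplies the headers Rebus expects, registered to run before deserialization. The step and contract type names below are illustrative; IIncomingStep, PipelineStepInjector, and DeserializeIncomingMessageStep are Rebus's actual extension points.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Rebus.Messages;
using Rebus.Pipeline;

[StepDocumentation("Supplies Rebus headers for native AWS messages so they can be deserialized")]
class NativeAwsMessageStep : IIncomingStep
{
    public async Task Process(IncomingStepContext context, Func<Task> next)
    {
        var message = context.Load<TransportMessage>();

        if (!message.Headers.ContainsKey(Headers.MessageId))
        {
            // native message: fabricate the headers Rebus requires and map the
            // payload to a contract type of your own (hypothetical type name)
            var headers = new Dictionary<string, string>(message.Headers)
            {
                [Headers.MessageId] = Guid.NewGuid().ToString(),
                [Headers.ContentType] = "application/json;charset=utf-8",
                [Headers.Type] = "MyApp.Contracts.S3EventNotification, MyApp.Contracts"
            };

            context.Save(new TransportMessage(headers, message.Body));
        }

        await next();
    }
}

// registered before the deserialization step, e.g.:
// .Options(o => o.Decorate<IPipeline>(c => new PipelineStepInjector(c.Get<IPipeline>())
//     .OnReceive(new NativeAwsMessageStep(), PipelineRelativePosition.Before, typeof(DeserializeIncomingMessageStep))))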

sqs:SendMessage permission is required

I'm not sure what triggers the difference between needing sqs:SendMessageBatch and sqs:SendMessage, but the latter is apparently needed sometimes. The README in this repo only mentions the former permission.

In my test, I got the error below because I had not added the sqs:SendMessage permission to the IAM role used by my Fargate service (a .NET application that sends and receives SQS messages with Rebus). The stack trace follows. I hope this helps identify the cause of the discrepancy. Is this simply a matter of outdated documentation?

[16:27:45 ERR] HTTP POST /api/deployments responded 500 in 1005.8799 ms
Amazon.SQS.AmazonSQSException: User: SNIP is not authorized to perform: sqs:sendmessage on resource: SNIP because no identity-based policy allows the sqs:sendmessage action
 ---> Amazon.Runtime.Internal.HttpErrorResponseException: Exception of type 'Amazon.Runtime.Internal.HttpErrorResponseException' was thrown.
   at Amazon.Runtime.HttpWebRequestMessage.GetResponseAsync(CancellationToken cancellationToken)
   at Amazon.Runtime.Internal.HttpHandler`1.InvokeAsync[T](IExecutionContext executionContext)
   at Amazon.Runtime.Internal.Unmarshaller.InvokeAsync[T](IExecutionContext executionContext)
   at Amazon.SQS.Internal.ValidationResponseHandler.InvokeAsync[T](IExecutionContext executionContext)
   at Amazon.Runtime.Internal.ErrorHandler.InvokeAsync[T](IExecutionContext executionContext)
   --- End of inner exception stack trace ---
   at Amazon.Runtime.Internal.HttpErrorResponseExceptionHandler.HandleExceptionStream(IRequestContext requestContext, IWebResponseData httpErrorResponse, HttpErrorResponseException exception, Stream responseStream)
   at Amazon.Runtime.Internal.HttpErrorResponseExceptionHandler.HandleExceptionAsync(IExecutionContext executionContext, HttpErrorResponseException exception)
   at Amazon.Runtime.Internal.ExceptionHandler`1.HandleAsync(IExecutionContext executionContext, Exception exception)
   at Amazon.Runtime.Internal.ErrorHandler.ProcessExceptionAsync(IExecutionContext executionContext, Exception exception)
   at Amazon.Runtime.Internal.ErrorHandler.InvokeAsync[T](IExecutionContext executionContext)
   at Amazon.Runtime.Internal.CallbackHandler.InvokeAsync[T](IExecutionContext executionContext)
   at Amazon.Runtime.Internal.Signer.InvokeAsync[T](IExecutionContext executionContext)
   at Amazon.Runtime.Internal.EndpointDiscoveryHandler.InvokeAsync[T](IExecutionContext executionContext)
   at Amazon.Runtime.Internal.EndpointDiscoveryHandler.InvokeAsync[T](IExecutionContext executionContext)
   at Amazon.Runtime.Internal.CredentialsRetriever.InvokeAsync[T](IExecutionContext executionContext)
   at Amazon.Runtime.Internal.RetryHandler.InvokeAsync[T](IExecutionContext executionContext)
   at Amazon.Runtime.Internal.RetryHandler.InvokeAsync[T](IExecutionContext executionContext)
   at OpenTelemetry.Contrib.Instrumentation.AWS.Implementation.AWSTracingPipelineHandler.InvokeAsync[T](IExecutionContext executionContext)
   at Amazon.Runtime.Internal.CallbackHandler.InvokeAsync[T](IExecutionContext executionContext)
   at Amazon.Runtime.Internal.CallbackHandler.InvokeAsync[T](IExecutionContext executionContext)
   at Amazon.Runtime.Internal.ErrorCallbackHandler.InvokeAsync[T](IExecutionContext executionContext)
   at Amazon.Runtime.Internal.MetricsHandler.InvokeAsync[T](IExecutionContext executionContext)
   at Rebus.AmazonSQS.AmazonSqsTransport.<SendOutgoingMessages>b__23_1(IGrouping`2 batch)
   at Rebus.AmazonSQS.AmazonSqsTransport.SendOutgoingMessages(ConcurrentQueue`1 outgoingMessages)
   at Rebus.AmazonSQS.AmazonSqsTransport.<>c__DisplayClass21_1.<<Send>b__1>d.MoveNext()
--- End of stack trace from previous location ---
   at Rebus.Transport.TransactionContext.InvokeAsync(Func`2 actions)
   at Rebus.Transport.TransactionContext.Complete()
   at Rebus.Bus.RebusBus.InnerSend(IEnumerable`1 destinationAddresses, Message logicalMessage)
   at Rebus.Bus.RebusBus.Send(Object commandMessage, IDictionary`2 optionalHeaders)
   at Monitor.Deployment.Services.DeploymentOperationRebusExtensions.SendCopyDeployment(IBus bus, String stagedBucket, String sourceBuildId, String deploymentsBucket, String deploymentId) in /home/vsts/work/1/s/src/Monitor.Deployment/Services/DeploymentOperationRebusExtensions.cs:line 25
   at Monitor.Deployment.Services.CopyDeploymentService.TryStartCopyAsync(String stagedBucket, String sourceBuildId, String deploymentsBucket, String deploymentId) in /home/vsts/work/1/s/src/Monitor.Deployment/Services/CopyDeploymentService.cs:line 33
   at Monitor.Deployment.Deployments.DeploymentsController.PostAsync(CopyStagedBuild copyStagedBuild) in /home/vsts/work/1/s/src/Monitor.Deployment/Deployments/DeploymentsController.cs:line 94
   at Microsoft.AspNetCore.Mvc.Infrastructure.ActionMethodExecutor.TaskOfIActionResultExecutor.Execute(ActionContext actionContext, IActionResultTypeMapper mapper, ObjectMethodExecutor executor, Object controller, Object[] arguments)
   at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.<InvokeActionMethodAsync>g__Logged|12_1(ControllerActionInvoker invoker)
   at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.<InvokeNextActionFilterAsync>g__Awaited|10_0(ControllerActionInvoker invoker, Task lastTask, State next, Scope scope, Object state, Boolean isCompleted)
   at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.Rethrow(ActionExecutedContextSealed context)
   at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.Next(State& next, Scope& scope, Object& state, Boolean& isCompleted)
   at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.<InvokeInnerFilterAsync>g__Awaited|13_0(ControllerActionInvoker invoker, Task lastTask, State next, Scope scope, Object state, Boolean isCompleted)
   at Microsoft.AspNetCore.Mvc.Infrastructure.ResourceInvoker.<InvokeFilterPipelineAsync>g__Awaited|20_0(ResourceInvoker invoker, Task lastTask, State next, Scope scope, Object state, Boolean isCompleted)
   at Microsoft.AspNetCore.Mvc.Infrastructure.ResourceInvoker.<InvokeAsync>g__Logged|17_1(ResourceInvoker invoker)
   at Microsoft.AspNetCore.Mvc.Infrastructure.ResourceInvoker.<InvokeAsync>g__Logged|17_1(ResourceInvoker invoker)
   at Microsoft.AspNetCore.Routing.EndpointMiddleware.<Invoke>g__AwaitRequestTask|6_0(Endpoint endpoint, Task requestTask, ILogger logger)
   at Microsoft.AspNetCore.Authorization.AuthorizationMiddleware.Invoke(HttpContext context)
   at Serilog.AspNetCore.RequestLoggingMiddleware.Invoke(HttpContext httpContext)

Unable to subscribe to messages

Hi

I want to migrate our message queue from SQL Server to SQS.

_activator = new BuiltinHandlerActivator();
_activator.Register(() => new Example.ExampleHandler(_activator.Bus));

Configure.With(_activator)
    .Transport(x => x.UseAmazonSQS(SqsAcessKey, SqsSecretKey, RegionEndpoint, queueName))
    .Routing(r => r.TypeBased().MapAssemblyOf(queueName))
    .Subscriptions(t => t.StoreInSqlServer(ConnectionString, "Subscriptions", isCentralized: true))
    .Options(b =>
    {
        b.SimpleRetryStrategy(
            errorQueueAddress: $"{queueName}_Errors",
            maxDeliveryAttempts: 1,
            secondLevelRetriesEnabled: true);
        b.SetNumberOfWorkers(2);
        b.SetMaxParallelism(2);
    })
    .Start();

Bus.Subscribe().Wait();

This piece of code isn't working; it throws an "unable to access queue" exception at the line Bus.Subscribe().Wait();. I also don't want to use centralized subscription storage in SQL Server, so I changed the transport to:

_activator = new BuiltinHandlerActivator();
_activator.Register(() => new Example.ExampleHandler(_activator.Bus));

Configure.With(_activator)
    .Transport(t => t.UseAmazonSnsAndSqs(
        amazonCredentialsFactory: awsCredentials,
        amazonSqsConfig: SQSConfig,
        amazonSimpleNotificationServiceConfig: SNSConfig,
        workerQueueAddress: queueName))
    .Routing(r => r.TypeBased().Map(queueName))
    .Options(b =>
    {
        b.SimpleRetryStrategy(
            errorQueueAddress: $"{queueName}_Errors",
            maxDeliveryAttempts: 1,
            secondLevelRetriesEnabled: true);
        b.SetNumberOfWorkers(2);
        b.SetMaxParallelism(2);
    })
    .Start();

Bus.Subscribe().Wait();
This also throws an "unable to access queue" exception at the line Bus.Subscribe().Wait();.

And I'm publishing messages like this:

public IBus GetMessageBusForPublish(string connectionString)
{
    if (_messageBus == null)
    {
        using (var activator = new BuiltinHandlerActivator())
        {
            Configure.With(activator)
                .Transport(x => x.UseAmazonSQSAsOneWayClient(accessKey, secretKey, RegionEndpoint))
                .Start();

            _messageBus = activator.Bus;
        }
    }
    return _messageBus;
}

I can't figure out what I'm doing wrong. Please help.

Rebus is publishing to queues it's not configured for

I'm having issues running Rebus over Amazon SQS.

I have a service that runs on 3 different levels: test, qat, prod. I want independent queues for each level of service instance, so they don't interfere with each other even though we're sending the same types (same assembly). Alright, we have two services: a publisher and a consumer of messages.

The publisher is configured as follows:

var sqsConfig = new AmazonSQSConfig { RegionEndpoint = RegionEndpoint.USEast1 };
var credentials = new BasicAWSCredentials(Configuration.AWSKey, Configuration.AWSSecret);

_sqsBus = Configure.With(new BuiltinHandlerActivator())
    .Transport(t => t.UseAmazonSQS(credentials, sqsConfig, _nameOfQueue))
    .Routing(r => r.TypeBased().MapAssemblyOf<TemplateTestEmail>(_nameOfQueue))
    .Subscriptions(s => s.UseJsonFile(_jsonFilePath))
    .Start();

return _sqsBus;

where _nameOfQueue is "myservice-test-publisher". I send messages like

_bus.Publish(message);

Then I have a consumer with the following config:

_bus = Configure.With(_activator)
    .Transport(t => t.UseAmazonSQS(credentials, sqsConfig, _consumerQueue))
    .Routing(r => r.TypeBased().MapAssemblyOf<TemplateTestEmail>(_publisherQueue))
    .Options(o => o.SimpleRetryStrategy(_errorQueue))
    .Start();

where _consumerQueue is "myservice-test-consumer", _publisherQueue is "myservice-test-publisher", and _errorQueue is "myservice-test-error". Under this config everything runs fine; all published messages are consumed by the consumer. Usage is straightforward, with IHandleMessages handlers and subscriptions like

  _bus.Subscribe<Type...>().Wait();

And everything is running fine.

Now, let's say I shut down my publisher service and restart it with _nameOfQueue as "myservice-qat-publisher". It starts and creates the right queue, e.g. "myservice-qat-publisher", but when I publish a message it ends up being consumed by the "test" consumer on queue "myservice-test-consumer". Note it does not matter whether I shut down a consumer or vice versa; no matter what, every consumer ends up consuming messages from other levels (test, qat, prod).

However, I do notice that if I start up a consumer with no publishers running, the Subscribe requests go to the right queues; e.g. at the test level, the test consumer will send the subscribe messages with the correct header:

"rbs2-return-address": "https://sqs.(...)/myservice-test-consumer",

to the myservice-test-publisher queue, where of course they will be consumed on startup of the publisher.

My question is why this happens, and is there a way to solve it? I note that there's no support for Amazon SNS (topics), which I guess would solve my problem. I've used Rebus before with RabbitMQ without any issues.

My setup is as follows:

  • for all services there will only be one consumer
  • there might be more than one publisher for a given queue
  • there are 3 levels (test, qat, prod)
  • we use the same AWS credentials, region, etc. for all levels and queues

Change casing in package references.

Hi,

Could you change line 51 in https://github.com/rebus-org/Rebus.AmazonSQS/blob/master/Rebus.AmazonSQS/Rebus.AmazonSQS.csproj to use the correct casing?
If you are using .NET Core with project.json files, this causes an issue where the Rebus package cannot be found (since the reference should be "Rebus").

Before:

<ItemGroup>
    <PackageReference Include="awssdk.sqs" Version="3.3.1.10" />
    <PackageReference Include="rebus" Version="4.0.0-b05" />
</ItemGroup>

After:

<ItemGroup>
    <PackageReference Include="awssdk.sqs" Version="3.3.1.10" />
    <PackageReference Include="Rebus" Version="4.0.0-b05" />
</ItemGroup>

Thanks

support for SNS and Kinesis

We're about to implement an SNS subscription store and transport, and soon after we'll add support for Kinesis streams. I'm curious whether you've looked into these and have any gotchas you'd like to share.

Our plan is to create

  • Rebus.AmazonSNS repo with a package containing

    • SnsTransport: publish only
    • SnsSubscriptionStore
    • SnsSqsTransport: uses the SnsTransport for publish & this SqsTransport for receive
  • Rebus.AmazonKinesis repo with a package containing

    • KinesisTransport: publish and receive; will implement the KCL's IRecordProcessor to feed messages from the KCL into the Rebus pipeline.
    • (considering) update to SnsSubscriptionStore or new SnsKinesisTransport to create a lambda that will forward from SNS to a Kinesis stream.

Does that sound reasonable? Do you have any recommendations or requests?

Messages that have exhausted the retry count are being given an invalid delay value when passed to SQS

We have an implementation of IHandleMessages<IFailed<>> which throws an exception when the maximum retry count has been met, so that Rebus will place the message in the dead-letter queue. We have enabled the UseNativeDeferredMessages option.

Rebus does this; however, the value of DelaySeconds passed to SQS is a negative integer, which is invalid, and SQS throws an exception. While iterating through our retries we use Defer, which updates the rbs2-deferred-until header with a new time at which the message should be retried. On the final attempt, this value is not updated.

I am of the opinion that when the GetDelaySeconds method in AmazonSqsTransport.cs is called with a DeferredUntil header value that is before the current time, it should return zero, as that message should be processed without delay.
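Concretely, something like the following clamp (a sketch only; the real method in AmazonSqsTransport.cs may differ in shape):

static int GetDelaySeconds(IDictionary<string, string> headers)
{
    if (!headers.TryGetValue(Headers.DeferredUntil, out var deferredUntil)) return 0;

    // Rebus serializes rbs2-deferred-until as a timestamp string
    var until = DateTimeOffset.Parse(deferredUntil);
    var delay = (int)Math.Ceiling((until - DateTimeOffset.UtcNow).TotalSeconds);

    // a deferred-until time in the past means "process now" - never a negative delay
    return Math.Max(0, delay);
}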

Detail of the error message thrown from the API:
Rebus.Workers.ThreadPoolBased.ThreadPoolWorker An error occurred when attempting to complete the transaction context System.AggregateException: One or more errors occurred. ---> Amazon.SQS.AmazonSQSException: Failed Value -8 for parameter DelaySeconds is invalid. Reason: DelaySeconds must be >= 0 and <= 900. with Code=InvalidParameterValue, SenderFault=True

Suggestion: Add required ACLs to readme.md?

While setting up a service using Rebus.AmazonSQS, I needed to know what permissions the service required, and I had to browse the code to see which endpoints were called. Maybe this could be added to the readme.md under some sort of requirements paragraph?

This is the policy I've found to work:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "sqs:GetQueueUrl",
                "sqs:ChangeMessageVisibility",
                "sqs:DeleteMessageBatch",
                "sqs:SendMessageBatch",
                "sqs:ReceiveMessage",
                "sqs:DeleteQueue",
                "sqs:CreateQueue",
                "sqs:SetQueueAttributes"
            ],
            "Resource": "[Your SQS queue arn here]"
        }
    ]
}

Note: if your service doesn't own the queue (i.e. CreateQueues = false), then the last 3 permissions can/should be omitted.

No possibility to add custom headers

By default Rebus uses about 8 headers, which it adds to each message. The number of headers increases by 2 whenever a message is poisonous, so by the time it reaches the poison queue it has 10 headers. 10, however, is the upper limit on the number of attributes allowed on an SQS message. See http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-attributes.html, which states:

Each message can have up to 10 attributes.

As a result, it is no longer possible to add your own header(s) to a message. What actually happens: when you've added an additional header (so you have 9) and the message is poisonous, the failing message can't even be placed on the poison queue, and Rebus stays in a perpetual retry 😢

I would propose an additional configuration option which would tell the SQS transport to pack the 'known' Rebus headers into a single header attribute on SQS. This would allow you to add at least 9 custom headers. On the receiving side, the transport would just need to check for the 'packed together' key in order to know how to parse the headers.
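To make the idea concrete, a hypothetical sketch of the packing (the attribute name and helper shapes are illustrative, not the actual implementation):

using System.Collections.Generic;
using System.Linq;
using Amazon.SQS.Model;
using Newtonsoft.Json;

const string PackedHeadersKey = "rebus-headers"; // hypothetical attribute name

// sending: serialize all Rebus headers into one SQS message attribute
static Dictionary<string, MessageAttributeValue> Pack(Dictionary<string, string> headers)
{
    return new Dictionary<string, MessageAttributeValue>
    {
        [PackedHeadersKey] = new MessageAttributeValue
        {
            DataType = "String",
            StringValue = JsonConvert.SerializeObject(headers)
        }
    };
}

// receiving: unpack when the packed key is present, otherwise fall back
// to the old one-attribute-per-header format
static Dictionary<string, string> Unpack(Dictionary<string, MessageAttributeValue> attributes)
{
    return attributes.TryGetValue(PackedHeadersKey, out var packed)
        ? JsonConvert.DeserializeObject<Dictionary<string, string>>(packed.StringValue)
        : attributes.ToDictionary(a => a.Key, a => a.Value.StringValue);
}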

I can create a pull request for this, as I've already solved the issue as described above, but I'd like to know whether this is a good approach.

Doesn't work with .NET Core 2.1

Just thought I'd drop this here after a few hours of frustrated debugging.

Running under .NET Core 2.1 gives this error when authenticating:

The request signature we calculated does not match the signature you provided. Check your AWS Secret Access Key and signing method

Switching to .NET Core 2.0 works fine, though that will present a challenge for me.

Error sending too many messages to SQS with version 4.0.1

We have many threads which share the same IBus instance. This worked fine with version 3.0 of Rebus.AmazonSQS, but as soon as we upgraded to version 4.0.1 we ran into a problem with messages failing to send when we attempt to process larger batches of items:

2018-01-09 15:54:32,898 [8] WARN Rebus.Retry.ErrorTracking.InMemErrorTracker - Unhandled exception 1 while handling message with ID "50494559-d298-41b4-bdde-8933a4acda7a"
Amazon.SQS.Model.TooManyEntriesInBatchRequestException: Maximum number of entries per request are 10. You have sent 48. ---> Amazon.Runtime.Internal.HttpErrorResponseException: The remote server returned an error: (400) Bad Request. ---> System.Net.WebException: The remote server returned an error: (400) Bad Request

I saw nothing in the config which would allow us to restrict Rebus to sending only 10 messages at a time. Also, I'm curious why this was logged as a warning; shouldn't it be an error, since the messages were not sent?
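For reference, chunking outgoing messages to SQS's 10-entry batch limit looks roughly like this (a sketch, not the transport's actual code; entries, client, and queueUrl assumed in scope):

const int SqsMaxBatchSize = 10;

foreach (var chunk in entries
    .Select((entry, index) => (entry, index))
    .GroupBy(x => x.index / SqsMaxBatchSize, x => x.entry))
{
    await client.SendMessageBatchAsync(new SendMessageBatchRequest(queueUrl, chunk.ToList()));
}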

Port to .NET Standard

The library isn't compatible with .NET Standard yet. Just as I've (just) created a pull request to make the main Rebus library compatible with both .NET Standard and .NET 4.5, I'd like to port this library to be compatible with both target frameworks. How can I proceed? Would you be able to give me a branch for this? Or can I create a pull request against the master branch?

Support creation of FIFO queues

I searched existing issues and found #24, but it is unclear to me what that was intended to accomplish. It sounds like it looks at message headers in some way, but I don't fully get how that supports FIFO queues.

Looking at the code, I see that the AmazonSqsTransport class does not support the FifoQueue attribute. This attribute is not specified in the CreateQueue() method.

I'm able to pass in the URL of an existing FIFO queue, but it doesn't seem like I can use Rebus to create that queue for me.

Also worth noting: you can't change an existing queue to FIFO, so the call to SetQueueAttributesAsync() will not be able to add the FIFO attribute to existing queues, which I think requires a new failure path.
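For reference, creating a FIFO queue with the AWS SDK requires the attribute to be set at creation time (the attribute names come from the SQS API; the queue name is illustrative):

var response = await client.CreateQueueAsync(new CreateQueueRequest
{
    QueueName = "my-queue.fifo", // FIFO queue names must end with ".fifo"
    Attributes = new Dictionary<string, string>
    {
        ["FifoQueue"] = "true",                 // cannot be added to an existing queue
        ["ContentBasedDeduplication"] = "true"  // optional
    }
});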

Transforming messages from AWS

I have messages sent to SQS from other AWS services which can't be used with Rebus because of their format (the Body field is the full body and doesn't contain a mix of headers + body). Is there any plan to provide the ability to transform such messages into something Rebus can handle?
