rebus.azureservicebus's People

Contributors

benne, binick, dependabot[bot], eeskildsen, ehabelgindy, georgechond94, hjalle, jorgenbosman, jr01, lezzi, meyce, mookid8000, riezebosch


rebus.azureservicebus's Issues

Lock Expiration

Hello,

I am seeing the following exception in my message handler when I have a breakpoint set and wait for about 30 seconds. I know Rebus sets the default lock timeout to five minutes, but I can't figure out why this would cause an exception. Any help would be greatly appreciated.

        public async Task Handle(CreateUser message)
        {
            Console.WriteLine(message.Name);

            string msg = $"Hello {message.Name} from Rebus";

            await this.bus.Publish(new UserCreated { Name = msg, CreatedAt = DateTime.UtcNow });
        }
[ERR] Rebus.Workers.ThreadPoolBased.ThreadPoolWorker (Thread #4): Unhandled exception in thread pool worker
Rebus.Exceptions.RebusApplicationException: Could not abandon message with ID cff3dcc0-c042-4510-b5fe-57d44d80fbb6 and lock token 30a3b88e-19f4-4d49-87ed-8ee951350095 ---> Microsoft.Azure.ServiceBus.MessageLockLostException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue, or was received by a different receiver instance.
   at Microsoft.Azure.ServiceBus.Core.MessageReceiver.<DisposeMessagesAsync>d__99.MoveNext() in C:\source\azure-service-bus-dotnet\src\Microsoft.Azure.ServiceBus\Core\MessageReceiver.cs:line 1425
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Microsoft.Azure.ServiceBus.RetryPolicy.<RunOperation>d__20.MoveNext() in C:\source\azure-service-bus-dotnet\src\Microsoft.Azure.ServiceBus\RetryPolicy.cs:line 86
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at Microsoft.Azure.ServiceBus.RetryPolicy.<RunOperation>d__20.MoveNext() in C:\source\azure-service-bus-dotnet\src\Microsoft.Azure.ServiceBus\RetryPolicy.cs:line 70
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Microsoft.Azure.ServiceBus.Core.MessageReceiver.<AbandonAsync>d__69.MoveNext() in C:\source\azure-service-bus-dotnet\src\Microsoft.Azure.ServiceBus\Core\MessageReceiver.cs:line 582
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Rebus.AzureServiceBus.AzureServiceBusTransport.<>c__DisplayClass28_0.<<Receive>b__6>d.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Rebus.Internals.AsyncHelpers.CustomSynchronizationContext.<<Run>b__7_0>d.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at Rebus.Internals.AsyncHelpers.CustomSynchronizationContext.Run()
   at Rebus.Internals.AsyncHelpers.RunSync(Func`1 task)
   at Rebus.AzureServiceBus.AzureServiceBusTransport.<>c__DisplayClass28_0.<Receive>b__1()
   --- End of inner exception stack trace ---
   at Rebus.AzureServiceBus.AzureServiceBusTransport.<>c__DisplayClass28_0.<Receive>b__1()
   at Rebus.Transport.TransactionContext.Invoke(ConcurrentQueue`1 actions)
   at Rebus.Transport.TransactionContext.RaiseAborted()
   at Rebus.Transport.TransactionContext.Dispose()
   at Rebus.Workers.ThreadPoolBased.ThreadPoolWorker.<TryAsyncReceive>d__16.MoveNext()
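For what it's worth: if a handler can legitimately hold on to a message for longer than the queue's lock duration (for example while paused at a breakpoint), one mitigation is to let the transport renew the peek lock in the background. A minimal sketch, assuming a connection string and queue name of your own (this is a general mitigation, not a diagnosis of the report above):

using (var activator = new BuiltinHandlerActivator())
{
    Configure.With(activator)
        // AutomaticallyRenewPeekLock() keeps renewing the lock while the handler runs,
        // so a long-running (or paused) handler does not lose it
        .Transport(t => t.UseAzureServiceBus(connectionString, "user-input").AutomaticallyRenewPeekLock())
        .Start();
}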

Please provide a compatibility option for the v7 naming breaking change

Hey,

We have ~20 queues and ~300 topics, and it will not be possible to update all the endpoints at the same time. Would it be possible to provide a configuration option in the final v7 release to use the "legacy" v6 naming model? Something like this:

.Transport(t => t.UseAzureServiceBus("connection", "worker.input")
    .AutomaticallyRenewPeekLock()
    .UseLegacyNaming())

Messages moved from Azure subscription to Azure queue before being dispatched to code?

Hi, I had a look at how Rebus and the Rebus ASB transport actually do their magic, using the ASB explorer.

It seems to me that messages are sent to the subscription and then moved to a queue before being dispatched to the application code subscriber.
Is this documented somewhere? Isn't it possible to dispatch the message to code directly from the subscription?

thank you
enrico

All exceptions from renewing peek locks are swallowed and not logged

The method in MessageLockRenewer.Renew is swallowing all exceptions:

public async Task Renew()
{
	try
	{
		await _messageReceiver.RenewLockAsync(_message);

		SetNextRenewal();
	}
	catch { } //< will automatically be retried if it fails

}

Because of that, there is no way exceptions will ever be logged in AzureServiceBusTransport.RenewPeekLocks:

try
{
	await r.Renew().ConfigureAwait(false);
	
	_log.Debug("Successfully renewed peek lock for message with ID {messageId}", r.MessageId);
}
catch (Exception exception)
{
	_log.Warn(exception, "Error when renewing peek lock for message with ID {messageId}", r.MessageId);
}
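A minimal sketch of the suggestion, reusing the names from the snippets above (not the actual fix shipped in the package): let Renew() propagate failures so the caller's existing try/catch in RenewPeekLocks gets a chance to log them.

public async Task Renew()
{
    // no try/catch here - failures bubble up to RenewPeekLocks, which already logs
    // a warning and will simply attempt the renewal again on the next round
    await _messageReceiver.RenewLockAsync(_message);

    SetNextRenewal();
}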

V6: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue

Hello,
After updating the transport from v5 to v6, which changed the driver from WindowsAzure.ServiceBus to Microsoft.Azure.ServiceBus, we started getting thousands of exceptions of type "Microsoft.Azure.ServiceBus.MessageLockLostException":
"The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue", originating from "Rebus.AzureServiceBus.AzureServiceBusTransport", with the stack trace:

at Microsoft.Azure.ServiceBus.Core.MessageReceiver.<OnRenewLockAsync>d__93.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Microsoft.Azure.ServiceBus.Core.MessageReceiver.<>c__DisplayClass74_0.<<RenewLockAsync>b__0>d.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Microsoft.Azure.ServiceBus.RetryPolicy.<RunOperation>d__20.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at Microsoft.Azure.ServiceBus.RetryPolicy.<RunOperation>d__20.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Microsoft.Azure.ServiceBus.Core.MessageReceiver.<RenewLockAsync>d__74.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Rebus.AzureServiceBus.AzureServiceBusTransport.<RenewPeekLock>d__32.MoveNext()

We have .AutomaticallyRenewPeekLock() enabled, the queues were created by Rebus with the previous driver, the lock duration is 5 minutes.

Our config is (over Autofac):

builder
    .RegisterRebus(config => config
        .Serialization(s => s.UseJil(Jil.Options.ISO8601IncludeInherited))
        .Transport(t => t.UseAzureServiceBus(ConfigurationManager.ConnectionStrings["WorkerAzureServiceBus"].ConnectionString, "worker.input")
            .AutomaticallyRenewPeekLock())
        .Options(o => o.SimpleRetryStrategy("worker.error"))
        .Options(o => o.SetNumberOfWorkers(5))
        .Options(o => o.SetMaxParallelism(10))
        .Options(o => o.SetWorkerShutdownTimeout(TimeSpan.FromMinutes(4)))
        .Routing(r => r.TypeBased())
    );

Any hint?

Application crashes and exits when Rebus.Exceptions.RebusApplicationException is raised

Unhandled Exception: Rebus.Exceptions.RebusApplicationException: Could not abandon message with ID 7179773f-e1cc-4877-970f-b0eab05c9581 and lock token 7a4c77de-ff1e-459c-9c66-07b9bea3f621 ---> Microsoft.Azure.ServiceBus.ServiceBusTimeoutException: The operation did not complete within the allocated time 00:00:59.9999661 for object dispose. ---> System.TimeoutException: The operation did not complete within the allocated time 00:00:59.9999661 for object dispose.
at Microsoft.Azure.Amqp.AsyncResult.End[TAsyncResult](IAsyncResult result)
at Microsoft.Azure.ServiceBus.Core.MessageReceiver.<>c__DisplayClass99_0.b__1(IAsyncResult a)
at System.Threading.Tasks.TaskFactory`1.FromAsyncCoreLogic(IAsyncResult iar, Func`2 endFunction, Action`1 endAction, Task`1 promise, Boolean requiresSynchronization)
--- End of stack trace from previous location where exception was thrown ---
at Microsoft.Azure.ServiceBus.Core.MessageReceiver.DisposeMessagesAsync(IEnumerable`1 lockTokens, Outcome outcome)
--- End of inner exception stack trace ---
at Microsoft.Azure.ServiceBus.Core.MessageReceiver.DisposeMessagesAsync(IEnumerable`1 lockTokens, Outcome outcome)
at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
at Microsoft.Azure.ServiceBus.Core.MessageReceiver.AbandonAsync(String lockToken, IDictionary`2 propertiesToModify)
at Rebus.AzureServiceBus.AzureServiceBusTransport.<>c__DisplayClass37_0.<b__1>d.MoveNext()
--- End of inner exception stack trace ---
at Rebus.AzureServiceBus.AzureServiceBusTransport.<>c__DisplayClass37_0.<b__1>d.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
--- End of stack trace from previous location where exception was thrown ---
at System.Threading.ThreadPoolWorkQueue.Dispatch()


The "renewal peek lock" task stops before actually terminating the handler, causing another thread to process the message again in another thread and "invalid lock toke / message already removed" errors in the original one.

Hey,

I've noticed that sometimes, with the renewal peek lock enabled, the same message is consumed and processed by multiple threads in parallel, when specific conditions are met.

I've debugged the Rebus Azure Transport and I believe I've found the reason for this behavior.

Imagine the following:

  • The ASB lock time is 60 seconds.
  • Transport renewal is enabled.
  • The handler's objective is to generate 1,000 new messages via bus.Send.
  • It takes 59 seconds to run the handler code.

What happens in this scenario is that, because of the transaction, both the effective message dispatch and the stopping of the renewal task are callbacks executed on transaction commit:

  1. The renewal callback:

     context.OnCommitted(async () => renewalTask.Dispose());

  2. The dispatch callback.

The problem is that, when executing the commit callbacks in TransactionContext, the renewal process is killed before the messages are dispatched, thus making it possible for Rebus to re-consume the same message on another thread while the original one is still dispatching the stored messages. When the original thread is done processing and calls ASB's CompleteAsync (or AbandonAsync, for that matter) with the lock token, the infamous error is thrown, because the token is indeed invalid - the message is being consumed by another thread and now has a different token.

This creates a cycle generating messages and errors ad infinitum.

The lock renewal process should be the last one to get killed, with a small delay after disposing the message, to guarantee the correct OnCommitted processing.
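A minimal sketch of what the suggested ordering could look like, using the callback registrations quoted elsewhere in this thread (this is a proposal, not the transport's current code): only stop the renewal once the transaction context is disposed, i.e. after the commit callbacks, including the message dispatch, have run.

// keep renewing through the commit phase; OnDisposed runs after the OnCommitted callbacks
context.OnDisposed(() => renewalTask.Dispose());
context.OnAborted(() => renewalTask.Dispose());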

Regards,
Stan

Add support for Session queues

Hi,

I am trying to use Rebus at work along with Azure, and while gathering requirements it emerged that we might need ASB's ability to have session queues (group-id in AMQP). As I have looked through the code, the current Rebus.AzureServiceBus transport does not create or utilize session queues. Is there any plan to support this, or is it a conscious decision to leave it out? I have written some test code and it seems doable in Rebus.

Please let me know your thoughts; I appreciate your attention. If you would like, I could contribute to the code.

Best Regards,
Patrick

Error header exceeding 65K

A bug in a message handler resulted in a rather large stack trace. After the default number of Rebus retries, I got this error:

An error occurred when attempting to complete the transaction context
Rebus.Exceptions.RebusApplicationException: Could not send to queue 'error' --->
Microsoft.Azure.ServiceBus.MessageSizeExceededException: A serialization error occurred while processing the message header stream with sequence number 207001985.
The serialization process return the following message: The size quota for this stream (65536) has been exceeded.. (...)
at Microsoft.Azure.ServiceBus.Core.MessageSender.d__52.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
at Microsoft.Azure.ServiceBus.RetryPolicy.d__19.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at Microsoft.Azure.ServiceBus.RetryPolicy.d__19.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
at Microsoft.Azure.ServiceBus.Core.MessageSender.d__39.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
at Rebus.AzureServiceBus.AzureServiceBusTransport.<>c__DisplayClass32_0.<b__3>d.MoveNext()
--- End of inner exception stack trace ---
at Rebus.AzureServiceBus.AzureServiceBusTransport.<>c__DisplayClass32_0.<b__3>d.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
at Rebus.AzureServiceBus.AzureServiceBusTransport.<>c__DisplayClass32_1.<b__1>d.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
at Rebus.Transport.TransactionContext.d__24.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
at Rebus.Transport.TransactionContext.d__21.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
at Rebus.Transport.TransactionContext.d__19.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
at Rebus.Workers.ThreadPoolBased.ThreadPoolWorker.d__19.MoveNext()

This is with Rebus.AzureServicebus 7.0.0-a12 and Rebus 5.2.1.

I'm not sure (yet) if this would also have occurred with older Rebus/Rebus.AzureServicebus versions, at least I haven't run into this in the past few years.

A manage permission is required to send/receive messages using existing queues

Background

I am trying to start a 2-way bus that is configured to use an existing Azure Service Bus queue. The connection string I am passing is restricted to read and listen permissions; there is no manage permission configured on the access policy. Here's sample code that can be used to replicate the issue:

            using (var activator = new BuiltinHandlerActivator())
            {
                Configure.With(activator)
                    .Logging(l => l.ColoredConsole())
                    .Transport(t => t.UseAzureServiceBus(connectionString, "myqueue").DoNotCreateQueues())
                    .Start();
            }

Forcing connection strings to have manage permissions might cause security issues. Ideally, an application should not have such elevated permissions if they're not needed.

Expected behavior

Bus starts normally and application can send and receive messages.

Actual behavior

An exception is thrown

Unhandled Exception: Rebus.Injection.ResolutionException: Could not resolve Rebus.Bus.IBus with decorator depth 0 - registrations: Rebus.Injection.Injectionist+Handler ---> Rebus.Exceptions.RebusApplicationException: Could not get queue description for queue myqueue ---> Microsoft.Azure.ServiceBus.UnauthorizedException: Manage claim is required for this operation

Digging more into the code, I found that the issue is caused by this line of code

return await _managementClient.GetQueueAsync(address, _cancellationToken).ConfigureAwait(false);

It looks like the 'managementClient.GetQueueAsync' method requires a manage permission.

Deferring a message onto another input queue using azure service bus

Hi Mookid!
During our day-to-day development we are using Rebus all over the place, and it's pretty awesome,
but we stumbled into an issue regarding deferring a message onto another input queue 😅

Our configuration, using the latest Rebus 5.4.1 with the Azure Service Bus one-way client configuration:

 builder.Services.AddRebus(c => c
    .Logging(l => l.ColoredConsole())
    .Transport(t => t.UseAzureServiceBusAsOneWayClient(Environment.GetEnvironmentVariable("HostingServiceBus")))
    .Routing(r => r.TypeBased()
        .Map<CheckRestoreDatabaseStatusCommand>("hosting-restoredb-checkstatus-input")
    ));

All handy dandy, later we do the following.

await _bus.Defer(TimeSpan.FromMinutes(2), new CheckRestoreDatabaseStatusCommand
{
    EnvironmentId = command.EnvironmentId,
    ContainerName = containerInstanceName,
    ContainerResourceGroup = containerInstanceResourceGroup
});

Since we are using Azure Service Bus, with its built-in deferral mechanism, I would expect that this would just work; however, when I run the code I receive an error from Rebus telling me Cannot use ourselves as timeout manager because we're a one-way client.

It's a bit odd, given that no timeout manager should be needed 🤔.

Venturing further into Rebus' Defer method, I validated that the destination address is correctly set to hosting-restoredb-checkstatus-input, but when Rebus tries to fetch its internal timeout manager, hell breaks loose and GetTimeoutManagerAddress throws the exception seen above.

Looking at the issue, it seems like Rebus first tries to get the address of the central timeout manager (which we don't have, because of ASB), and secondly tries to get Rebus' own input queue, which also fails because we are a one-way client.

I am unsure what the remedy for this is, because it seems to only be an issue with transports that have native deferral support.

Though not pretty, I've found a temporary workaround :)

var time = RebusTime.Now + TimeSpan.FromMinutes(2);
var str = time.ToIso8601DateTimeOffset();

await _bus.Send(new CheckRestoreDatabaseStatusCommand
{
    EnvironmentId = command.EnvironmentId,
    ContainerName = containerInstanceName,
    ContainerResourceGroup = containerInstanceResourceGroup
}, new Dictionary<string, string>()
{
    { Headers.DeferredUntil, str },
    { Headers.DeferredRecipient, "hosting-restoredb-checkstatus-input" }
});

If you happen to have a good solution for suppressing the Cannot use ourselves as timeout manager because we're a one-way client error, and for using the routing slip when using Azure Service Bus, I would be happy to send a PR 😃

EnsureQueue/Topic/Subscription exist exception handling can be improved

Testing the code locally, I found that race conditions can lead to queue/topic/subscription creation throwing exceptions that could be caught.
Basically, if the entity is being created by another process and is not yet available, a ServiceBusException is thrown with "SubCode=40901." as part of the message (there is no specific exception type; that's the best way I found to catch it).
The following is a piece of code that demonstrates my workaround (obviously the code must be improved); it is just a suggestion.

async Task<TopicDescription> EnsureTopicExists(string normalizedTopic)
{
    try
    {
        return await _managementClient.GetTopicAsync(normalizedTopic, _cancellationToken).ConfigureAwait(false);
    }
    catch (MessagingEntityNotFoundException)
    {
        // it's OK... try and create it instead
    }

    try
    {
        return await _managementClient.CreateTopicAsync(normalizedTopic, _cancellationToken).ConfigureAwait(false);
    }
    catch (MessagingEntityAlreadyExistsException)
    {
        return await _managementClient.GetTopicAsync(normalizedTopic, _cancellationToken).ConfigureAwait(false);
    }
    catch (ServiceBusException sbe) when (sbe.Message.Contains("SubCode=40901."))
    {
        // Case when the topic might be in creation process, so we wait for some time until is created
        
        TimeSpan WaitForCreationTime = TimeSpan.FromMilliseconds(1000);
        short WaitForCreationMaxIterations = 5;
        TopicDescription topic = null;
        var iterations = 0;
        var found = false;
        while (iterations++ < WaitForCreationMaxIterations && !found)
        {
            try
            {
                topic = await _managementClient.GetTopicAsync(normalizedTopic, _cancellationToken).ConfigureAwait(false);
                found = true;
            }
            catch (ServiceBusException ex) when (ex.Message.Contains("SubCode=40901."))
            {
                await Task.Delay(WaitForCreationTime, _cancellationToken);
            }
        }

        return topic;
    }
    catch (Exception exception)
    {
        throw new ArgumentException($"Could not create topic '{normalizedTopic}'", exception);
    }
}

In order to reproduce this case, send 100 messages to a topic that hasn't been created yet and await them all afterwards:

var tasks = new Task[100];

for (var i = 0; i < 100; i++)
{
    tasks[i] = bus.PublishAsync(new Event()
    {
        Id = i.ToString(),
    });
}

await Task.WhenAll(tasks);

OperationTimeout is not parsed from the connection string by the new Microsoft.Azure.ServiceBus

Empty list of published messages causes NullReferenceException in Microsoft.Azure.ServiceBus

I have been upgrading to latest versions of the following packages:

  • Rebus (v5.2.1)
  • Rebus.AzureServiceBus (6.0.6)
  • Microsoft.Azure.ServiceBus (3.3.0)

and in a test I ran into a NullReferenceException coming from Microsoft.Azure.ServiceBus.Core.MessageSender. A similar issue seems to be fixed in PR Azure/azure-service-bus-dotnet#642 for the Microsoft.Azure.ServiceBus package, which is not yet released. But I wonder why it occurs in the first place when Rebus calls TopicClient.SendAsync(list).

I have tracked the calls down to this line

await GetTopicClient(topicName).SendAsync(list).ConfigureAwait(false);
where list seems to be null.

The exception message from Rebus is: "Could not publish to topic 'mymessageassembly_mymessage'"

The inner stacktrace from Microsoft.Azure.ServiceBus is:

at Microsoft.Azure.ServiceBus.Core.MessageSender.<OnSendAsync>d__52.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Microsoft.Azure.ServiceBus.RetryPolicy.<RunOperation>d__19.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at Microsoft.Azure.ServiceBus.RetryPolicy.<RunOperation>d__19.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Microsoft.Azure.ServiceBus.Core.MessageSender.<SendAsync>d__39.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Rebus.AzureServiceBus.AzureServiceBusTransport.<>c__DisplayClass29_0.<<GetOutgoingMessages>b__3>d.MoveNext()

My test calls a service that uses a one-way client to publish a message, and I expect to receive the message, which the test subscribes to.

Any thoughts on the matter would be helpful.
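A minimal workaround sketch (my own guard around the call quoted above, not the transport's actual code): skip the SendAsync call when the batch produced no messages, so an empty or null list never reaches the client.

var list = batch.Select(GetMessage).ToList();

if (list.Any())
{
    await GetTopicClient(topicName).SendAsync(list).ConfigureAwait(false);
}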

Transport type from connection string is not used with token provider

When using .UseAzureServiceBus(..) or .UseAzureServiceBusAsOneWayClient(..) with an ITokenProvider, the connection string is parsed internally, and parts of it are used for sending and receiving against Azure Service Bus.

When the transport is set up without specifying an ITokenProvider, the original connection string makes it all the way to the Service Bus SDK client and it works as expected, but when it is parsed and parts of it used to create the client manually, the transport type option is stripped away, meaning the default transport type of AMQP is always used. So, this problem only appears when used in conjunction with ITokenProvider.

Example of such a connection string:
Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=Root..;SharedAccessKey=XYoXQ...;TransportType=AmqpWebSockets
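A minimal sketch of how the transport type could be preserved when a token provider is in play (the receiver construction here is illustrative, not the transport's actual code; "queue-name" is a placeholder):

var builder = new ServiceBusConnectionStringBuilder(connectionString);

// pass the parsed TransportType along instead of letting it default to AMQP
var receiver = new MessageReceiver(
    builder.Endpoint,
    "queue-name",
    tokenProvider,
    builder.TransportType);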

Support managed identities

It would be super-neat if Azure Service Bus code could have connection strings removed from it, so we should look into supporting managed identities.

I imagine simply omitting the connection string should result in picking up the managed identity and using that – it would make the code simple and pretty, like this:

Configure.With(...)
	.Transport(t => t.UseAzureServiceBus("queue-name"))
	.Start();

and this:

Configure.With(...)
	.Transport(t => t.UseAzureServiceBusAsOneWayClient())
	.Start();

EnsureTopicExists and GetOrCreateSubscription implementation could be improved

Hi, I had a look at the implementation of EnsureTopicExists: I suggest using TopicExistsAsync.
This would avoid generating many exceptions (which clutter my Application Insights logs, btw :)).

The same goes for the implementation of GetOrCreateSubscription: you could use SubscriptionExistsAsync.

InnerCreateQueue is fine: it uses QueueExistsAsync.

Thank you
enrico

https://docs.microsoft.com/en-us/dotnet/api/microsoft.azure.servicebus.management.managementclient.topicexistsasync?view=azure-dotnet#Microsoft_Azure_ServiceBus_Management_ManagementClient_TopicExistsAsync_System_String_System_Threading_CancellationToken_
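A minimal sketch of the suggested check-before-get flow, using the ManagementClient methods linked above (error handling and the creation race described in an earlier issue are left out):

async Task<TopicDescription> EnsureTopicExists(string normalizedTopic)
{
    // probe first, so the common "already exists" case never throws
    if (await _managementClient.TopicExistsAsync(normalizedTopic, _cancellationToken).ConfigureAwait(false))
    {
        return await _managementClient.GetTopicAsync(normalizedTopic, _cancellationToken).ConfigureAwait(false);
    }

    return await _managementClient.CreateTopicAsync(normalizedTopic, _cancellationToken).ConfigureAwait(false);
}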

More robust message lock renewal

When a message is received and automatic peek lock renewal is enabled, an async task is scheduled in the background to take care of the peek lock renewal.

I think it would be more robust to have a ConcurrentDictionary<string, PeekLockRenewer> which is checked every few seconds, and then PeekLockRenewer could take care of the logic pertaining to peek lock renewal.

This will also be a good time to add some kind of sensible retry mechanism around the renewal.
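A minimal sketch of the idea (PeekLockRenewer with IsDue/MessageId is illustrative here, not an existing type): track renewers in a ConcurrentDictionary keyed by message ID and let a single background loop renew whichever ones are due, retrying on the next pass if a renewal fails.

readonly ConcurrentDictionary<string, PeekLockRenewer> _renewers = new ConcurrentDictionary<string, PeekLockRenewer>();

async Task RenewalLoop(CancellationToken token)
{
    while (!token.IsCancellationRequested)
    {
        foreach (var renewer in _renewers.Values.Where(r => r.IsDue))
        {
            try
            {
                await renewer.Renew();
            }
            catch (Exception exception)
            {
                // don't give up - the renewer stays in the dictionary and is retried on the next pass
                _log.Warn(exception, "Error renewing peek lock for message with ID {messageId}", renewer.MessageId);
            }
        }

        await Task.Delay(TimeSpan.FromSeconds(5), token);
    }
}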

Topics are only created upon subscriptions

It appears that the behaviour of auto-creating topics has changed between v4 and v6 of the Rebus.ASB package.

Previously the topic was created by the publisher (when sending the message), but now it is only created once a subscriber subscribes to the topic. I cannot find this behavior change documented in any of the 5 or 6 releases, and it caught us by surprise.

Is this intentional? I would have expected that publishers own published event messages as well as topics, and subscribers own input queues and topic subscriptions.

Suddenly a lot of lock exceptions are heading my way

Suddenly I'm experiencing a lot of lock exceptions like this.
Do you know why this is?

Rebus.Workers.ThreadPoolBased.ThreadPoolWorker ERROR (Thread #16): An error occurred when attempting to complete the transaction context
Rebus.Exceptions.RebusApplicationException: The message lock for message with ID 1e9682b3-e6ab-4c69-b702-88da2d957de4 was lost - tried to complete after 4,0 s ---> Microsoft.ServiceBus.Messaging.MessageLockLostException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue. TrackingId:43249aa7-e8c5-4c3d-ac91-0007c7d98ca6_G24_B2, SystemTracker:umbksjdev:Queue:letsencrypt-input, Timestamp:4/30/2018 2:13:30 PM ---> System.ServiceModel.FaultException`1[System.ServiceModel.ExceptionDetail]: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue. TrackingId:43249aa7-e8c5-4c3d-ac91-0007c7d98ca6_G24_B2, SystemTracker:umbksjdev:Queue:letsencrypt-input, Timestamp:4/30/2018 2:13:30 PM
   at Microsoft.ServiceBus.Messaging.Sbmp.DuplexRequestBindingElement.DuplexRequestSessionChannel.ThrowIfFaultMessage(Message wcfMessage)
   at Microsoft.ServiceBus.Messaging.Sbmp.DuplexRequestBindingElement.DuplexRequestSessionChannel.HandleMessageReceived(IAsyncResult result)

Kenneth

Topic and subscription naming standard

Hi, is it possible to plug a custom naming standard into topic and subscription creation? I'm a little uncomfortable knowing that moving a message class to another namespace or assembly will "silently" (as far as I've understood) create a new topic and subscription and leave the previous ones hanging around in ASB.
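A minimal sketch of what such a hook could look like, using Rebus' ITopicNameConvention extension point (mentioned in a later issue in this list; activator, connectionString and "queue-name" are placeholders, and note that the transport's own name normalization is still applied on top of whatever this returns):

class StableTopicNameConvention : ITopicNameConvention
{
    public string GetTopic(Type eventType)
    {
        // derive the topic from the short type name only, so moving the class
        // between namespaces/assemblies does not silently produce a new topic
        return eventType.Name.ToLowerInvariant();
    }
}

// registration:
Configure.With(activator)
    .Transport(t => t.UseAzureServiceBus(connectionString, "queue-name"))
    .Options(o => o.Register<ITopicNameConvention>(c => new StableTopicNameConvention()))
    .Start();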
Thank you, enrico

Too Many Incoming Requests

I am using a Standard tier Azure Service Bus as the transport layer. I found that each of my services generates almost 4.5 million requests every month, even if it is not used! Is this normal behaviour? Also, is there any way to reduce the request count, as Microsoft charges per request/operation?

Thanks

Using Shared access signature is not working

I get this error when trying to use an Azure Service Bus shared access key connection string:

ArgumentException: ConnectionString used to create NamespaceManager shouldn't include EntityPath as NamespaceManager is at Namespace level.
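A minimal workaround sketch (my own helper, not part of the transport): strip the EntityPath=... segment so the connection string becomes namespace-level before handing it to Rebus.

static string RemoveEntityPath(string connectionString)
{
    // keep every part of the connection string except the EntityPath segment
    var parts = connectionString
        .Split(';')
        .Where(part => !part.TrimStart().StartsWith("EntityPath=", StringComparison.OrdinalIgnoreCase));

    return string.Join(";", parts);
}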

v7 Naming Breaking change format

This is a suggestion and is related to issue #25. Since breaking changes are being introduced to the names of topics and queues, the topic names could be changed such that they use / to create virtual directories. In #25 I added some code that alters the standard topic names from {fully qualified type name}__{assembly name} to {assembly name}/{fully qualified type name}. The difference when looking at this within software for managing queues and topics is shown in the screenshot below.

(screenshot omitted)

If there are other reasons such a change wouldn't work, then this issue can be disregarded and I can accomplish it with an ISubscriptionStorage decorator. However, this format is very convenient when dealing with a large number of topics on a namespace from multiple assemblies, and it would be nice if it wasn't necessary to introduce a new decorator throughout to accomplish it.

Receive message : Error when a user property has value = null

Using Rebus service bus 7.1.0

I think this line of code has a problem
var headers = userProperties.ToDictionary(kvp => kvp.Key, kvp => kvp.Value.ToString());

If kvp.Value is null, it breaks with:

Exception thrown: 'System.NullReferenceException' in System.Private.CoreLib.dll
dmbo.v3.partner.api Warning: 0 : 2020-03-17 09:39:06.085 +01:00 [Warning] An error occurred when attempting to receive the next message: "System.NullReferenceException: Object reference not set to an instance of an object.
at Rebus.AzureServiceBus.AzureServiceBusTransport.<>c.b__36_4(KeyValuePair`2 kvp)
at System.Linq.Enumerable.ToDictionary[TSource,TKey,TElement](IEnumerable`1 source, Func`2 keySelector, Func`2 elementSelector, IEqualityComparer`1 comparer)
at Rebus.AzureServiceBus.AzureServiceBusTransport.Receive(ITransactionContext context, CancellationToken cancellationToken)
at Rebus.Workers.ThreadPoolBased.ThreadPoolWorker.ReceiveTransportMessage(CancellationToken token, ITransactionContext context) in C:\projects-rebus\Rebus\Rebus\Workers\ThreadPoolBased\ThreadPoolWorker.cs:line 148"

I would even check that userProperties itself is not null.
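A minimal sketch of a null-safe version of that conversion (a suggestion, not the actual fix in the package): skip entries whose value is null, so ToString() is never called on null.

var headers = userProperties
    .Where(kvp => kvp.Value != null)
    .ToDictionary(kvp => kvp.Key, kvp => kvp.Value.ToString());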

Add Transport configuration options to support MS guidance for high message throughput

This Azure Service Bus article provides a nice overview of how to achieve a high message throughput. Towards the bottom of the article it has a nice summary of the steps to be taken to meet different use cases:

  1. Maximize the throughput of a single queue. The number of senders and receivers is small.

  2. Maximize overall throughput of multiple queues. The throughput of an individual queue is moderate or high.

  3. Minimize end-to-end latency of a queue or topic. The number of senders and receivers is small. The throughput of the queue is small or moderate.

etc..

I can see some changes required in the Rebus transport in order to support the recommendations described in the article. Additionally, I see the need for consumers to be able to vary these settings per queue, as different queues can be used for different use cases inside the same application (or would you be of the opinion that an application should set up multiple IBus instances to achieve this instead?).

  • The ability to specify a configurable number of message factories to create senders for specific Queues.

  • The ability to specify a configurable number of message factories to create receivers for specific Queues.

  • The ability to configure batch flushing in the client, and specify the 'BatchFlushInterval' for specific Queues (that's via MessageFactory->MessageFactorySettings-> NetMessagingTransportSettings).

  • The ability to set the prefetch count per Queue.

Error counter

Hi!

We have an issue when an error occurs on the consumer side: we saw that the process tracks the error count only in memory (InMemErrorTracker in Rebus), so when the process crashes (whether it runs on a single machine or in a cluster), the error count is lost.
Because of this, using Azure Service Bus, we end up retrying that message up to the maximum delivery count (which we saw is 100) instead of stopping after a few attempts.

Would you accept a PR that makes the MaxDeliveryCount property settable from the outside so we can change it? That way the message can go to the dead letter queue and we can stop retrying to process it.

Update referenced version of Microsoft.Azure.ServiceBus

The referenced version of Microsoft.Azure.ServiceBus (3.1.0) has a reference constraint on Microsoft's ADAL library of less than 4.0.0. Can the referenced version be updated to the latest (3.2.1), which supports 4.x versions of the ADAL library? We use the ADAL library in other places in our code, currently at 4.4.2. The constraint via the 3.1.0 library that Rebus.AzureServiceBus uses causes warnings throughout our code.

I can fork/create a PR for this to move it along if that works.

Native dead letter queue support

Hi. I'm trying to implement a second-level retry strategy based on scheduled retries, but when the max retry count is reached I would like to send the message to the native ASB DLQ (I can't do it directly, as suggested here, due to the ASB implementation). I couldn't find any documentation regarding ASB transport support for the native DLQ; however, I found this, where @mookid8000 mentions that it would be a good idea to disable Rebus' built-in dead-lettering and pass that responsibility to ASB. Is there currently an option to do that? If not, should I create another queue for errors? Currently the only way I've found to send the message to the native DLQ is by throwing exceptions until the delivery count is reached, but I would like to avoid this approach since it creates noise in our logs. Thanks!

Why are pre-fetching and retry features in client not used?

The latest version of the Azure Service Bus client provides the following features, which are also implemented in the transport - was there a reason these have been implemented in the Transport and the client implementations not used?

  • Pre-fetching
  • Retries

.Net Core assembly does not support System.Configuration.ConfigurationManager

I'm using Rebus in a .NET Core console app, and whenever I try to use the following code I get an exception:
Configure.With(new BuiltinHandlerActivator())
    .Transport(configurer => configurer.UseAzureServiceBusAsOneWayClient(
        "Endpoint=sb://dev.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=[key]"))
    .Routing(r => r.TypeBased()
        .MapAssemblyOf<CreatePrivateProjectCommand>(Constants.Queues.WorkInput))
    .Start();

Exception:
System.IO.FileNotFoundException: 'Could not load file or assembly 'System.Configuration.ConfigurationManager, Version=0.0.0.0, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51'. The system cannot find the file specified.'
This is simply because ConfigurationManager does not exist in Core.
Also, I recognize the convenience, but I don't think it should be Rebus' responsibility to go find configuration.

Compatibility with netstandard/net core

This is a collection of blockers and potential solutions for moving from WindowsAzure.ServiceBus (net45) to Microsoft.Azure.ServiceBus (1.0.0: net451, netstandard1.3; 2.0.0: net461, netstandard2.0).

With Microsoft.Azure.ServiceBus, Microsoft has moved the management API to a new project: Microsoft.Azure.Management.ServiceBus.
This new project is an attempt by Microsoft to align the Service Bus SDK with their ARM-everywhere philosophy.

This introduces a couple of issues. The first and foremost is that all management commands are executed via ARM and thus require Azure Active Directory credentials instead of the much simpler Service Bus connection string. The maintainers have received the criticism and have a plan to re-introduce management APIs (follow the issue here: Azure/azure-service-bus-dotnet#65).
tl;dr They will gradually re-implement it with a goal of having it all implemented by summer 2018.

Will update with more findings

Message entity could not be found

Previously, I had Azure Service Bus working, but after changing some things it stopped working. The problem is that I now get a MessagingEntityNotFoundException in this line when sending messages.

Put token failed. status-code: 404, status-description:
The messaging entity 'sb://studicabus-dev.servicebus.windows.net/sample_events_institutions_departmentaddedtoinstitutionevent__sample_events' could not be found.
TrackingId:6dad0d8e-b35d-44f6-81a4-3cebb29573e1_G33, SystemTracker:studicabus-dev.servicebus.windows.net:sample_events_institutions_departmentaddedtoinstitutionevent__sample_events, Timestamp:9/26/2018 11:00:10 AM.

This link indicates that the exception means that the resource (in this case the topic) does not exist. This seems strange considering my code previously worked. The change that has broken the functionality is that I have merged several Rebus queues into the same project, so my guess is that this causes me to use Rebus in some invalid way where topics are not created before using them.

Is there any particular information about my setup that would be helpful in solving this? The code is .NET Core if it is relevant.

Normalizing dashes to underscores.

We are using dashes in topic names in Azure Service Bus but the topic name normalization routine is converting these to underscores. I can't seem to override this behavior in a custom implementation of ITopicNameConvention because AsbStringExtensions.ToValidAzureServiceBusTopicName() is getting involved.

Would it be ok to submit a PR to not convert dashes to underscores?
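A minimal workaround sketch along the lines of the LegacyV3NameFormatterFixed example further down in this list (my own decorator, not an existing option, and it assumes a transport version that exposes INameFormatter): keep dashes in topic names and delegate everything else to the built-in formatter.

class DashFriendlyNameFormatter : INameFormatter
{
    readonly INameFormatter _inner;

    public DashFriendlyNameFormatter(INameFormatter inner) => _inner = inner;

    public string FormatQueueName(string queueName) => _inner.FormatQueueName(queueName);

    public string FormatSubscriptionName(string subscriptionName) => _inner.FormatSubscriptionName(subscriptionName);

    public string FormatTopicName(string topicName)
    {
        // lower-case only - dashes are left alone instead of being normalized to underscores
        return topicName.ToLowerInvariant();
    }
}

// registration:
// .Options(o => o.Decorate<INameFormatter>(c => new DashFriendlyNameFormatter(c.Get<INameFormatter>())))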

Topic not found exception is swallowed

The code for sending to a topic is currently configured to swallow MessagingEntityNotFoundException with the comment "if the topic does not exist, it's allright". Why is this deemed to be ok?

My domain events are all important, and losing some because the publisher started before the subscriber has registered itself with ASB is not a desired scenario. I would like the publisher to receive an error if it tries to publish an event that does not have an associated topic (and therefore cannot be delivered).

 if (destinationQueue.StartsWith(MagicSubscriptionPrefix))
                        {
                            var topicName = _nameFormatter.FormatTopicName(destinationQueue.Substring(MagicSubscriptionPrefix.Length));

                            foreach (var batch in messages.Batch(DefaultOutgoingBatchSize))
                            {
                                var list = batch.Select(GetMessage).ToList();

                                try
                                {
                                    await GetTopicClient(topicName).SendAsync(list).ConfigureAwait(false);
                                }
                                catch (MessagingEntityNotFoundException)
                                {
                                    // if the topic does not exist, it's allright
                                }
                                catch (Exception exception)
                                {
                                    throw new RebusApplicationException(exception, $"Could not publish to topic '{topicName}'");
                                }
                            }
                        }

Conflicting subscription names when using '/' in queue names

When creating queues, it's possible to use / for structure. For example, I could have 2 queues: notifications/command and payments/command.

Provided there are 2 services, a Notifications Service and a Payments Service, if both wish to subscribe to a particular topic, then the code will create a subscription called command in both cases. As a result, only one of the 2 services will receive the message.

This looks to be the culprit

e37ff5f

Is there any reasoning for splitting the transport queue name on the '/' rather than just replacing it?

/ in topic name

Queue names are allowed to contain / but topic names are not. Topics can be organized using slashes similar to queues. In most clients that visualize queues and topics, the parts of the path end up acting like virtual directories.

public string ReplaceInvalidCharacters(string str, bool isQueueName = false)

I tested this with an update to AzureServiceBusNameHelper.cs to allow / and a decorator on the ISubscriptionStorage like below, and it worked as expected. Would it be possible to update this so that / is allowed in topic names? The decorator is below, with a screenshot from Service Bus Explorer. The decorator alters the topic names so that the name comes out as {assembly name}/{fully qualified type name}, which in most software that visualizes topics and queues makes all the types from the same assembly appear nested in the same virtual directory.

    public class NamespaceNestedSubscriptionStorage : ISubscriptionStorage
    {
        private readonly ISubscriptionStorage _toDecorate;

        public NamespaceNestedSubscriptionStorage(ISubscriptionStorage toDecorate)
        {
            _toDecorate = toDecorate;
        }
        public Task<string[]> GetSubscriberAddresses(string topic)
        {
            return _toDecorate.GetSubscriberAddresses(AssemblyNested(topic));
        }

        public Task RegisterSubscriber(string topic, string subscriberAddress)
        {
            return _toDecorate.RegisterSubscriber(AssemblyNested(topic), subscriberAddress);
        }

        public Task UnregisterSubscriber(string topic, string subscriberAddress)
        {
            return _toDecorate.UnregisterSubscriber(AssemblyNested(topic), subscriberAddress);
        }

        private string AssemblyNested(string originalTopic)
        {
            var split = originalTopic.Split("__");
            return $"{split[1]}/{split[0]}";
        }

        public bool IsCentralized => _toDecorate.IsCentralized;
    }

(screenshot from Service Bus Explorer omitted)

Configure max delivery count of queues

It seems the "max delivery count" setting of queues created by Rebus is 10. It should be 100 to avoid ASB's dead-lettering mechanism kicking in and snatching messages before Rebus gets to dead-letter them.

Add ability to change the way topic names are created

It would be nice to have the ability to configure the way topics are named. The current implementation is TypeName_AssemblyName, which gets weird when using tools like LINQPad, which uses a new assembly every time the process is executed. This results in lots of single-use topics being created in Azure.

LegacyV3NameFormatter.FormatQueueName is not compatible with Rebus up to 6.0.3

LegacyV3NameFormatter.FormatQueueName is not compatible with the naming convention of Rebus up to 6.0.3. The same goes for LegacyNameFormatter.

Also, it would be nice to be able to specify a version using .UseLegacyNaming(), for instance .UseLegacyNaming(RebusVersion.V6) or .UseLegacyNaming(RebusVersion.V3).

Before 7.x

Queue name: Foo-BAR-1 -> foo-bar-1
https://github.com/rebus-org/Rebus.AzureServiceBus/blob/6.0.7/Rebus.AzureServiceBus/AzureServiceBus/AzureServiceBusTransport.cs#L76

After 7.x

Queue name: Foo-BAR-1 -> foo_bar_1
https://github.com/rebus-org/Rebus.AzureServiceBus/blob/7.1.3/Rebus.AzureServiceBus/AzureServiceBus/AzureServiceBusTransport.cs#L101
https://github.com/rebus-org/Rebus.AzureServiceBus/blob/7.1.3/Rebus.AzureServiceBus/AzureServiceBus/NameFormat/LegacyV3NameFormatter.cs#L14-L22

Workaround

public class LegacyV3NameFormatterFixed : INameFormatter
{
    private static readonly LegacyV3NameFormatter Original = new LegacyV3NameFormatter();

    public string FormatQueueName(string queueName)
    {
        return queueName.ToLowerInvariant();
    }

    public string FormatSubscriptionName(string subscriptionName)
    {
        return Original.FormatSubscriptionName(subscriptionName);
    }

    public string FormatTopicName(string topicName)
    {
        return Original.FormatTopicName(topicName);
    }
}
// boot logic
_activator = new BuiltinHandlerActivator();

Configure.With(_activator)
    .Transport(t => t.UseAzureServiceBusAsOneWayClient("...").UseLegacyNaming())
    .Options(o => o.Decorate<INameFormatter>(_ => new LegacyV3NameFormatterFixed()))
    .Start();

Stop the RenewPeekLock task after `Could not renew lock` message

We have been using 7.0.0-a14 in production since early May and it's been running very well for us. One issue, though, is that we notice an error, Could not renew lock for message with ID, keeps repeating every 5 minutes or so for the same message ID. Those messages go on for days, until the Rebus host process gets restarted.

We still need AutomaticallyRenewPeekLock on a few endpoints because of some long running legacy processes (15 minutes or so).

Looking at AzureServiceBusTransport.cs, probably none of these got executed:

context.OnCommitted(async () => renewalTask.Dispose());
context.OnAborted(() => renewalTask.Dispose());
context.OnDisposed(() => renewalTask.Dispose());

Question is why. I analyzed process dumps with windbg and Visual Studio, but haven't been able to track down the root cause (yet).

Anyway, root cause aside.

Rebus currently swallows the MessageLockLostException, so the RenewPeekLock task keeps running and failing again and again, forever, or until one of the context events occurs.

According to MessageLockLostException there is not much use in trying to RenewLockAsync:

//     The exception that is thrown when the lock on the message is lost. Callers should
//     call Receive and process the message again.
public sealed class MessageLockLostException : ServiceBusException

So I suggest stopping the RenewPeekLock task after a MessageLockLostException. The question is how? What do you think?
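A minimal sketch of one way to do it, based on the MessageLockRenewer snippet quoted in an earlier issue (IsDead is an illustrative flag, not an existing member): mark the renewer as dead on MessageLockLostException so the RenewPeekLocks loop can drop it.

public bool IsDead { get; private set; }

public async Task Renew()
{
    try
    {
        await _messageReceiver.RenewLockAsync(_message);

        SetNextRenewal();
    }
    catch (MessageLockLostException)
    {
        // the lock is gone for good - renewing again will never succeed,
        // so signal the caller to remove this renewer and stop trying
        IsDead = true;
    }
    catch { } //< other errors will automatically be retried on the next round
}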

Some types of event not handled in Azure Service Bus after upgrade of Rebus.AzureServiceBus

Hi,

We have a new problem after upgrading from Rebus.AzureServiceBus 6.0.4 to 7.0.0-a02 because of this issue: #19

Downgrading to 6.0.4 makes this particular issue go away - but then we are back to square one with #19 :)

We have two types of events - one type is an Aggregated Event that contains other events (we do this because of Azure Service Bus's problem with transactions across multiple topics).

They are pretty similar - both inherit from an abstract class that looks like this:

    public abstract class DomainEvent : IDomainEvent
    {
        public abstract Guid Id { get; }
    }

And this

   public abstract class AggregatedDomainEvent : IDomainEvent
   {
        public DomainEvent[] DomainEvents { get; }
        protected AggregatedDomainEvent(params DomainEvent[] domainEvents)
        {
            DomainEvents = domainEvents;
        }
    }

So the difference is pretty much the constructor

After upgrading, AggregatedDomainEvents are no longer caught by event handlers. All DomainEvents work as normal.

I have tracked down the problem to this single commit, where 6.0.4 is changed to 7.0.0-a02 - checked before and after and also tried downgrading the newest code to 6.0.4 - so I am pretty confident that this change is what caused the problem.

Could it have something to do with serialization and the use of constructors?
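One thing that might be worth trying (purely a guess at the cause, not a confirmed fix): give AggregatedDomainEvent a settable property and a parameterless construction path, so the JSON serializer can rehydrate it without having to match constructor parameters.

public abstract class AggregatedDomainEvent : IDomainEvent
{
    // settable so deserialization does not depend on the params constructor
    public DomainEvent[] DomainEvents { get; set; }

    protected AggregatedDomainEvent() { }

    protected AggregatedDomainEvent(params DomainEvent[] domainEvents)
    {
        DomainEvents = domainEvents;
    }
}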

Error registering event subscriptions when running multiple instances

We get a Microsoft.Azure.ServiceBus.ServiceBusException with the message "Another update request is in progress for the entity X" when trying to register subscriptions for the same events in multiple websites. We are running multiple instances of our web service so they run the same initialization code. Is this a problem that Rebus should deal with or must we coordinate between our instances to figure out which one registers the handlers... or something else entirely?

at Buma.Host.BoundedContext.AppExtensions.InitializeHost[TCommand](IApplicationBuilder app, String serviceName, Boolean isBoundedContext, Measurement measurement)
   at Buma.Host.BoundedContext.BumaBoundedContextStartup`1.Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime applicationLifetime)
--- End of stack trace from previous location where exception was thrown ---
   at Microsoft.AspNetCore.Hosting.ConventionBasedStartup.Configure(IApplicationBuilder app)
   at Microsoft.AspNetCore.HostFilteringStartupFilter.<>c__DisplayClass0_0.<Configure>b__0(IApplicationBuilder app)
   at Microsoft.AspNetCore.Hosting.Internal.AutoRequestServicesStartupFilter.<>c__DisplayClass0_0.<Configure>b__0(IApplicationBuilder builder)
   at Microsoft.AspNetCore.Hosting.Internal.WebHost.BuildApplication()
---> (Inner Exception #0) Microsoft.Azure.ServiceBus.ServiceBusException: Another update request is in progress for the entity studicabus-dev:Topic:admission_events_schools_schoolimportedevent__admission_events|studica_denormalizer_admission_shared. TrackingId:b8d9a853-9dd6-47f8-872b-010d652d44bb_G1_B32, SystemTracker:studicabus-dev:Topic:admission_events_schools_schoolimportedevent__admission_events|studica_denormalizer_admission_shared, Timestamp:11/20/2018 12:14:29 PM
   at Microsoft.Azure.ServiceBus.Management.ManagementClient.SendHttpRequest(HttpRequestMessage request, CancellationToken cancellationToken)
   at Microsoft.Azure.ServiceBus.Management.ManagementClient.PutEntity(String path, String requestBody, Boolean isUpdate, String forwardTo, String fwdDeadLetterTo, CancellationToken cancellationToken)
   at Microsoft.Azure.ServiceBus.Management.ManagementClient.UpdateSubscriptionAsync(SubscriptionDescription subscriptionDescription, CancellationToken cancellationToken)
   at Rebus.AzureServiceBus.AzureServiceBusTransport.RegisterSubscriber(String topic, String subscriberAddress)
   at Rebus.Bus.RebusBus.InnerSubscribe(String topic)
   at Buma.Common.Rebus.BusSetup.SubscribeWithRetry(IBus bus, Type eventType)<---

---> (Inner Exception #1) Microsoft.Azure.ServiceBus.ServiceBusException: Another update request is in progress for the entity studicabus-dev:Topic:admission_events_applications_eudapplicationimportedevent__admission_events|studica_denormalizer_admission_shared. TrackingId:4b0b2118-2d21-4c4b-9c58-3db7a5e515f2_G41_B45, SystemTracker:studicabus-dev:Topic:admission_events_applications_eudapplicationimportedevent__admission_events|studica_denormalizer_admission_shared, Timestamp:11/20/2018 12:14:30 PM
   at Microsoft.Azure.ServiceBus.Management.ManagementClient.SendHttpRequest(HttpRequestMessage request, CancellationToken cancellationToken)
   at Microsoft.Azure.ServiceBus.Management.ManagementClient.PutEntity(String path, String requestBody, Boolean isUpdate, String forwardTo, String fwdDeadLetterTo, CancellationToken cancellationToken)
   at Microsoft.Azure.ServiceBus.Management.ManagementClient.UpdateSubscriptionAsync(SubscriptionDescription subscriptionDescription, CancellationToken cancellationToken)
   at Rebus.AzureServiceBus.AzureServiceBusTransport.RegisterSubscriber(String topic, String subscriberAddress)
   at Rebus.Bus.RebusBus.InnerSubscribe(String topic)
   at Buma.Common.Rebus.BusSetup.SubscribeWithRetry(IBus bus, Type eventType)<---


Unhandled Exception: System.AggregateException: One or more errors occurred. (Another update request is in progress for the entity studicabus-dev:Topic:admission_events_schools_schoolimportedevent__admission_events|studica_denormalizer_admission_shared. TrackingId:b8d9a853-9dd6-47f8-872b-010d652d44bb_G1_B32, SystemTracker:studicabus-dev:Topic:admission_events_schools_schoolimportedevent__admission_events|studica_denormalizer_admission_shared, Timestamp:11/20/2018 12:14:29 PM) (Another update request is in progress for the entity studicabus-dev:Topic:admission_events_applications_eudapplicationimportedevent__admission_events|studica_denormalizer_admission_shared. TrackingId:4b0b2118-2d21-4c4b-9c58-3db7a5e515f2_G41_B45, SystemTracker:studicabus-dev:Topic:admission_events_applications_eudapplicationimportedevent__admission_events|studica_denormalizer_admission_shared, Timestamp:11/20/2018 12:14:30 PM) ---> Microsoft.Azure.ServiceBus.ServiceBusException: Another update request is in progress for the entity studicabus-dev:Topic:admission_events_schools_schoolimportedevent__admission_events|studica_denormalizer_admission_shared. TrackingId:b8d9a853-9dd6-47f8-872b-010d652d44bb_G1_B32, SystemTracker:studicabus-dev:Topic:admission_events_schools_schoolimportedevent__admission_events|studica_denormalizer_admission_shared, Timestamp:11/20/2018 12:14:29 PM
   at Microsoft.Azure.ServiceBus.Management.ManagementClient.SendHttpRequest(HttpRequestMessage request, CancellationToken cancellationToken)
   at Microsoft.Azure.ServiceBus.Management.ManagementClient.PutEntity(String path, String requestBody, Boolean isUpdate, String forwardTo, String fwdDeadLetterTo, CancellationToken cancellationToken)
   at Microsoft.Azure.ServiceBus.Management.ManagementClient.UpdateSubscriptionAsync(SubscriptionDescription subscriptionDescription, CancellationToken cancellationToken)
   at Rebus.AzureServiceBus.AzureServiceBusTransport.RegisterSubscriber(String topic, String subscriberAddress)
   at Rebus.Bus.RebusBus.InnerSubscribe(String topic)
   at Buma.Common.Rebus.BusSetup.SubscribeWithRetry(IBus bus, Type eventType)
   --- End of inner exception stack trace ---
   at System.Threading.Tasks.Task.WaitAllCore(Task[] tasks, Int32 millisecondsTimeout, CancellationToken cancellationToken)
   at Buma.Common.Rebus.BusSetup.SubscribeToHandledEvents(IBus bus, List`1 eventTypes)
   at Buma.Common.Rebus.BusSetup.InitBasicBus(ServiceType serviceType, IContainer container, IContainerAdapter activator, Measurement measurement)
   at Buma.Common.Rebus.BusSetup.InitDenormalizationBus(Measurement measurement)
   at Buma.Host.BoundedContext.AppExtensions.InitializeHost[TCommand](IApplicationBuilder app, String serviceName, Boolean isBoundedContext, Measurement measurement)
   at Buma.Host.BoundedContext.BumaBoundedContextStartup`1.Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime applicationLifetime)
--- End of stack trace from previous location where exception was thrown ---
   at Microsoft.AspNetCore.Hosting.ConventionBasedStartup.Configure(IApplicationBuilder app)
   at Microsoft.AspNetCore.HostFilteringStartupFilter.<>c__DisplayClass0_0.<Configure>b__0(IApplicationBuilder app)
   at Microsoft.AspNetCore.Hosting.Internal.AutoRequestServicesStartupFilter.<>c__DisplayClass0_0.<Configure>b__0(IApplicationBuilder builder)
   at Microsoft.AspNetCore.Hosting.Internal.WebHost.BuildApplication()
   at Microsoft.AspNetCore.Hosting.Internal.WebHost.StartAsync(CancellationToken cancellationToken)
   at Microsoft.AspNetCore.Hosting.WebHostExtensions.RunAsync(IWebHost host, CancellationToken token, String shutdownMessage)
   at Microsoft.AspNetCore.Hosting.WebHostExtensions.RunAsync(IWebHost host, CancellationToken token)
   at Microsoft.AspNetCore.Hosting.WebHostExtensions.Run(IWebHost host)
   at Admission.Host.Program.Main(String[] args) in /app/src/Admission/Admission.Host/Program.cs:line 11
Aborted (core dumped)
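Judging from the Task.WaitAllCore and SubscribeWithRetry frames above, several Subscribe calls appear to run in parallel, which is what produces the concurrent update requests against the same subscription entity. A possible workaround, sketched below under the assumption that the event types are available as a list (method and parameter names are placeholders), is to subscribe sequentially so that only one update per entity is in flight at a time:

// Sketch of a sequential alternative to Task.WaitAll over parallel Subscribe calls:
// IBus.Subscribe(Type) is awaited one event type at a time.
private static async Task SubscribeSequentially(IBus bus, IEnumerable<Type> eventTypes)
{
    foreach (var eventType in eventTypes)
    {
        // Only one subscription update request is in flight at any given moment
        await bus.Subscribe(eventType);
    }
}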

Two-way communication with queue SAS's, not namespace

I'm working on a project that needs two-way communication: one server, n clients. Clients should be isolated, unable to see one another's messages or even know of one another's existence. The server sends specific messages to specific clients.

During testing, I started with:

  • 1 server queue
  • 1 client queue
  • 1 namespace-level SAS

Something like:

Configure
    .With(new BuiltinHandlerActivator())
    .Transport(t => t
        .UseAzureServiceBus("Endpoint=sb://<MyServiceBus>.servicebus.windows.net/;SharedAccessKeyName=<MyServerKeyName>;SharedAccessKey=<MyServerKey>", "<MyServerQueue>")
        .AutomaticallyRenewPeekLock()
        .DoNotCreateQueues()
    )
    .Routing(r => r.TypeBased().MapAssemblyOf<TestMessage>("<MyClientQueue>"))
    .Options(o => o.EnableSynchronousRequestReply())
    .Start();

This worked well; the namespace SAS had rights to both queues.

However, in reality there will be many clients. (I think) I need 1 server queue and 1 client queue for each client to keep them isolated...(right?).

At first I tried (on the server):

Configure
    .With(new BuiltinHandlerActivator())
    .Transport(t => t
        .UseAzureServiceBus("Endpoint=sb://<MyServiceBus>.servicebus.windows.net/;SharedAccessKeyName=<MyServerKeyName>;SharedAccessKey=<MyServerKey>", "<MyServerQueue>")
        .AutomaticallyRenewPeekLock()
        .DoNotCreateQueues()
    )
    .Routing(r => r.TypeBased().MapAssemblyOf<TestMessage>("Endpoint=sb://<MyServiceBus>.servicebus.windows.net/;SharedAccessKeyName=<MyClientKeyName>;SharedAccessKey=<MyClientKey>;EntityPath=<MyClientQueue>"))
    .Options(o => o.EnableSynchronousRequestReply())
    .Start();

But that didn't work. It seems MapAssemblyOf expects a queue name.

I thought about using two bus instances: one for the server queue, one for the client queue. But I'm not sure how that would work with synchronous request/reply.
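For reference, here is a minimal sketch of what I mean by two bus instances, assuming the second bus is configured as a one-way client with the client-scoped SAS (the UseAzureServiceBusAsOneWayClient call and all names/keys below are placeholders, not something I have verified for this scenario):

// Sketch only: the main bus receives on the server queue with the server SAS...
var serverBus = Configure
    .With(new BuiltinHandlerActivator())
    .Transport(t => t
        .UseAzureServiceBus("Endpoint=sb://<MyServiceBus>.servicebus.windows.net/;SharedAccessKeyName=<MyServerKeyName>;SharedAccessKey=<MyServerKey>", "<MyServerQueue>")
        .AutomaticallyRenewPeekLock()
        .DoNotCreateQueues())
    .Options(o => o.EnableSynchronousRequestReply())
    .Start();

// ...and a second, one-way bus sends to one specific client queue with that client's SAS.
var clientSender = Configure
    .With(new BuiltinHandlerActivator())
    .Transport(t => t.UseAzureServiceBusAsOneWayClient("Endpoint=sb://<MyServiceBus>.servicebus.windows.net/;SharedAccessKeyName=<MyClientKeyName>;SharedAccessKey=<MyClientKey>"))
    .Routing(r => r.TypeBased().MapAssemblyOf<TestMessage>("<MyClientQueue>"))
    .Start();

await clientSender.Send(new TestMessage());

What I can't tell is whether replies from the client would still find their way back to <MyServerQueue> when the send goes through the one-way bus.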

Is this scenario supported?

Possible problem with subscribers in the same IoC container

When using Rebus.AzureServiceBus we experience that if several event handlers in a Rebus bus's IoC container listen to the same event, they influence each other: if one of them fails, it causes the rest to retry.

This is not the behavior we want in our case, but perhaps it is intended?

I would prefer that event handlers were autonomous in the same way as when they run in different processes, i.e. all listening handlers should get a chance to process the event, even if other handlers fail.

I do not know if this has to do with Azure Service Bus or Rebus (or our setup). I did not experience it when using MSMQ as the transport, but perhaps I just never had several handlers in a bus's IoC container listening to the same event.
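To make the scenario concrete, this is roughly the setup (type names are placeholders):

using System;
using System.Threading.Tasks;
using Rebus.Handlers;

public class SomeEvent { }

// Both handlers live in the same container behind one bus instance and handle the same event.
public class FirstHandler : IHandleMessages<SomeEvent>
{
    public Task Handle(SomeEvent message) => Task.CompletedTask; // succeeds
}

public class SecondHandler : IHandleMessages<SomeEvent>
{
    public Task Handle(SomeEvent message) => throw new InvalidOperationException("boom"); // fails
}

// When SecondHandler throws, the message is abandoned and redelivered, so FirstHandler
// runs again on each retry - which is the behavior we observe.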

So is this a bug (perhaps in our code) or is it as intended?

Create an extensibility point to name topics

An extensibility point for naming topics would be useful if you want to use another naming convention, for example changing the default type-based topic name to one based on the bounded context.

For example, I would like to change the topic name from this:
[screenshot: current type-based topic name]

to this:
[screenshot: desired bounded-context-based topic name]

I think we can add a method to AzureServiceBusTransportSettings to define the topic naming convention, like this:

internal Func<string, string> TopicNamingConvention { get; set; }
    = topic => topic.ToValidAzureServiceBusEntityName();

public AzureServiceBusTransportSettings EnableTopicNamingConvention(Func<string, string> convention)
{
    TopicNamingConvention = convention;

    return this;
}

Then, when we are configuring the endpoint:

services.AddRebus(configure => configure
    .Logging(l => l.ColoredConsole())
    .Transport(t =>
        t.UseAzureServiceBus(connectionString, endpoint)
            .EnablePrefetching(100)
            .EnableTopicNamingConvention(topic => topic.MyConvention())));

Another way is to create an interface like INamingConvention and implement that.
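For completeness, a rough sketch of the interface-based alternative could look like this (INamingConvention and the example implementation are hypothetical, just to illustrate the idea):

public interface INamingConvention
{
    string GetTopicName(string topic);
}

// Example implementation that keeps only the last segment of the type-based topic name
public class BoundedContextNamingConvention : INamingConvention
{
    public string GetTopicName(string topic)
    {
        var lastDot = topic.LastIndexOf('.');
        return (lastDot >= 0 ? topic.Substring(lastDot + 1) : topic).ToLowerInvariant();
    }
}

The transport could then call GetTopicName wherever it currently applies ToValidAzureServiceBusEntityName.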

Using path in connection string no longer works

We have some services using v3 of Rebus. Now we have a new .NET Core-based service using v6.

When sharing a single service bus between multiple developers in a test environment, we have included the machine name in the connection string for namespacing, e.g. Endpoint=sb://sharedbus.servicebus.windows.net/mymachine;SharedAccessKeyName=....

In the .NET Core world, this doesn't work: the path part of the endpoint is ignored.

I'm not sure whether it is because of the new Rebus.AzureServiceBus implementation or the new azure-service-bus-dotnet client, and I have to admit that I don't know Rebus or Service Bus well enough to know where to start digging...

My Rebus setup is simple:

            var bus = Configure
                .With(_activator)
                .Logging(x => x.Serilog())
                .Transport(t => t.UseAzureServiceBus("Endpoint=sb://...servicebus.windows.net/mymachine;....", "MyInput"))
                .Options(o =>
                {
                    o.SimpleRetryStrategy(_rebusOptions.ErrorQueue);
                })
                .Start();

            await bus.Subscribe<MyEventMessage>();

I would expect a mymachine/myinput-queue, and a mymachine/myeventmessage-topic with one subscription. Instead I get myinput-queue and myeventmessage-topic.

Any idea what changed?
Is there a different way of achieving my desired result?
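One workaround I have considered, purely as a sketch and assuming '/' (or some other separator) is accepted in queue names by the version in use, is to put the machine-name prefix in the queue name instead of in the connection string path:

// Hypothetical workaround sketch: prefix the input queue name instead of relying on
// a path in the connection string (assumes the separator is valid in this version).
var prefix = Environment.MachineName.ToLowerInvariant();

var bus = Configure
    .With(_activator)
    .Logging(x => x.Serilog())
    .Transport(t => t.UseAzureServiceBus(connectionString, $"{prefix}/MyInput"))
    .Options(o => o.SimpleRetryStrategy(_rebusOptions.ErrorQueue))
    .Start();

await bus.Subscribe<MyEventMessage>();

Note that this would only affect the queue name; the topics would presumably still be named after the event types, so it isn't a full replacement for the old path behavior.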

Add '/' back as a valid topicname character in v6

Hi!

Can we have '/' back as a valid character in the v6 ToValidAzureServiceBusTopicName, released as a v6.0.7?
v6.0.4 removed '/' as a valid character, but it had been valid for a long time and is also valid in v7-a07.

The reason for asking is that we have many topics containing '/' across several microservices, and we want to migrate them gradually from v3 to v6 (.NET Core / Microsoft.Azure.ServiceBus).

v6.0.3 doesn't seem usable for us because of the bug that was fixed in v6.0.5.
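Roughly, the change I have in mind is just to keep '/' in the set of characters that survives sanitization - something like the sketch below (not the actual current implementation, just to illustrate):

using System.Linq;

static class TopicNameSketch
{
    public static string ToValidAzureServiceBusTopicName(this string topicName)
    {
        // '/' re-added to the characters that are kept as-is (sketch only)
        var allowedSpecialChars = new[] { '_', '-', '.', '/' };

        var sanitized = topicName
            .ToLowerInvariant()
            .Select(c => char.IsLetterOrDigit(c) || allowedSpecialChars.Contains(c) ? c : '_');

        return new string(sanitized.ToArray());
    }
}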

I can spend the time to make a PR. Please let me know if you'd like that!

Thanks in advance!
