nservicebus.router's People

Contributors

andreasohlund, davecluderay, heskandari, katostoelen, szymonpobiega

nservicebus.router's Issues

Instrumenting metrics?

I am looking to instrument our router instance with some metrics. I was able to get a hook into the prerouting context using a Rule. Is there something similar for failures?

Enable on SendOnly endpoints

Exception "LocalAddress isn't available since this endpoint is configured to run in send-only mode." is thrown.

Is it possible to enable sendonly endpoints?

My configuration looks like this:

busConfiguration.SendOnly();
var transport = busConfiguration.UseTransport<RabbitMQTransport>()
    .ConnectionString(ConfigurationManager.ConnectionStrings["NServiceBus/Transport"].ConnectionString)
    .TimeToWaitBeforeTriggeringCircuitBreaker(TimeSpan.FromMinutes(10))
    .UseConventionalRoutingTopology();
var routing = transport.Routing();

var routedRouting = routing.ConnectToRouter("Transport.Bridge");
routedRouting.RouteToEndpoint(typeof(AddActionableCommand), "Actionables.Service");

Thx

How to get logged output from the Router

Howdy, so I have 3 docker containers in my example:

  1. Publisher (SqlTransport) which sends messages / publishes events on a SQL instance
  2. Router configured to route messages from the Publisher's SQL transport to the RabbitMQ transport of my Subscriber
  3. Subscriber (RabbitMQ) which receives messages / events from the Publisher via the Router

Now, having followed every example under the sun and having proved that those examples do work locally, I am still unable to get the Router to route the Events from the Publisher to the Subscriber. Normal messages come through, but no events. I can see the Publisher is publishing the event AND sending the message, but the subscriber only ever gets the message, never the event.

I want to see what's going on with the Router. I want to see its debug output so I can see if the events are even reaching it and/or what's happening with them.

What's the trick to enabling debugging / verbose mode on the router, particularly using the .NET Core Worker Service UseNServiceBusRouter syntax?

Cheers in advance.

NServiceBus.MessageDeserializationException error

Hi,

I'm getting the following error with multiple command types. The strange thing is that it doesn't happen all the time: sometimes I can replay the messages and it works, and other times it continues to fail. Note: when not using the Routing Bridge these issues don't happen.

We are using the following serialization:
configuration.UseSerialization();
configuration.AddDeserializer();

NServiceBus.MessageDeserializationException: An error occurred while attempting to extract logical messages from incoming physical message a14e31b0-a31b-493d-90fc-ac3200432251 ---> System.Exception: Could not determine type for node: 'SetupCheckForDuplicateRecords'.
at NServiceBus.XmlDeserialization.InferNodeType(XmlNode node, Object parent)
at NServiceBus.XmlDeserialization.Process(XmlNode node, Object parent, Type nodeType)
at NServiceBus.XmlDeserialization.Deserialize(Stream stream, IList`1 messageTypesToDeserialize)
at NServiceBus.DeserializeMessageConnector.Extract(IncomingMessage physicalMessage)
at NServiceBus.DeserializeMessageConnector.ExtractWithExceptionHandling(IncomingMessage message)
--- End of inner exception stack trace ---
at NServiceBus.DeserializeMessageConnector.ExtractWithExceptionHandling(IncomingMessage message)
at NServiceBus.DeserializeMessageConnector.d__1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.SubscriptionReceiverBehavior.d__1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.InvokeAuditPipelineBehavior.d__1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.ProcessingStatisticsBehavior.d__0.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Jitb.NSB.Commons.JitbUnitOfWorkBehavior.d__2.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.TransportReceiveToPhysicalMessageConnector.d__1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.MainPipelineExecutor.d__1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.Transport.Msmq.ReceiveStrategy.d__7.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.Transport.Msmq.TransactionScopeStrategy.d__2.MoveNext()

I saw a similar issue (https://discuss.particular.net/t/json-deserialization-error-when-not-using-shared-class-library-for-messages/1702), but that issue was fixed in 3.7.1. I'm currently running 3.8.1.

Thanks,
Eric

Router vs SqlServerTransport vs Schema

Hi,
I would like to isolate each queue in its own schema. Is that a good idea, and how can I do it on the router?
Does the router have to live in the same schema as the interface added? (I have a routing problem doing that, because the router tries to send the message to its own schema, I guess because it does not know the schema of the destination.)
What is the best practice for using the router inside an enterprise with many endpoints?
One single catalog and many schemas? If so, do I need as many routers as there are schemas?
Thanks for the help.

"poison" table no longer created

While upgrading NServiceBus in an application, we realised table poison (whatever name it is assigned via PoisonQueueName) is no longer created.

Steps to reproduce

  • Use sample update-and-publish solution.
  • Build (as of now, it will use NServiceBus 7.3.0 via NServiceBus.Router 3.8.0)
  • Start Router

Expected result: the poison table is created.
Actual result: no poison table is found.

When forcing references to the previous packages, i.e.

  • NServiceBus v7.2.*
  • NServiceBus.Router v3.7.*

everything works as expected

Enabling Transport Migration for publisher is not handled by RMQ subscriber

We are trying to migrate a RMQ publisher to ASB, while keeping a subscriber at RMQ. According to the Readme it is possible to do that by enabling the migration mode for the publisher first. The problem is that the RMQ subscriber is not handling the events since we enabled transport migration for the publisher.

How to reproduce?

  1. The RMQ subscriber continues to run.
  2. I disabled RabbitMQTransport in the publisher and enabled transport migration with:
var routing = busConfiguration.EnableTransportMigration<RabbitMQTransport, AzureServiceBusTransport>(
    rmq =>
    {
        rmq.ConnectionString(_configuration["ConnectionStringRMQ"]);
        rmq.UseConventionalRoutingTopology();
    },
    asb =>
    {
        asb.ConnectionString(_configuration["ConnectionStringASB"]);
    });
  3. Publish an event.
  4. Even after restarting the subscriber, the event is not handled.

Result: the RMQ subscriber is not handling the published event.

Is this something that is expected? Or is our configuration incorrect?

RabbitMQ to Azure Service Bus events don't get routed

I've created a simple sample solution with three console applications: a RabbitMQ endpoint, a RabbitMQ to Azure Service Bus (ASB) router, and an ASB endpoint. The router successfully routes an NServiceBus ICommand from the RabbitMQ endpoint to the ASB endpoint. Works great with no issues! However, if the RabbitMQ endpoint attempts to publish an IEvent that the ASB endpoint subscribes to, the event never makes its way to ASB. I assume this is meant to work? Both RabbitMQ and ASB have native pub/sub ability.

I can see that most of the necessary infrastructure for publishing the IEvent is set up properly:

  1. In RabbitMQ an exchange is created for the IEvent to be published, like expected.
  2. I also see in RabbitMQ exchanges and queues created for both the RabbitMQ endpoint and the router endpoint.
  3. I see in the Azure portal queues created for both the ASB endpoint and the router, as expected.
  4. I see in the Azure portal my topic created with a proper subscription for my IEvent.

However, the published event never makes its way to my ASB endpoint. Upon further investigation, it appears the problem may be with the RabbitMQ side of the communication. I found that if I manually create a binding between the exchange for my IEvent and the router exchange (which was automatically created), then my published IEvent makes its way to the ASB endpoint as expected. This seems to indicate that a binding for forwarding IEvents to the router exchange is not being created in RabbitMQ.
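For readers who want to reproduce the manual workaround described above from code rather than the management UI, here is a minimal sketch using the RabbitMQ .NET client (assuming the synchronous 5.x/6.x API). The host name and both exchange names are placeholders, not values from the sample; substitute whatever the management UI shows for the event exchange and the router exchange. This only recreates the manual binding and says nothing about how the router is supposed to create it.

```csharp
using RabbitMQ.Client;

public static class BindEventToRouter
{
    public static void Main()
    {
        // Placeholder values (assumptions): replace with your broker and exchange names.
        var factory = new ConnectionFactory { HostName = "localhost" };
        const string eventExchange = "MyMessages:MyEvent"; // hypothetical exchange created for the event type
        const string routerExchange = "MyRouter";          // hypothetical exchange created for the router endpoint

        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            // Exchange-to-exchange binding: messages published to the event
            // exchange (source) are also delivered to the router exchange (destination).
            channel.ExchangeBind(
                destination: routerExchange,
                source: eventExchange,
                routingKey: string.Empty);
        }
    }
}
```

The routing key is left empty on the assumption that the exchanges involved are fanout exchanges, which ignore it.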

This behavior can be easily confirmed. I have simple code that can be made available that demonstrates this. So my questions are these:

  1. Are IEvents published by a RabbitMQ endpoint meant to work with NServiceBus.Router?
  2. If so, what can I do so that the IEvents I publish from my RabbitMQ endpoint will be bound to the router endpoint and be forwarded? Is there some NServiceBus.Router option/configuration/trick I can use so that my published events will be forwarded to the router and make their way to ASB? I assume I must be overlooking something obvious but I can't see what it is.

Here is the core code of my sample:

RabbitMQ endpoint configuration:

            var endpointConfiguration = new EndpointConfiguration(Settings.SampleClientEndpointName);

            endpointConfiguration.License(NServiceBusLicense.Text);
            endpointConfiguration.UseSerialization<NewtonsoftSerializer>();
            endpointConfiguration.UsePersistence<InMemoryPersistence>();
            endpointConfiguration.EnableInstallers();
            var recoverability = endpointConfiguration.Recoverability();
            recoverability.Immediate(immediate => immediate.NumberOfRetries(0));
            recoverability.Delayed(delayed => delayed.NumberOfRetries(0));
            endpointConfiguration.SetDiagnosticsPath($@"C:\Temp\NSB\{Settings.SampleClientEndpointName}");
            endpointConfiguration.EnableInstallers();

            var routingSettings = endpointConfiguration.UseTransport<RabbitMQTransport>()
                .ConnectionString(Settings.RabbitMqConnectionString)
                .UseConventionalRoutingTopology()
                .Transactions(TransportTransactionMode.ReceiveOnly)
                .Routing();

            var bridge = routingSettings.ConnectToRouter(Settings.SampleRouterEndpointName);
            bridge.RouteToEndpoint(typeof(TestCommand), Settings.SampleServerEndpointName);

            var endpointInstance = await Endpoint.Start(endpointConfiguration).ConfigureAwait(false);

RabbitMQ to ASB router configuration:

            var routerConfig = new RouterConfiguration(Settings.SampleRouterEndpointName);

            routerConfig.AddInterface<RabbitMQTransport>(RabbitMqInterfaceName, t =>
            {
                t.ConnectionString(Settings.RabbitMqConnectionString);
                t.UseConventionalRoutingTopology();
            });

            routerConfig.AddInterface<AzureServiceBusTransport>(AzureServiceBusInterfaceName, t =>
            {
                t.ConnectionString(Settings.AzureServiceBusConnectionString);
                t.TopicName(Settings.TopicName);
            });

            var staticRouting = routerConfig.UseStaticRoutingProtocol();
            staticRouting.AddForwardRoute(RabbitMqInterfaceName, AzureServiceBusInterfaceName);

            routerConfig.AutoCreateQueues();

            var router = NServiceBus.Router.Router.Create(routerConfig);

            await router.Start().ConfigureAwait(false);

Azure Service Bus endpoint configuration:

            var endpointConfiguration = new EndpointConfiguration(Settings.SampleServerEndpointName);

            endpointConfiguration.License(NServiceBusLicense.Text);
            endpointConfiguration.UseSerialization<NewtonsoftSerializer>();
            endpointConfiguration.UsePersistence<InMemoryPersistence>();
            endpointConfiguration.EnableInstallers();
            var recoverability = endpointConfiguration.Recoverability();
            recoverability.Immediate(immediate => immediate.NumberOfRetries(0));
            recoverability.Delayed(delayed => delayed.NumberOfRetries(0));
            endpointConfiguration.SetDiagnosticsPath($@"C:\Temp\NSB\{Settings.SampleServerEndpointName}");

            endpointConfiguration.UseTransport<AzureServiceBusTransport>()
                .ConnectionString(Settings.AzureServiceBusConnectionString)
                .Transactions(TransportTransactionMode.ReceiveOnly)
                .EnablePartitioning()
                .PrefetchMultiplier(10)
                .TopicName(Settings.TopicName);

            var endpointInstance = await Endpoint.Start(endpointConfiguration).ConfigureAwait(false);

And again, let me point out, that if I manually create a binding in RabbitMQ between the IEvent exchange and the router exchange then my events DO work. I just shouldn't need to manually create this binding for events to work. I need to understand how the router, or my use of it, will automatically create the necessary exchange bindings for IEvents in RabbitMQ.

Any insight that you can provide will be greatly appreciated.

SQS/SNS - MSMQ - Router not working behind a proxy

I have successfully implemented an endpoint (v7.5.0 using .NET 4.8) that runs on a Windows server and uses the router to bridge between MSMQ and AWS. It runs correctly from multiple developers' machines, but we have run into some issues moving it into our development environment.
In our dev environment we have added the SQS queue URLs and the generic SNS URL for us-east-2 to the proxy.
I also have both the router and the endpoint itself setting up the client factory to use the SQS client with an SQS config object that sets the proxy properties.

When we run this all the SQS queues get created properly from behind the proxy, but the SNS topics and subscriptions are not created from behind the proxy. Looking at CloudTrail I don’t see any SNS activity except from our local machines.

I wanted to see if there is anything special with the SNS component that needs to be configured differently for it to work from behind a proxy. I am already doing custom config on the router so it will work based on previous responses to questions I have posted in the forums.

        extensions.ClientFactory(() =>
        {
            return !useProxy
                ? new AmazonSQSClient()
                : new AmazonSQSClient(new AmazonSQSConfig
                {
                    ProxyHost = proxyUrl,
                    ProxyPort = proxyPort
                });
        });

        var ctor = typeof(MessageMetadataRegistry).GetConstructor(BindingFlags.NonPublic | BindingFlags.Public | BindingFlags.Instance, null, new[] { typeof(Func<Type, bool>) }, null);

#pragma warning disable CS0618 // Type or member is obsolete
settings.Set(ctor.Invoke(new object[] { (Func<Type, bool>)isMessageType }));
#pragma warning restore CS0618 // Type or member is obsolete

Update - 9/13/2021
It appears as though the SQS traffic is https and is successfully going through the proxy.
The SNS traffic appears to be going through the firewall as tcp over port 443.

Azure events to SQS

I am trying to setup a connection between an AzureServiceBus Publisher and an Amazon SQS Subscriber.

The following is my router code:

        static async Task Main()
        {
            Console.Title = "Demo.Router";

            var routerConfig = new RouterConfiguration("Demo.Router");

            var sqsInterface = routerConfig.AddInterface<SqsTransport>("SQS", t =>
            {
                t.ClientFactory(() =>
                    new AmazonSQSClient(
                        new BasicAWSCredentials(accessKey, accessSecret),
                        RegionEndpoint.GetBySystemName(regionName)));
            });

            var mongoDatabase = new MongoClient(connectionString).GetDatabase(MongoUrl.Create(connectionString).DatabaseName);

            sqsInterface.EnableMessageDrivenPublishSubscribe(new SubscriptionPersister(mongoDatabase));

            var azureServiceBusInterface = routerConfig.AddInterface<AzureServiceBusTransport>("ASB",
                t => {
                    t.ConnectionString(connectionString);
                    var topology = t.UseEndpointOrientedTopology();

                    topology.RegisterPublisher(typeof(ArticlePublished),
                        "queue@connectionString");

                    var sanitization = t.Sanitization();
                    sanitization.UseStrategy<Sha1Sanitization>();

                    var settings = t.GetSettings();
                    var builder = new ConventionsBuilder(settings);
                    builder.DefiningEventsAs(e =>
                        e.Namespace != null &&
                        e.Namespace.ToLowerInvariant().EndsWith("events"));

                    settings.Set<NServiceBus.Conventions>(builder.Conventions);

                    var serializer = Tuple.Create(new NewtonsoftSerializer() as SerializationDefinition, settings);
                    settings.Set("MainSerializer", serializer);
                });

            
            var staticRouting = routerConfig.UseStaticRoutingProtocol();

            staticRouting.AddForwardRoute("SQS", "ASB");

            routerConfig.AutoCreateQueues();
            var router = NServiceBus.Router.Router.Create(routerConfig);

            await router.Start().ConfigureAwait(false);

            Console.WriteLine("Press <enter> to exit");
            Console.ReadLine();

            await router.Stop().ConfigureAwait(false);
        }
    }

My SQS Subscriber has the following connection to the Router:

            var bridge = transport.Routing().ConnectToRouter("Demo.Router");
            bridge.RegisterPublisher(typeof(ArticlePublished), "queue@connectionString");

I can see the SQS router queue message for "Subscribe" message intent.
The Router blows up with an error when I believe it is trying to setup the subscription in AzureServiceBus:

The remote server returned an error: (400) Bad Request. The specified HTTP verb (GET) is not valid. TrackingId:XXXX, SystemTracker:YYYY:Queue:queue

Any guidance on how I am not setting this up properly would be appreciated.

MSMQ / ASB subscription duplicate events

Hi there,

I've been playing around with the router component, in preparation for a piece of migration work I'm working on.

At the moment we have an MSMQ on premise solution, but newer components are coming on board using ASB.

I wanted to use the router to route commands sent from the MSMQ side of things into ASB to be processed there, and also to route events published on the MSMQ side into ASB for processing.

I have been prototyping the solution (based on the mixedtransport solution). Commands work great: I send one and it turns up for processing in my Azure Service Bus. But when I publish an event on the MSMQ side of things, I always end up with exactly two copies of the event in ASB.

I'm at the point now where I think I'm either missing something obvious (or being stupid!) :)

I popped the subscription storage for the router into a SQL db table so I could just look at it, and it looks fine to me.

Any thoughts?

Sample project throws exception "NServiceBus.Router.UnforwardableMessageException"

Hi, been trying to setup mixed transports using this project. My Endpoint/Routing instances are

  • RabbitMq (net core 2.2 - publishes events)
  • Router (Rabbit incoming, MSMQ outgoing) (net 472)
  • MSMQ (net 472, subscribes to and handles events)

When I run them all I get the exception:

Interface Moving poison message to the error queue NServiceBus.Router.UnforwardableMessageException: No route for destination DirectEvents.WebApp

(Where DirectEvents.WebApp is the RabbitMQ Endpoint)

To see where I was going wrong I downloaded and ran the sample solution from here - https://docs.particular.net/samples/router/mixed-transports/

And when I run the server/router/client here I run into basically the same issue:

Interface Moving poison message to the error queue NServiceBus.Router.UnforwardableMessageException: No route for destination Samples.Router.MixedTransports.Server

I'm using:

  • NServiceBus 7.1.10
  • NServiceBus.Persistence.Sql 4.5.1
  • NServiceBus.RabbitMq 5.1.2
  • NServiceBus.Router 3.5.0
  • NServiceBus.Router.Connector 3.5.0
  • NServiceBus.Transport.Msmq 1.0.1

Happy to post a sample solution if that would help - but maybe the sample project download on the Particular page might be enough as they produce the same error?

If it helps, I can send commands from the Rabbit endpoint, through the router to MSMQ endpoint absolutely fine. Just the event pub/sub is not working, as with the sample project.

Thanks in advance.

SQS events not handled by AzureServiceBus

I'm working on getting a router set up so that a new SqsTransport endpoint can interact with some AzureServiceBusTransport endpoints. I'm not sure what I'm doing wrong, but so far I can only successfully send a command across the router from an ASB endpoint and have it handled by the SQS endpoint. When the SQS endpoint publishes an event, the ASB endpoint never handles it.

Rather than post all of the code, here's a repo you can clone to reproduce the issue

As far as I can tell, there are no errors, so I think I'm just missing something or registering the publisher incorrectly.

I'm running .NET Core 5.0 worker apps using:

  • NServiceBus 7.4.0
  • NServiceBus.Transport.AzureServiceBus 1.9.0
  • NServiceBus.AmazonSQS 5.3.1
  • NServiceBus.Router 3.9.0

Unable to provide destination machine/site for subscriptions registered via bridge.RegisterPublisher

Hi Szymon,

We have encountered a problem when trying to register an MSMQ publisher that is located on a different physical machine than the Router we connect to.
On the client side we have the following code:
var routing = c.UseTransport<RabbitMQTransport>().Routing();
var bridge = routing.ConnectToRouter("Router");
bridge.RegisterPublisher(typeof(SubscribedEvent), "Endpoint");

which leads to following errors in "Router" logs:
NServiceBus.Unicast.Queuing.QueueNotFoundException: Failed to send message to address: [Endpoint@ROUTER-HOST] ---> System.Messaging.MessageQueueException: The queue does not exist or you do not have sufficient permissions to perform the operation.
and, of course, the queue doesn't exist at ROUTER-HOST, we would like to send subscription to Endpoint@ANOTHER-HOST.

I suppose that this kind of routing should be done on the client (connector) side, i.e. by specifying the "NServiceBus.Bridge.DestinationSites" header, but this doesn't seem to be supported, looking at RouterSubscribeBehavior.

Could you please provide some guidelines how this kind of subscription routing can be achieved?

Question about AzureServiceBusTransport configuration

I am trying to route Azure Service Bus events in an endpoint-oriented topology to SQS. I am using the mixed-transport template as a starting point.

When my router runs I always get an error about the "MainSerializer" not being set, even though I set it like you do in the AcceptanceTests:

var serializer = Tuple.Create(new NewtonsoftSerializer() as SerializationDefinition, new SettingsHolder());

            var azureServiceBusInterface = routerConfig.AddInterface<AzureServiceBusTransport>("ASB",
                t => {
                    t.ConnectionString(connectionString);
                    t.UseEndpointOrientedTopology();
			
                    var settings = t.GetSettings();
                    var builder = new ConventionsBuilder(settings);
                    settings.Set<NServiceBus.Conventions>(builder.Conventions);

                    var serializer = Tuple.Create(new NewtonsoftSerializer() as SerializationDefinition, new SettingsHolder());
                    settings.Set("MainSerializer", serializer);
                });

The error fires on this line:

var router = NServiceBus.Router.Router.Create(routerConfig);

I am using NServiceBus 7.1.6 with NServiceBus.Azure.Transports.WindowsAzureServiceBus 9.1.2 and NServiceBus.Router 3.3.0

Physical routing of MSMQ messages

Hi!

We have this setup

machines: A, B and C.

Machine  Description
A        .NET Core endpoint running in Docker, using RabbitMQ as transport
B        Router with two interfaces: RabbitMQ and MSMQ
B        Endpoint Bar (.NET Framework)
C        Endpoint Foo (.NET Framework)

Sending BarCommand from A via the Router on B to the Bar endpoint on B works great.

How can A send FooCommand to C (Foo) via B?

I have tried:

  1. bridge.RouteToEndpoint(typeof(FooCommand), "Foo@C");
     This fails with an invalid address error.

  2. instance-mapping.xml in the router project:
routerConfig.AddInterface<MsmqTransport>("MSMQ", t => {
  t.Routing().InstanceMappingFile().FilePath("instance-mapping.xml");
});
<?xml version="1.0" encoding="utf-8" ?>
<endpoints>
  <endpoint name="Foo">
    <instance machine="C"/>
  </endpoint>
</endpoints>

But the router still delivers FooCommand to its local machine Foo@B
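To make the expected outcome concrete, here is a small self-contained sketch that reads an instance mapping file in the format shown above and prints the queue@machine address each entry implies. It is only an illustration of what the mapping says (Foo should resolve to Foo@C); it is not how NServiceBus or the router resolve addresses, and the InstanceMapping class is a hypothetical helper.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Xml.Linq;

// Hypothetical helper, not part of NServiceBus.
public static class InstanceMapping
{
    // Reads <endpoint name="..."><instance machine="..."/></endpoint> entries
    // and returns, per endpoint name, the implied queue@machine addresses.
    public static Dictionary<string, List<string>> Parse(string xml)
    {
        return XDocument.Parse(xml)
            .Root
            .Elements("endpoint")
            .ToDictionary(
                e => (string)e.Attribute("name"),
                e => e.Elements("instance")
                      .Select(i => $"{(string)e.Attribute("name")}@{(string)i.Attribute("machine")}")
                      .ToList());
    }
}
```

Calling InstanceMapping.Parse with the XML above yields a single entry for Foo whose only address is Foo@C, which is the destination the router would be expected to use instead of Foo@B.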

Send-only router interface

I have the following setup:

Each service that handles messages has a receive endpoint hosted in a separate process. This is the only place I have message handlers and sagas etc. A pure messaging host of sorts. In addition, each of these services also have a web API and/or a web application. In these web APIs/applications I'm using send-only endpoints to publish events and send commands (to their own receive endpoint).

To ensure that database write operations and message sending will be atomic operations, I plan to use the SQL Server transport with a one-way forward route to Azure Service Bus in my send-only endpoints. Just as demonstrated in NServiceBus.Connector.SqlServer.

The NServiceBus.Connector.SqlServer package works well. However, it seems like all interfaces in NServiceBus.Router are send and receive, requiring queues to exist in all connected transports. This feels a bit unnecessary in the scenario above, where the queues in Azure Service Bus will never be used? It's not a huge pain, but there will potentially be double the amount of queues where only half of them are used.

NServiceBus.Raw contains send-only raw endpoints, but if my perception is correct, there is no way of configuring NServiceBus.Router interfaces to use such raw endpoints?

An endpoint cannot use multiple routers

Hi, apologies if I've misunderstood something here, but given the following code:

var transport = configuration.UseTransport<MsmqTransport>();
var routing = transport.Routing();

var router1 = routing.ConnectToRouter(Router1Name);
var router2 = routing.ConnectToRouter(Router2Name);

router1.RouteToEndpoint(typeof(Command1), Service1Name);
router2.RouteToEndpoint(typeof(Command2), Service2Name);

Only router2's routes would be registered in the endpoint. Anything that I think I've set up for router1 would result in a No destination specified... message.

Presumably this is because this line var router2 = routing.ConnectToRouter(Router2Name); overwrites this line var router1 = routing.ConnectToRouter(Router1Name);?

If I'm correct, just wondering is this by design or is it a bug?
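The suspected mechanism can be shown with a toy model. This is not the connector's code; it only assumes, as guessed above, that the connection is stored in a single settings slot, so the second ConnectToRouter call replaces the first. All type names and the "RouterConnection" key are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Toy model only: NOT the NServiceBus.Router.Connector implementation.
public class ToyRouterConnection
{
    public string RouterName { get; }
    public Dictionary<Type, string> Routes { get; } = new Dictionary<Type, string>();

    public ToyRouterConnection(string routerName) => RouterName = routerName;

    public void RouteToEndpoint(Type messageType, string endpoint) => Routes[messageType] = endpoint;
}

public class ToyRoutingSettings
{
    const string Key = "RouterConnection"; // hypothetical single settings slot
    readonly Dictionary<string, object> settings = new Dictionary<string, object>();

    public ToyRouterConnection ConnectToRouter(string routerName)
    {
        var connection = new ToyRouterConnection(routerName);
        settings[Key] = connection; // overwrites any connection stored earlier
        return connection;
    }

    // What the endpoint would read at startup: only the last stored connection.
    public ToyRouterConnection Effective => (ToyRouterConnection)settings[Key];
}
```

In this model, routes added through the first connection object are still recorded on that object, but nothing reads it any more, which would match the "No destination specified" symptom for router1's routes.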

In our usage of NServiceBus.Router we've come across a few occasions where we've had to roll our own solution for achieving this, and it'd be great if it could be changed.

Thanks in advance for taking a look. If I'm wrong or have misunderstood, then great - just let me know.

Thanks, Rob

Messages without MessageIntent are consumed and dropped

Hi,
I'm using the router to route some non-nservicebus messages into our nservicebus system. It works fine, after I discovered the problem. Thanks for the great work.

As the title says, messages that are missing intent are dropped silently.

I handled this in our system with a custom rule that throws UnforwardableMessageException, but I think this rule should be in the default router, so that messages are moved to the poison queue instead of being dropped.

Happy to do a PR with my (simple) rule.
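A minimal sketch of the check such a rule could perform, kept free of the router's rule API so it compiles on its own. The exception class is a local stand-in for NServiceBus.Router.UnforwardableMessageException, and IntentGuard is a hypothetical helper; wiring it into an actual router rule is left out because the exact rule signatures are not shown here.

```csharp
using System;
using System.Collections.Generic;

// Local stand-in for NServiceBus.Router.UnforwardableMessageException,
// declared here so the sketch compiles without the Router package.
public class UnforwardableMessageException : Exception
{
    public UnforwardableMessageException(string message) : base(message) { }
}

// Hypothetical helper, not part of the router.
public static class IntentGuard
{
    // Header key NServiceBus uses to carry the message intent.
    public const string IntentHeader = "NServiceBus.MessageIntent";

    // Throws when the intent header is absent or empty, so the caller can
    // treat the message as poison instead of silently dropping it.
    public static void EnsureIntent(IReadOnlyDictionary<string, string> headers)
    {
        if (!headers.TryGetValue(IntentHeader, out var intent) || string.IsNullOrWhiteSpace(intent))
        {
            throw new UnforwardableMessageException(
                $"Message has no '{IntentHeader}' header and cannot be forwarded.");
        }
    }
}
```

Inside a real rule the same check would run against the incoming message's headers before the message is passed further down the chain.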

How to handle errors in the interfaces

We use this library to do routing from a SQL queue to an MSMQ. We have experienced that the program stopped processing messages from the SQL queue, without any signal that we were able to detect.

After the fact we have been able to find some traces in logs, indicating that there had been connectivity issues with the SQL database.

After digging through the code, it seems like the default behaviour for endpoints, when the peek or receive circuit breaker triggers, is to stop the endpoint. But, it does not stop the application it's running in. In those cases, we have an application that appears to be running, but it has stopped peeking on the SQL queues. In those cases, we prefer to just kill the application, and let other mechanisms try to restart the application if possible.

Fortunately, there seems to be a way to hook into the interface's critical error handling, by setting a callback function in the settings, like this:

var sqlInterface = _routerConfig.AddInterface<SqlServerTransport>("SQL", t =>
{
   t.GetSettings().Set("onCriticalErrorAction", OnCriticalError()); // The signature of the callback is Func<ICriticalErrorContext, Task>
  // other settings 
});

First, is this an appropriate way to hook up the critical error callback, or is this actually an internal interface, where the settings key might change?

Maybe a small note in the documentation about this? Or maybe the option to hook into the critical error could be made available in the configuration?
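For completeness, a sketch of what the OnCriticalError() factory used above might look like when the goal is to kill the process and let an external supervisor restart it. It assumes NServiceBus 7, where ICriticalErrorContext exposes Error and Exception; the RouterCriticalErrors class name is hypothetical, and whether the "onCriticalErrorAction" key is stable remains the open question.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;

// Hypothetical helper class; only the callback signature comes from the post above.
public static class RouterCriticalErrors
{
    // Returns a callback matching Func<ICriticalErrorContext, Task>.
    // It terminates the process so a service manager or container
    // orchestrator can restart the application.
    public static Func<ICriticalErrorContext, Task> OnCriticalError()
    {
        return context =>
        {
            // Environment.FailFast logs the message and exception where the
            // platform supports it, then exits the process immediately.
            Environment.FailFast(
                $"Router interface raised a critical error: {context.Error}",
                context.Exception);

            return Task.CompletedTask; // not reached; satisfies the delegate's return type
        };
    }
}
```

Failing fast skips any graceful shutdown, so in-flight messages rely on the transport's transaction handling to be redelivered after the restart.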

Handle messages with missing CorrelationId headers

2022-01-13 14:52:20.497 ERROR Persistent error while processing messages in Bridge. Entering throttled mode.
System.NullReferenceException: Object reference not set to an instance of an object.
   at TLV.TryDecodeTLV(String tlvString, Action`2 valueCallback)
   at ReplyTraceRule.UnwrapCorrelationIdAndSetTraceHeader(BaseForwardRuleContext context)
   at ReplyTraceRule.AddTraceHeadersForRoutingBackReply(BaseForwardRuleContext context)

Native subscription logic not called after enabling the router (SQL-server transport)

I was testing a specific scenario where the NServiceBus.Router package is used as a bridge between an 'island' with SQL Server as transport and another island that uses Azure Service Bus as transport. I've noticed that the subscriptions on the SQL transport island are no longer created after I've enabled the router. In other words, it looks like the default SQL transport behaviour of NServiceBus is broken after enabling the router.

Steps to reproduce

I have created a small sample solution and added it to GitHub that contains some console applications just to validate this scenario.

Subscribe messages are sent to the publisher (router connector disabled)

For the SQL transport, the subscriber should send new subscribe messages to the publisher. In the sample, the SqlAsbBridge.SqlHandler will send these to the SqlAsbBridge.SqlPublisher. This does work if we disable the router connector. So we need to comment out the following lines of code in order to skip the router connector:

// Add routing logic to ASB:
var router = routing.ConnectToRouter("PoCRouter");
router.RegisterPublisher(typeof(SqlActionCompletedEvent), publisherEndpointName: "SqlPublisher");

In that case the MessageDrivenSubscribeTerminator of NServiceBus will take care of this subscribe behaviour and everything is ok.

Clear all existing subscriptions and subscribe messages from the queues

As we want to validate whether the subscribe logic on the SQL transport side also works if we enable the router, we can clear the subscriptions table and remove any subscribe messages in order to start clean.

Enable the router connector and restart all apps

If we now enable the router connector and restart the SqlAsbBridge.SqlHandler with the extra lines of code, we will no longer see the subscribe messages for the messages on the SQL island inside the queues. The subscribe messages for messages that will use the router are added perfectly (in this case the ones for the ASB island).

What I've learned from debugging the code

Some debugging showed me that the RouterConnectionFeature inside the NServiceBus.Router.Connector registers a new Behavior<ISubscribeContext>. That's the RouterSubscribeBehavior in the following snippet:

// Snippet from RouterConnectionFeature
context.Pipeline.Register(new ForwardSiteMessagesToRouterBehavior(settings.RouterAddress), "Routes messages sent to sites to the bridge.");
context.Pipeline.Register(new RoutingHeadersBehavior(settings.SendRouteTable), "Sets the ultimate destination endpoint on the outgoing messages.");
context.Pipeline.Register(b => new RouterSubscribeBehavior(subscriberAddress, context.Settings.EndpointName(), settings.RouterAddress, b.Build<IDispatchMessages>(), settings.PublisherTable, nativePubSub), "Dispatches the subscribe request via a router.");
context.Pipeline.Register(b => new RouterUnsubscribeBehavior(subscriberAddress, context.Settings.EndpointName(), settings.RouterAddress, b.Build<IDispatchMessages>(), settings.PublisherTable, nativePubSub), 

This RouterSubscribeBehavior will take care of the subscribe logic for messages that need the router. That's fine but depending on the invokeTerminator flag the existing MessageDrivenSubscribeTerminator of NServiceBus itself will be called or not:

public override async Task Invoke(ISubscribeContext context, Func<Task> next)
{
	var eventType = context.EventType;
	if (publisherTable.TryGetValue(eventType, out var publisherEndpoint))
	{

		Logger.Debug($"Sending subscribe request for {eventType.AssemblyQualifiedName} to router queue {routerAddress} to be forwarded to {publisherEndpoint}");

		var subscriptionMessage = ControlMessageFactory.Create(MessageIntentEnum.Subscribe);

		subscriptionMessage.Headers[Headers.SubscriptionMessageType] = eventType.AssemblyQualifiedName;
		subscriptionMessage.Headers[Headers.ReplyToAddress] = subscriberAddress;
		subscriptionMessage.Headers[Headers.SubscriberTransportAddress] = subscriberAddress;
		subscriptionMessage.Headers[Headers.SubscriberEndpoint] = subscriberEndpoint;
		subscriptionMessage.Headers["NServiceBus.Bridge.DestinationEndpoint"] = publisherEndpoint;
		subscriptionMessage.Headers[Headers.TimeSent] = DateTimeExtensions.ToWireFormattedString(DateTime.UtcNow);
		subscriptionMessage.Headers[Headers.NServiceBusVersion] = "6.3.1"; //The code has been copied from 6.3.1

		var transportOperation = new TransportOperation(subscriptionMessage, new UnicastAddressTag(routerAddress));
		var transportTransaction = context.Extensions.GetOrCreate<TransportTransaction>();
		await dispatcher.Dispatch(new TransportOperations(transportOperation), transportTransaction, context.Extensions).ConfigureAwait(false);
	}

	if (invokeTerminator) // This flag is false for SQL transport and so the terminator is not invoked
	{
		await next().ConfigureAwait(false);
	}
}

As this flag is false for the SQL-transport, the NServiceBus code is not called and the subscriptions for the SQL island itself are not added.

Should we invoke this terminator or is there a specific reason why we don't invoke it?

Thanks in advance for your help!

Transform messages

Hi,

Is there a way to configure the router to transform messages on the way through? I can see there are Rules which look like they might be useful but I can't tell whether the messages are immutable from within a rule.

Thanks

Adam

Sagas and ReplyToOriginator

Hi!

The router is working fine when the message is processed by a handler (IHandleMessages/context.Reply); in that case the correlationId is not changed.

But, when the message is processed by a Saga, ReplyToOriginator will set the correlationid to the originating messageid.

Line 110:

https://github.com/Particular/NServiceBus/blob/65fee422bdd79a3e5d4adc3dcc40353cb5b9dd82/src/NServiceBus.Core/Sagas/Saga.cs

and line 11:

https://github.com/Particular/NServiceBus/blob/65fee422bdd79a3e5d4adc3dcc40353cb5b9dd82/src/NServiceBus.Core/Correlation/AttachCorrelationIdBehavior.cs

Any plans on supporting this scenario?

Adding TLV to the outgoing MessageId might solve this issue together with the CorrelationId.
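To make the TLV (type-length-value) idea concrete, here is a minimal sketch of encoding and decoding such triples in a header string, with a guard for missing or malformed input. The "type|length|value|" layout and all names are assumptions made for this example; it is not claimed to be the format the router actually uses.

```csharp
using System;
using System.Collections.Generic;

static class TlvSketch
{
    // Appends one "type|length|value|" triple to an existing header value.
    // Assumes the type itself contains no '|'; the value may contain any character.
    public static string Append(string existing, string type, string value)
        => $"{existing}{type}|{value.Length}|{value}|";

    // Returns false (without invoking the callback) for null, empty, or malformed input.
    public static bool TryDecode(string input, Action<string, string> onValue)
    {
        if (string.IsNullOrEmpty(input)) return false;

        var items = new List<KeyValuePair<string, string>>();
        var pos = 0;
        while (pos < input.Length)
        {
            var typeEnd = input.IndexOf('|', pos);
            if (typeEnd < 0) return false;
            var lengthEnd = input.IndexOf('|', typeEnd + 1);
            if (lengthEnd < 0) return false;

            var lengthText = input.Substring(typeEnd + 1, lengthEnd - typeEnd - 1);
            if (!int.TryParse(lengthText, out var length) || length < 0) return false;

            var valueStart = lengthEnd + 1;
            // The value must fit and be followed by the closing separator.
            if (length > input.Length - valueStart - 1) return false;
            if (input[valueStart + length] != '|') return false;

            items.Add(new KeyValuePair<string, string>(
                input.Substring(pos, typeEnd - pos),
                input.Substring(valueStart, length)));
            pos = valueStart + length + 1;
        }

        // Report values only after the whole string has been validated.
        foreach (var item in items) onValue(item.Key, item.Value);
        return true;
    }
}
```

Because the value is read by length and not by searching for a separator, an original MessageId or CorrelationId can be wrapped and later unwrapped unchanged even if it contains the separator character.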

SqlSubscriptionStorage should use Func<Task<DbConnection>>

SqlSubscriptionStorage currently takes a Func<DbConnection>, which is problematic when there is async work to be done (in my case, acquiring an Access Token). It would be better if it were a Func<Task<DbConnection>>.

The Sql Transport supports this (see UseCustomSqlConnectionFactory)

I'd be happy to submit a PR but naturally this would be a breaking change.
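As a rough illustration of the difference, the sketch below builds an async connection factory that awaits a token before creating the connection, plus a blocking adapter that could serve as a stop-gap while only a synchronous Func<DbConnection> is accepted. The names tokenProvider, createConnection, WithAsyncToken and BlockOn are hypothetical, and whether the storage expects an already opened connection is not assumed here.

```csharp
using System;
using System.Data.Common;
using System.Threading.Tasks;

static class ConnectionFactories
{
    // Async factory: acquires a token first, then creates the connection with it.
    public static Func<Task<DbConnection>> WithAsyncToken(
        Func<Task<string>> tokenProvider,            // hypothetical token source
        Func<string, DbConnection> createConnection) // hypothetical connection builder
    {
        return async () =>
        {
            var token = await tokenProvider().ConfigureAwait(false);
            return createConnection(token);
        };
    }

    // Stop-gap adapter to a synchronous factory. It blocks the calling thread,
    // which is exactly the cost an async-aware signature would avoid.
    public static Func<DbConnection> BlockOn(Func<Task<DbConnection>> asyncFactory)
        => () => Task.Run(asyncFactory).GetAwaiter().GetResult();
}
```

Running the async factory through Task.Run keeps it off the caller's synchronization context, which reduces the deadlock risk of blocking, but it still ties up a thread for the duration of the token request.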

NServiceBus Router does not appear to survive MSMQ restarts

I have an NSB router responsible for routing MSMQ messages to our Azure Service Bus instance.

I have observed that when I stop MSMQ and subsequently start it again, the NServiceBus Router is unable to reconnect to the MSMQ instance.

System.Messaging.MessageQueueException (0x80004005): An invalid handle was passed to the function.
   at System.Messaging.MessageEnumerator.MoveNext(TimeSpan timeout)
   at NServiceBus.Transport.Msmq.MessagePump.<InnerProcessMessages>d__6.MoveNext()

Previous versions of the Router appeared to shut the router down in this situation, whereas the latest version (3.9.2) appears to just continually poll MSMQ.

Is there a way this could be reported as a critical error and then the service could be restarted?

ASB - Rule too long exception silently absorbed

Hi,
I am trying to connect an MSMQ subscriber to an Azure Service Bus publisher.
I created an NServiceBus.Router to connect the two, but I had forgotten to add a rule name shortener to it. Once I had added that to the Router it worked fine, but I spent an awful lot of time trying to find the problem. The ASB publisher throws an exception on startup, so there it is obvious what the problem is, but in the Router that exception is being absorbed.

I have forked the repo and added a test to show the problem.

Would it be possible to get the Router to throw the error at startup?

Cannot use "RegisterPublisher" for SendOnly endpoints in Azure Service Bus

I'm trying to migrate some subscribers to RabbitMQ, while publishers are still on Azure Service Bus.
This does not seem to work if the publisher is a SendOnly endpoint.
Following exception is thrown:
2020-03-20 15:25:31.109 ERROR ThrottlingRawEndpointConfig`1[[NServiceBus.RabbitMQTransport, NServiceBus.Transport.RabbitMQ, Version=5.0.0.0, Culture=neutral, PublicKeyToken=9fc386479f8a226c]] Error processing a message. Continuing in throttled mode.
Microsoft.Azure.ServiceBus.MessagingEntityNotFoundException: Put token failed. status-code: 404, status-description: The messaging entity 'sb://testnamespaceiaoao.servicebus.windows.net/legacy-publisher' could not be found. To know more visit https://aka.ms/sbResourceMgrExceptions. TrackingId:732270da-9516-4813-bc61-3f1cbaa626c1_G5, SystemTracker:seabookdev.servicebus.windows.net:legacy-publisher, Timestamp:2020-03-20T14:25:32.
   at Microsoft.Azure.ServiceBus.Core.MessageSender.OnSendAsync(IList`1 messageList)
   at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
   at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func`1 operation, TimeSpan operationTimeout)
   at Microsoft.Azure.ServiceBus.Core.MessageSender.SendAsync(IList`1 messageList)
   at PostroutingTerminator.Terminate(PostroutingContext context)
   at NServiceBus.Router.ChainTerminator`1.Invoke(T context, Func`2 next)
   at NServiceBus.Router.TerminatorInvocationRule`1.Invoke(T context, Func`2 next)
   at NServiceBus.Router.Migrator.ShadowForwardSubscribeRule.Terminate(ForwardSubscribeContext context)
   at NServiceBus.Router.ChainTerminator`1.Invoke(T context, Func`2 next)
   at NServiceBus.Router.TerminatorInvocationRule`1.Invoke(T context, Func`2 next)
   at SubscribePreroutingTerminator.Terminate(SubscribePreroutingContext context)
   at NServiceBus.Router.ChainTerminator`1.Invoke(T context, Func`2 next)
   at NServiceBus.Router.TerminatorInvocationRule`1.Invoke(T context, Func`2 next)
   at PreroutingToSubscribePreroutingFork.Invoke(PreroutingContext context, Func`2 next)
   at ThrottlingRawEndpointConfig`1.<>c__DisplayClass2_0.<b__0>d.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at NServiceBus.Transport.RabbitMQ.MessagePump.Process(BasicDeliverEventArgs message)

Question about router between MSMQ and SqlServer ('A chain has to have at least one terminator: ForwardPublishContext.')

Hi,
I'm trying to learn how the router works, so I took this sample https://docs.particular.net/samples/router/mixed-transports/ and am trying to replace RabbitMQ with SqlServer:

    var endpointConfiguration = new EndpointConfiguration("Samples.Router.MixedTransports.Server");

    var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
    transport.ConnectionString("host=localhost");
    transport.UseConventionalRoutingTopology();

    endpointConfiguration.UsePersistence<InMemoryPersistence>();

    var recoverability = endpointConfiguration.Recoverability();
    recoverability.Immediate(immediate => immediate.NumberOfRetries(0));
    recoverability.Delayed(delayed => delayed.NumberOfRetries(0));

    endpointConfiguration.SendFailedMessagesTo("error");
    endpointConfiguration.AuditProcessedMessagesTo("audit");
    endpointConfiguration.EnableInstallers();

    var endpointInstance = await Endpoint.Start(endpointConfiguration).ConfigureAwait(false);

    Console.WriteLine("Press <enter> to exit.");
    Console.ReadLine();

    await endpointInstance.Stop().ConfigureAwait(false);

The exception is

'A chain has to have at least one terminator: ForwardPublishContext.'

What am I missing?
Thanks in advance.

ASB subscription/rule name shortener preventing subscription requests through NServiceBus Router

I’ve created an MSMQ based Publisher that can publish event messages to an Azure Service Bus based Subscriber. In order to accomplish this, I’ve hosted NServiceBus.Router inside the publisher (running in the same process). This works fine. The subscriber can send a subscription request and the publisher stores it in subscription storage as originating from the router. The publisher can then publish event messages to the subscriber.

However, for subscriptions where the subscription/rule name exceeds 50 characters, at which point the name shortener kicks in, the subscription request is not picked up by the publisher and is not stored in subscription storage, which means the publisher cannot publish event messages for that type.

Any way to fix this?
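One thing worth checking in this situation is whether the router's Azure Service Bus interface and the Azure Service Bus endpoints apply exactly the same shortening function, since the two sides can only agree on a rule if they compute the same name. A minimal deterministic shortener, assuming the 50 character limit mentioned above, might look like the sketch below. The class and method names are made up for the example; hashing with MD5 into a GUID is one common choice, not necessarily what this setup uses.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

static class RuleNameShortener
{
    const int MaxLength = 50; // limit taken from the description above

    // Returns the name unchanged when it fits, otherwise a stable 32 character hash.
    public static string Shorten(string name)
    {
        if (name.Length <= MaxLength) return name;

        using (var md5 = MD5.Create())
        {
            // MD5 yields 16 bytes, which is exactly what the Guid constructor needs.
            var hash = md5.ComputeHash(Encoding.UTF8.GetBytes(name));
            return new Guid(hash).ToString("N");
        }
    }
}
```

The same function would then be passed to the shortener setting on both sides (for the router, inside the AddInterface callback for the Azure Service Bus transport), so that a long event type name maps to one and the same rule name everywhere.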

Use RavenDb subscription persistence along with NServiceBus.Router and msmq transport

I use an MSMQ to Azure Service Bus route. During MSMQ interface setup I need to set up subscription persistence:

var subscriptionStorage = new InMemorySubscriptionStorage();
msmqInterface.EnableMessageDrivenPublishSubscribe(subscriptionStorage);

I see there is sql subscription persistence implementation out of the box.
Is there anything similar for RavenDb?
Is it possible to use RavenDb subscription persistence from NServiceBus.RavenDb nuget package without custom 'ISubscriptionStorage' implementation?

SQS to AzureServiceBus - The given key (NServiceBus.Unicast.Messages.MessageMetadataRegistry) was not present in the dictionary.

I'm trying to set up a router for sending messages from AzureServiceBus to SQS but am getting the following exception in the SQS interface when starting the router:

System.Collections.Generic.KeyNotFoundException: The given key (NServiceBus.Unicast.Messages.MessageMetadataRegistry) was not present in the dictionary.
   at NServiceBus.Settings.SettingsHolder.Get(String key) in /_/src/NServiceBus.Core/Settings/SettingsHolder.cs:line 91
   at NServiceBus.Settings.SettingsHolder.Get[T]() in /_/src/NServiceBus.Core/Settings/SettingsHolder.cs:line 70
   at NServiceBus.Transport.SQS.Configure.SqsTransportInfrastructure..ctor(ReadOnlySettings settings) in /_/src/NServiceBus.Transport.SQS/Configure/SqsTransportInfrastructure.cs:line 24
   at NServiceBus.SqsTransport.Initialize(SettingsHolder settings, String connectionString) in /_/src/NServiceBus.Transport.SQS/Configure/SqsTransport.cs:line 42
   at NServiceBus.Raw.InitializableRawEndpoint.Initialize()
   at ThrottlingRawEndpointConfig`1.Create()
   at Interface`1.Initialize(InterfaceChains interfaces, RootContext rootContext)
   at RouterImpl.Initialize()
   at RouterImpl.Start()

Here is my router configuration code:

...
var routerConfig = new RouterConfiguration($"ASBToSQS.Router");
var azureInterface = routerConfig.AddInterface<AzureServiceBusTransport>("Azure", t =>
{
    t.ConnectionString(context.Configuration["NServiceBus:Router:AzureConnectionString"]);

    t.Transactions(TransportTransactionMode.ReceiveOnly);
    t.SubscriptionRuleNamingConvention((entityType) =>
    {
        var entityPathOrName = entityType.Name;
        if (entityPathOrName.Length >= 50)
        {
            return entityPathOrName.Split('.').Last();
        }

        return entityPathOrName;
    });
});

var sqsInterface = routerConfig.AddInterface<SqsTransport>("SQS", t =>
{
    // using default environment variable credentials
});

var staticRouting = routerConfig.UseStaticRoutingProtocol();

staticRouting.AddForwardRoute("Azure", "SQS");

routerConfig.AutoCreateQueues();
...

I've tried calling .EnableMessageDrivenPublishSubscribe(...) using sql subscription storage because I found this issue and it seemed like that was the only notable difference in setups, but I'm still getting the same error.

This is running in a .NET Core 5.0 worker app using:

  • NServiceBus 7.4.4
  • NServiceBus.Transport.AzureServiceBus 1.9.0
  • NServiceBus.AmazonSQS 5.3.1
  • NServiceBus.Router 3.8.1

I'm sure I'm just missing something but am out of ideas. Any help would be greatly appreciated.
