szymonpobiega / nservicebus.router Goto Github PK
View Code? Open in Web Editor NEWCross-transport, cross-site and possibly cross-cloud router component for NServiceBus
License: MIT License
Cross-transport, cross-site and possibly cross-cloud router component for NServiceBus
License: MIT License
I am looking to instrument our router instance with some metrics. I was able to get a hook into the prerouting context using a Rule. Is there something similar for failures?
Exception "LocalAddress isn't available since this endpoint is configured to run in send-only mode." is thrown.
Is it possible to enable sendonly endpoints?
My configration looks like this:
busConfiguration.SendOnly();
var transport = busConfiguration.UseTransport<RabbitMQTransport>()
.ConnectionString(ConfigurationManager.ConnectionStrings["NServiceBus/Transport"].ConnectionString)
.TimeToWaitBeforeTriggeringCircuitBreaker(TimeSpan.FromMinutes(10))
.UseConventionalRoutingTopology();
var routing = transport.Routing();
var routedRouting = routing.ConnectToRouter("Transport.Bridge");
routedRouting.RouteToEndpoint(typeof(AddActionableCommand), "Actionables.Service");
Thx
Howdy, so I have 3 docker containers in my example:
Now, having followed every example under the sun and having proved that those examples do work locally, I am still unable to get the Router to route the Events from the Publisher to the Subscriber. Normal messages come through, but no events. I can see the Publisher is publishing the event AND sending the message, but the subscriber only ever gets the message, never the event.
I want to see what's going on with the Router. I want to see its debug output so I can see if the events are even reaching it and/or what's happening with them.
What's the trick to enabling debugging / verbose mode on the router, particularly using the .NET Core Worker Service UseNServiceBusRouter syntax?
Cheers in advance.
Hi,
I'm getting the following error with multiple command types. The strange this is that it doesn't happen all the time and sometimes I can replay the messages and it works and other times it continues to fail. Note: when not using Routing Bridge these issues don't happen.
We are using the following serializeration:
configuration.UseSerialization();
configuration.AddDeserializer();
NServiceBus.MessageDeserializationException: An error occurred while attempting to extract logical messages from incoming physical message a14e31b0-a31b-493d-90fc-ac3200432251 ---> System.Exception: Could not determine type for node: 'SetupCheckForDuplicateRecords'.
at NServiceBus.XmlDeserialization.InferNodeType(XmlNode node, Object parent)
at NServiceBus.XmlDeserialization.Process(XmlNode node, Object parent, Type nodeType)
at NServiceBus.XmlDeserialization.Deserialize(Stream stream, IList`1 messageTypesToDeserialize)
at NServiceBus.DeserializeMessageConnector.Extract(IncomingMessage physicalMessage)
at NServiceBus.DeserializeMessageConnector.ExtractWithExceptionHandling(IncomingMessage message)
--- End of inner exception stack trace ---
at NServiceBus.DeserializeMessageConnector.ExtractWithExceptionHandling(IncomingMessage message)
at NServiceBus.DeserializeMessageConnector.d__1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.SubscriptionReceiverBehavior.d__1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.InvokeAuditPipelineBehavior.d__1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.ProcessingStatisticsBehavior.d__0.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at Jitb.NSB.Commons.JitbUnitOfWorkBehavior.d__2.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.TransportReceiveToPhysicalMessageConnector.d__1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.MainPipelineExecutor.d__1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.Transport.Msmq.ReceiveStrategy.d__7.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.Transport.Msmq.TransactionScopeStrategy.d__2.MoveNext()
I saw a similar issue (https://discuss.particular.net/t/json-deserialization-error-when-not-using-shared-class-library-for-messages/1702), but that issue was fixed in 3.7.1. I'm currently running 3.8,1.
Thanks,
Eric
Hi,
i would like to isolate each queue in it's own schema, is it a good idea ? how can i do this on the router ?
Does the router has to live in the same schema as the interface added ? (i have a routing problem doing that, because the router tries to send the message to it's own schema, i guess because it does not know the schema of the destination ?)
What is the best practice to use the router inside an enterprise with many endpoint ?
One single catalog and many schema ? If so do i need many routers as many schema are present ?
Thanks for the help.
While upgrading NServiceBus in an application, we realised table poison (whatever name it is assigned via PoisonQueueName
) is no longer created.
Router
Expected result | Actual result |
---|---|
table poison being created |
no poison table found |
When forcing references to previous packages ie
everything works as expected
As in this mode NServiceBus transports use the receive connection to perform sends.
We are trying to migrate a RMQ publisher to ASB, while keeping a subscriber at RMQ. According to the Readme it is possible to do that by enabling the migration mode for the publisher first. The problem is that the RMQ subscriber is not handling the events since we enabled transport migration for the publisher.
How to reproduce?
var routing = busConfiguration.EnableTransportMigration<RabbitMQTransport, AzureServiceBusTransport>(
rmq =>
{
rmq.ConnectionString(_configuration["ConnectionStringRMQ"]);
rmq.UseConventionalRoutingTopology();
},
asb =>
{
asb.ConnectionString(_configuration["ConnectionStringASB"]);
});
Result: the RMQ subscriber is not handling the published event.
Is this something that is expected? Or is our configuration incorrect?
I've created a simple sample solution with three console applications: a RabbitMQ endpoint, a RabbitMQ to Azure Service Bus (ASB) router, and an ASB endpoint. The router successfully routes an NServiceBus ICommand from the RabbitMQ endpoint to the ASB endpoint. Works great with no issues! However, if the RabbitMQ endpoint attempts to publish an IEvent that is subscribed to by ASB endpoint, the event never makes it's way to ASB. I assume this is meant to work? Both RabbitMQ and ASB have native pub/sub ability.
I can see that most of the necessary infrastructure for publishing the IEvent is set up properly:
However, the published event never makes it's way to my ASB endpoint. Upon further investigation, it appears the problem may be with the RabbitMQ side of the communication. I found that if I manually create a binding between the exchange for my IEvent and the router exchange (which was automatically created) then my published IEvent makes it's way to the ASB endpoint as expected. This seems to indicate that the issue may lie in a binding not being properly created in RabbitMQ for forwarding IEvents to the router exchange.
This behavior can be easily confirmed. I have simple code that can be made available that demonstrates this. So my questions are these:
Here is the core code of my sample:
RabbitMQ endpoint configuration:
var endpointConfiguration = new EndpointConfiguration(Settings.SampleClientEndpointName);
endpointConfiguration.License(NServiceBusLicense.Text);
endpointConfiguration.UseSerialization<NewtonsoftSerializer>();
endpointConfiguration.UsePersistence<InMemoryPersistence>();
endpointConfiguration.EnableInstallers();
var recoverability = endpointConfiguration.Recoverability();
recoverability.Immediate(immediate => immediate.NumberOfRetries(0));
recoverability.Delayed(delayed => delayed.NumberOfRetries(0));
endpointConfiguration.SetDiagnosticsPath($@"C:\Temp\NSB\{Settings.SampleClientEndpointName}");
endpointConfiguration.EnableInstallers();
var routingSettings = endpointConfiguration.UseTransport<RabbitMQTransport>()
.ConnectionString(Settings.RabbitMqConnectionString)
.UseConventionalRoutingTopology()
.Transactions(TransportTransactionMode.ReceiveOnly)
.Routing();
var bridge = routingSettings.ConnectToRouter(Settings.SampleRouterEndpointName);
bridge.RouteToEndpoint(typeof(TestCommand), Settings.SampleServerEndpointName);
var endpointInstance = await Endpoint.Start(endpointConfiguration).ConfigureAwait(false);
RabbitMQ to ASB router configuration:
var routerConfig = new RouterConfiguration(Settings.SampleRouterEndpointName);
routerConfig.AddInterface<RabbitMQTransport>(RabbitMqInterfaceName, t =>
{
t.ConnectionString(Settings.RabbitMqConnectionString);
t.UseConventionalRoutingTopology();
});
routerConfig.AddInterface<AzureServiceBusTransport>(AzureServiceBusInterfaceName, t =>
{
t.ConnectionString(Settings.AzureServiceBusConnectionString);
t.TopicName(Settings.TopicName);
});
var staticRouting = routerConfig.UseStaticRoutingProtocol();
staticRouting.AddForwardRoute(RabbitMqInterfaceName, AzureServiceBusInterfaceName);
routerConfig.AutoCreateQueues();
var router = NServiceBus.Router.Router.Create(routerConfig);
await router.Start().ConfigureAwait(false);
Azure Service Bus endpoint configuration:
var endpointConfiguration = new EndpointConfiguration(Settings.SampleServerEndpointName);
endpointConfiguration.License(NServiceBusLicense.Text);
endpointConfiguration.UseSerialization<NewtonsoftSerializer>();
endpointConfiguration.UsePersistence<InMemoryPersistence>();
endpointConfiguration.EnableInstallers();
var recoverability = endpointConfiguration.Recoverability();
recoverability.Immediate(immediate => immediate.NumberOfRetries(0));
recoverability.Delayed(delayed => delayed.NumberOfRetries(0));
endpointConfiguration.SetDiagnosticsPath($@"C:\Temp\NSB\{Settings.SampleServerEndpointName}");
endpointConfiguration.UseTransport<AzureServiceBusTransport>()
.ConnectionString(Settings.AzureServiceBusConnectionString)
.Transactions(TransportTransactionMode.ReceiveOnly)
.EnablePartitioning()
.PrefetchMultiplier(10)
.TopicName(Settings.TopicName);
var endpointInstance = await Endpoint.Start(endpointConfiguration).ConfigureAwait(false);
And again, let me point out, that if I manually create a binding in RabbitMQ between the IEvent exchange and the router exchange then my events DO work. I just shouldn't need to manually create this binding for events to work. I need to understand how the router, or my use of it, will automatically create the necessary exchange bindings for IEvents in RabbitMQ.
Any insight that you can provide will be greatly appreciated.
I have successfully implemented an endpoint (v7.5.0 using .net 4.8) that runs on a windows server and utilizes the router to bridge between the MSMQ and AWS. It runs correctly from multiple developers machines but we have into some issues moving it into our development environment.
In our dev environment we have added the SQS queues urls to the proxy and the generic SNS url us-east-2 to the proxy as well.
I also have both the router and endpoint itself setting up the client factory to configure to use the SQS client with a SQS config object to set the properties for the proxy.
When we run this all the SQS queues get created properly from behind the proxy, but the SNS topics and subscriptions are not created from behind the proxy. Looking at CloudTrail I don’t see any SNS activity except from our local machines.
I wanted to see if there is anything special with the SNS component that needs to be configured differently for it to work from behind a proxy. I am already doing custom config on the router so it will work based on previous responses to questions I have posted in the forums.
extensions.ClientFactory(() => {
return (!useProxy)? new AmazonSQSClient(): new AmazonSQSClient(new AmazonSQSConfig
{
ProxyHost = proxyUrl,
ProxyPort = proxyPort
});
});
var ctor = typeof(MessageMetadataRegistry).GetConstructor(BindingFlags.NonPublic | BindingFlags.Public | BindingFlags.Instance, null, new[] { typeof(Func<Type, bool>) }, null);
#pragma warning disable CS0618 // Type or member is obsolete
settings.Set(ctor.Invoke(new object[] { (Func<Type, bool>)isMessageType }));
#pragma warning restore CS0618 // Type or member is obsolete
Update - 9/13/2021
It appears as though the SQS traffic is https and is successfully going through the proxy.
The SNS traffic appears to be going through the firewall as tcp over port 443.
I am trying to setup a connection between an AzureServiceBus Publisher and an Amazon SQS Subscriber.
The following is my router code:
static async Task Main()
{
Console.Title = "Demo.Router";
var routerConfig = new RouterConfiguration("Demo.Router");
var sqsInterface = routerConfig.AddInterface<SqsTransport>("SQS", t =>
{
t.ClientFactory(() =>
new AmazonSQSClient(
new BasicAWSCredentials(accessKey, accessSecret),
RegionEndpoint.GetBySystemName(regionName)));
});
var mongoDatabase = new MongoClient(connectionString).GetDatabase(MongoUrl.Create(connectionString).DatabaseName);
sqsInterface.EnableMessageDrivenPublishSubscribe(new SubscriptionPersister(mongoDatabase));
var azureServiceBusInterface = routerConfig.AddInterface<AzureServiceBusTransport>("ASB",
t => {
t.ConnectionString(connectionString);
var topology = t.UseEndpointOrientedTopology();
topology.RegisterPublisher(typeof(ArticlePublished),
"queue@connectionString");
var sanitization = t.Sanitization();
sanitization.UseStrategy<Sha1Sanitization>();
var settings = t.GetSettings();
var builder = new ConventionsBuilder(settings);
builder.DefiningEventsAs(e =>
e.Namespace != null &&
e.Namespace.ToLowerInvariant().EndsWith("events"));
settings.Set<NServiceBus.Conventions>(builder.Conventions);
var serializer = Tuple.Create(new NewtonsoftSerializer() as SerializationDefinition, settings);
settings.Set("MainSerializer", serializer);
});
var staticRouting = routerConfig.UseStaticRoutingProtocol();
staticRouting.AddForwardRoute("SQS", "ASB");
routerConfig.AutoCreateQueues();
var router = NServiceBus.Router.Router.Create(routerConfig);
await router.Start().ConfigureAwait(false);
Console.WriteLine("Press <enter> to exit");
Console.ReadLine();
await router.Stop().ConfigureAwait(false);
}
}
My SQS Subscriber has the following connection to the Router:
var bridge = transport.Routing().ConnectToRouter("Demo.Router");
bridge.RegisterPublisher(typeof(ArticlePublished), "queue@connectionString");
I can see the SQS router queue message for "Subscribe" message intent.
The Router blows up with an error when I believe it is trying to setup the subscription in AzureServiceBus:
The remote server returned an error: (400) Bad Request. The specified HTTP verb (GET) is not valid. TrackingId:XXXX, SystemTracker:YYYY:Queue:queue
Any guidance on how I am not setting this up properly would be appreciated.
Hi there,
I've been playing around with the router component, in preparation for a piece of migration work I'm working on.
At the moment we have an MSMQ on premise solution, but newer components are coming on board using ASB.
I wanted to use the router the route commands sent from the MSMQ side of things into ASB to be processed there, and then also to route events published at the MSMQ side of things into ASB for processing.
I have been prototyping the solution (based on the mixedtransport solution), commands work great - I send one, it turns up for processing in my Azure Service Bus, all great. But when I try to deal with events though, when I publish an event in the MSMQ side of things, I always end up with exactly two copies of the event in ASB.
I'm at the point now where I think I'm either missing something obvious (or being stupid!) :)
I popped the subscription storage for the router into a SQL db table so I could just look at it, and it looks fine to me.
Any thoughts?
Hi, been trying to setup mixed transports using this project. My Endpoint/Routing instances are
When I run them all I get the exception:
Interface Moving poison message to the error queue NServiceBus.Router.UnforwardableMessageException: No route for destination DirectEvents.WebApp
(Where DirectEvents.WebApp
is the RabbitMQ Endpoint)
To see where I was going wrong I downloaded and ran the sample solution from here - https://docs.particular.net/samples/router/mixed-transports/
And when I run the server/router/client here I run into basically the same issue:
Interface Moving poison message to the error queue NServiceBus.Router.UnforwardableMessageException: No route for destination Samples.Router.MixedTransports.Server
I'm using:
Happy to post a sample solution if that would help - but maybe the sample project download on the Particular page might be enough as they produce the same error?
If it helps, I can send commands from the Rabbit endpoint, through the router to MSMQ endpoint absolutely fine. Just the event pub/sub is not working, as with the sample project.
Thanks in advance.
I'm working on getting a router set up so that a new SqsTransport endpoint can interact with some AzureServiceBusTransport endpoints. I'm not sure what I'm doing wrong, but so far I can only successfully send a command across the router from an ASB endpoint and have it handled by the SQS endpoint. When the SQS endpoint publishes an event, the ASB endpoint never handles it.
Rather than post all of the code, here's a repo you can clone to reproduce the issue
As far as I can tell, there are no errors, so I think I'm just missing something or registering the publisher incorrectly.
I'm running .NET Core 5.0 worker apps using:
Hi Szymon,
We have encountered the problem when trying to register MSMQ publisher which is located on different physical machine than the Router we connect to.
On the client side we have the following code:
var routing = c.UseTransport<RabbitMQTransport>().Routing(); var bridge = routing.ConnectToRouter("Router"); bridge.RegisterPublisher(typeof(SubscribedEvent), "Endpoint");
which leads to following errors in "Router" logs:
NServiceBus.Unicast.Queuing.QueueNotFoundException: Failed to send message to address: [Endpoint@ROUTER-HOST] ---> System.Messaging.MessageQueueException: The queue does not exist or you do not have sufficient permissions to perform the operation.
and, of course, the queue doesn't exist at ROUTER-HOST, we would like to send subscription to Endpoint@ANOTHER-HOST.
I suppose that this kind of routing should be done on client (connector) side, i.e. by specyfing "NServiceBus.Bridge.DestinationSites"
header but this doesn't seem to be supported, looking at RouterSubscribeBehavior
,
Could you please provide some guidelines how this kind of subscription routing can be achieved?
I am trying to route Azure Service Bus events in an endpoint-oriented topology to SQS. I am using the mixed-transport template as a starting point.
When my router runs I always get an error about the "MainSerializer" not being set, even though I set it like you do in the AcceptanceTests (
var azureServiceBusInterface = routerConfig.AddInterface<AzureServiceBusTransport>("ASB",
t => {
t.ConnectionString(connectionString);
t.UseEndpointOrientedTopology();
var settings = t.GetSettings();
var builder = new ConventionsBuilder(settings);
settings.Set<NServiceBus.Conventions>(builder.Conventions);
var serializer = Tuple.Create(new NewtonsoftSerializer() as SerializationDefinition, new SettingsHolder());
settings.Set("MainSerializer", serializer);
});
The error fires on this line:
var router = NServiceBus.Router.Router.Create(routerConfig);
I am using NServiceBus 7.1.6 with NServiceBus.Azure.Transports.WindowsAzureServiceBus 9.1.2 and NServiceBus.Router 3.3.0
Hi!
We have this setup
machines: A, B and C.
Machine | Description |
---|---|
A | dotnet core running in docker using rabbitmq as transport |
B | Router, with two interfaces: rabbitmq and MSMQ |
B | Endpoint, Bar (.NET Framework) |
C | Endpoint, Foo (.NET Framework) |
Sending BarCommand from A via B Router to B Bar endpoints works great.
How can A send FooCommand to C (Foo) via B?
I have tried:
1.
bridge.RouteToEndpoint(typeof(FooCommand), "Foo@C");
but this fails with invalid address
routerConfig.AddInterface<MsmqTransport>("MSMQ", t => {
t.Routing().InstanceMappingFile().FilePath("instance-mapping.xml");
});
<?xml version="1.0" encoding="utf-8" ?>
<endpoints>
<endpoint name="Foo">
<instance machine="C"/>
</endpoint>
</endpoints>
But the router still delivers FooCommand to its local machine Foo@B
I have the following setup:
Each service that handles messages has a receive endpoint hosted in a separate process. This is the only place I have message handlers and sagas etc. A pure messaging host of sorts. In addition, each of these services also have a web API and/or a web application. In these web APIs/applications I'm using send-only endpoints to publish events and send commands (to their own receive endpoint).
To ensure that database write operations and message sending will be atomic operations, I plan to use the SQL Server transport with a one-way forward route to Azure Service Bus in my send-only endpoints. Just as demonstrated in NServiceBus.Connector.SqlServer.
The NServiceBus.Connector.SqlServer package works well. However, it seems like all interfaces in NServiceBus.Router are send and receive, requiring queues to exist in all connected transports. This feels a bit unnecessary in the scenario above, where the queues in Azure Service Bus will never be used? It's not a huge pain, but there will potentially be double the amount of queues where only half of them are used.
NServiceBus.Raw contains send-only raw endpoints, but if my perception is correct, there are no way of configuring NServiceBus.Router interfaces to use such raw endpoints?
Hi, apologies if I've misunderstood something here, but given the following code:
var transport = configuration.UseTransport<MsmqTransport>();
var routing = transport.Routing();
var router1 = routing.ConnectToRouter(Router1Name);
var router2 = routing.ConnectToRouter(Router2Name);
router1.RouteToEndpoint(typeof(Command1), Service1Name);
router2.RouteToEndpoint(typeof(Command2), Service2Name);
Only router2
's routes would be registered in the endpoint. Anything that I think I've set up for router1
would result in a No destination specified...
message.
Presumably this is because this line var router2 = routing.ConnectToRouter(Router2Name);
overwrites this line var router1 = routing.ConnectToRouter(Router1Name);
?
If I'm correct, just wondering is this by design or is it a bug?
In our useage of NServiceBus.Router
we've come across a few occassions where we've had to roll our own solution for achieving this and it'd be great if it could be changed.
Thanks in advance for taking a look. If I'm wrong or have misunderstood, then great - just let me know.
Thanks, Rob
Hi,
I'm using the router to route some non-nservicebus messages into our nservicebus system. It works fine, after I discovered the problem. Thanks for the great work.
As the title says, messages that are missing intent are dropped silently.
I handled this in our system with a custom rule that throws UnforwardableMessageException
, but I think this rule should be in the default router, so that messages are moved to the poison queue instead of being dropped.
Happy to do a PR with my (simple) rule.
We use this library to do routing from a SQL queue to an MSMQ. We have experienced that the program stopped processing messages from the SQL queue, without any signal that we were able to detect.
After the fact we have been able to find some traces in logs, indicating that there had been connectivity issues with the SQL database.
After digging through the code, it seems like the default behaviour for endpoints, when the peek or receive circuit breaker triggers, is to stop the endpoint. But, it does not stop the application it's running in. In those cases, we have an application that appears to be running, but it has stopped peeking on the SQL queues. In those cases, we prefer to just kill the application, and let other mechanisms try to restart the application if possible.
Fortunately, there seems to be a way to hook into the interfaces critical error handling, by setting a callback function in the settings, like this
var sqlInterface = _routerConfig.AddInterface<SqlServerTransport>("SQL", t =>
{
t.GetSettings().Set("onCriticalErrorAction", OnCriticalError()); // The signature of the callback is Func<ICriticalErrorContext, Task>
// other settings
});
First, is this an appropriate way to hook up the critical error callback, or is this actually in internal interface, where the settings key might change?
Maybe a small note in the documentation about this? Or maybe the option to hook into the critical error could be made available in the configuration?
2022-01-13 14:52:20.497 ERROR Persistent error while processing messages in Bridge. Entering throttled mode.
System.NullReferenceException: Object reference not set to an instance of an object.
at TLV.TryDecodeTLV(String tlvString, Action`2 valueCallback)
at ReplyTraceRule.UnwrapCorrelationIdAndSetTraceHeader(BaseForwardRuleContext context)
at ReplyTraceRule.AddTraceHeadersForRoutingBackReply(BaseForwardRuleContext context)
I was testing a specific scenario where the NServiceBus.Router package is used as a bridge between an 'island' with SQL-server as transport and another island that uses Azure Service Bus as transport mechanism. I've noticed that the subscriptions on the SQL transport island are no longer created after I've enabled the router. Or in other words it looks that the default SQL-transport behaviour of NServiceBus is broken after enabling the router.
I have created a small sample solution and added it to GitHub that contains some console applications just to validate this scenario.
For the SQL transport, the subscriber should send new subscribe messages to the publisher. In the sample the SqlAsbBridge.SqlHandler
will send these to the SqlAsbBridge.SqlPublisher
. This does work if we disable the router connector. So we need to add the following lines of code in comment in order to skip the router connector:
// Add routing logic to ASB:
var router = routing.ConnectToRouter("PoCRouter");
router.RegisterPublisher(typeof(SqlActionCompletedEvent), publisherEndpointName: "SqlPublisher");
In that case the MessageDrivenSubscribeTerminator
of NServiceBus will take care of this subscribe behaviour and everything is ok.
As we want to validate if the subscribe logic on the SQL transport side also work if we enable the router, we can clear the subscriptions table and remove any subscribe messages in order to take a clean start.
If we now enable the router connector and restart the SqlAsbBridge.SqlHandler
with the extra lines of code, we will no longer see the subscribe messages for the messages on the SQL island inside the queues. The subscribe messages for messages that will use the router are added perfectly (in this case the ones for the ASB island).
Some debugging learned me that the RouterConnectionFeature
inside the NServiceBus.Router.Connector
will register a new Behavior<ISubscribeContext>
. That's the RouterSubscribeBehavior
in the following snippet:
// Snippet from RouterConnectionFeature
context.Pipeline.Register(new ForwardSiteMessagesToRouterBehavior(settings.RouterAddress), "Routes messages sent to sites to the bridge.");
context.Pipeline.Register(new RoutingHeadersBehavior(settings.SendRouteTable), "Sets the ultimate destination endpoint on the outgoing messages.");
context.Pipeline.Register(b => new RouterSubscribeBehavior(subscriberAddress, context.Settings.EndpointName(), settings.RouterAddress, b.Build<IDispatchMessages>(), settings.PublisherTable, nativePubSub), "Dispatches the subscribe request via a router.");
context.Pipeline.Register(b => new RouterUnsubscribeBehavior(subscriberAddress, context.Settings.EndpointName(), settings.RouterAddress, b.Build<IDispatchMessages>(), settings.PublisherTable, nativePubSub),
This RouterSubscribeBehavior
will take care of the subscribe logic for messages that need the router. That's fine but depending on the invokeTerminator
flag the existing MessageDrivenSubscribeTerminator
of NServiceBus itself will be called or not:
public override async Task Invoke(ISubscribeContext context, Func<Task> next)
{
var eventType = context.EventType;
if (publisherTable.TryGetValue(eventType, out var publisherEndpoint))
{
Logger.Debug($"Sending subscribe request for {eventType.AssemblyQualifiedName} to router queue {routerAddress} to be forwarded to {publisherEndpoint}");
var subscriptionMessage = ControlMessageFactory.Create(MessageIntentEnum.Subscribe);
subscriptionMessage.Headers[Headers.SubscriptionMessageType] = eventType.AssemblyQualifiedName;
subscriptionMessage.Headers[Headers.ReplyToAddress] = subscriberAddress;
subscriptionMessage.Headers[Headers.SubscriberTransportAddress] = subscriberAddress;
subscriptionMessage.Headers[Headers.SubscriberEndpoint] = subscriberEndpoint;
subscriptionMessage.Headers["NServiceBus.Bridge.DestinationEndpoint"] = publisherEndpoint;
subscriptionMessage.Headers[Headers.TimeSent] = DateTimeExtensions.ToWireFormattedString(DateTime.UtcNow);
subscriptionMessage.Headers[Headers.NServiceBusVersion] = "6.3.1"; //The code has been copied from 6.3.1
var transportOperation = new TransportOperation(subscriptionMessage, new UnicastAddressTag(routerAddress));
var transportTransaction = context.Extensions.GetOrCreate<TransportTransaction>();
await dispatcher.Dispatch(new TransportOperations(transportOperation), transportTransaction, context.Extensions).ConfigureAwait(false);
}
if (invokeTerminator) // This flag is false for SQL transport and so the terminator is not invoked
{
await next().ConfigureAwait(false);
}
}
As this flag is false for the SQL-transport, the NServiceBus code is not called and the subscriptions for the SQL island itself are not added.
Should we invoke this terminator or is there a specific reason why we don't invoke it?
Thanks in advance for your help!
Hi Simon, any plan to support the latest version of NServiceBus?
Thank you!
When setting up a router between a multi-schema SQL transport & a multi-topic ASB transport all subscriptions are created in the routers own ASB topic, instead of the topic where the event is published, causing the SQL subscribers not to receive the event.
Please see Azure Service Bus Transport issue #511 for more context.
and fails if it is not.
Hi,
Is there a way to configure the router to transform messages on the way through? I can see there are Rules which look like they might be useful but I can't tell whether the messages are immutable from within a rule.
Thanks
Adam
Hi!
The router is working fine when the message is processed by a handler. IHandleMessages/context.Reply and correlationId is not changed.
But, when the message is processed by a Saga, ReplyToOriginator will set the correlationid to the originating messageid.
Line 110:
and line 11:
Any plans on supporting this scenario?
Adding TLV to the outgoing MessageId might solve this issue together with the CorrelationId.
SqlSubscriptionStorage currently takes a Func which is problematic when there is async work to be done (in my case, acquiring an Access Token). It would be better if it was a Func<Task>
The Sql Transport supports this (see UseCustomSqlConnectionFactory)
I'd be happy to submit a PR but naturally this would be a breaking change.
I have an NSB router responsible for Routing MSMQ messages to our Azure ServiceBus instance.
I have observed when I stop MSMQ and subsequently start it again the NServiceBus Router is unable to re-connect to the MSMQ instance.
System.Messaging.MessageQueueException (0x80004005): An invalid handle was passed to the function.
at System.Messaging.MessageEnumerator.MoveNext(TimeSpan timeout)
at NServiceBus.Transport.Msmq.MessagePump.<InnerProcessMessages>d__6.MoveNext()
Previous versions of the Router appeared to shut down the router. Whilst the latest version 3.9.2 appears to just continually poll MSMQ.
Is there a way this could be reported as a critical error and then the service could be restarted?
Hi,
I am trying to connect a MSMQ subscriber to an Azure Service Bus Publisher.
I created a NServicebus.Router to connect the two, but I had forgotten to add a Rule Name shortener to it. Once I had added that to the Router it worked fine. I did spend an awful lot of time trying to find the problem. The ASB Publisher throws an exception on startup so it is obvious what the problem is, but that exception is getting absorbed by the Router.
I have forked the repo and added a test to show the problem.
Would it be possible to get the Router to throw the error at startup?
I'm trying to migrate some subscribers to RabbitMQ, while publishers are still on Azure Service Bus.
This does not seem to work if the publisher is a SendOnly endpoint.
Following exception is thrown:
2020-03-20 15:25:31.109 ERROR ThrottlingRawEndpointConfig1[[NServiceBus.RabbitMQTransport, NServiceBus.Transport.RabbitMQ, Version=5.0.0.0, Culture=neutral, PublicKeyToken=9fc386479f8a226c]] Error processing a message. Continuing in throttled mode. Microsoft.Azure.ServiceBus.MessagingEntityNotFoundException: Put token failed. status-code: 404, status-description: The messaging entity 'sb://testnamespaceiaoao.servicebus.windows.net/legacy-publisher' could not be found. To know more visit https://aka.ms/sbResourceMgrExceptions. TrackingId:732270da-9516-4813-bc61-3f1cbaa626c1_G5, SystemTracker:seabookdev.servicebus.windows.net:legacy-publisher, Timestamp:2020-03-20T14:25:32. at Microsoft.Azure.ServiceBus.Core.MessageSender.OnSendAsync(IList
1 messageList)
at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func1 operation, TimeSpan operationTimeout) at Microsoft.Azure.ServiceBus.RetryPolicy.RunOperation(Func
1 operation, TimeSpan operationTimeout)
at Microsoft.Azure.ServiceBus.Core.MessageSender.SendAsync(IList1 messageList) at PostroutingTerminator.Terminate(PostroutingContext context) at NServiceBus.Router.ChainTerminator
1.Invoke(T context, Func2 next) at NServiceBus.Router.TerminatorInvocationRule
1.Invoke(T context, Func2 next) at NServiceBus.Router.Migrator.ShadowForwardSubscribeRule.Terminate(ForwardSubscribeContext context) at NServiceBus.Router.ChainTerminator
1.Invoke(T context, Func2 next) at NServiceBus.Router.TerminatorInvocationRule
1.Invoke(T context, Func2 next) at SubscribePreroutingTerminator.Terminate(SubscribePreroutingContext context) at NServiceBus.Router.ChainTerminator
1.Invoke(T context, Func2 next) at NServiceBus.Router.TerminatorInvocationRule
1.Invoke(T context, Func2 next) at PreroutingToSubscribePreroutingFork.Invoke(PreroutingContext context, Func
2 next)
at ThrottlingRawEndpointConfig`1.<>c__DisplayClass2_0.<b__0>d.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at NServiceBus.Transport.RabbitMQ.MessagePump.Process(BasicDeliverEventArgs message)
Hi,
i'm trying to learn how the router works, so i took this sample https://docs.particular.net/samples/router/mixed-transports/ and tring to switch rabbitmq with SqlServer :
var endpointConfiguration = new EndpointConfiguration("Samples.Router.MixedTransports.Server");
var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.ConnectionString("host=localhost");
transport.UseConventionalRoutingTopology();
endpointConfiguration.UsePersistence<InMemoryPersistence>();
var recoverability = endpointConfiguration.Recoverability();
recoverability.Immediate(immediate => immediate.NumberOfRetries(0));
recoverability.Delayed(delayed => delayed.NumberOfRetries(0));
endpointConfiguration.SendFailedMessagesTo("error");
endpointConfiguration.AuditProcessedMessagesTo("audit");
endpointConfiguration.EnableInstallers();
var endpointInstance = await Endpoint.Start(endpointConfiguration).ConfigureAwait(false);
Console.WriteLine("Press <enter> to exit.");
Console.ReadLine();
await endpointInstance.Stop().ConfigureAwait(false);`
The exception is
A chain has to have at least one terminator: ForwardPublishContext.'
What am i missing ?
Thanks in advance.
I’ve created an MSMQ based Publisher that can publish event messages to an Azure Service Bus based Subscriber. I order to accomplish this, I’ve implemented NServiceBus.Router into the publisher (running in same process). This works fine. Subscriber can send subscription request and publisher stores it in subscription storage as originating from router. Publisher can publish event messages to subscriber.
However, for subscriptions where subscription/rule name exceeds 50 characters, at which point the name shorteners kicks in, then the subscription request is not picked up by the publisher, and it is not stored in subscription storage, which means publisher cannot publish event messages for that type.
Any way to fix this?
I use MSMQ to Azure Service Bus route. During MSMQ interface setup I need to setup subscription persistence:
var subscriptionStorage = new InMemorySubscriptionStorage(); msmqInterface.EnableMessageDrivenPublishSubscribe(subscriptionStorage);
I see there is sql subscription persistence implementation out of the box.
Is there anything similar for RavenDb?
Is it possible to use RavenDb subscription persistence from NServiceBus.RavenDb nuget package without custom 'ISubscriptionStorage' implementation?
I'm trying to set up a router for sending messages from AzureServiceBus to SQS but am getting the following exception in the SQS interface when starting the router:
System.Collections.Generic.KeyNotFoundException: The given key (NServiceBus.Unicast.Messages.MessageMetadataRegistry) was not present in the dictionary.
at NServiceBus.Settings.SettingsHolder.Get(String key) in /_/src/NServiceBus.Core/Settings/SettingsHolder.cs:line 91
at NServiceBus.Settings.SettingsHolder.Get[T]() in /_/src/NServiceBus.Core/Settings/SettingsHolder.cs:line 70
at NServiceBus.Transport.SQS.Configure.SqsTransportInfrastructure..ctor(ReadOnlySettings settings) in /_/src/NServiceBus.Transport.SQS/Configure/SqsTransportInfrastructure.cs:line 24
at NServiceBus.SqsTransport.Initialize(SettingsHolder settings, String connectionString) in /_/src/NServiceBus.Transport.SQS/Configure/SqsTransport.cs:line 42
at NServiceBus.Raw.InitializableRawEndpoint.Initialize()
at ThrottlingRawEndpointConfig`1.Create()
at Interface`1.Initialize(InterfaceChains interfaces, RootContext rootContext)
at RouterImpl.Initialize()
at RouterImpl.Start()
Here is my router configuration code:
...
var routerConfig = new RouterConfiguration($"ASBToSQS.Router");
var azureInterface = routerConfig.AddInterface<AzureServiceBusTransport>("Azure", t =>
{
t.ConnectionString(context.Configuration["NServiceBus:Router:AzureConnectionString"]);
t.Transactions(TransportTransactionMode.ReceiveOnly);
t.SubscriptionRuleNamingConvention((entityType) =>
{
var entityPathOrName = entityType.Name;
if (entityPathOrName.Length >= 50)
{
return entityPathOrName.Split('.').Last();
}
return entityPathOrName;
});
});
var sqsInterface = routerConfig.AddInterface<SqsTransport>("SQS", t =>
{
// using default environment variable credentials
});
var staticRouting = routerConfig.UseStaticRoutingProtocol();
staticRouting.AddForwardRoute("Azure", "SQS");
routerConfig.AutoCreateQueues();
...
I've tried calling .EnableMessageDrivenPublishSubscribe(...)
using sql subscription storage because I found this issue and it seemed like that was the only notable difference in setups, but I'm still getting the same error.
This is running in a .NET Core 5.0 worker app using:
I'm sure I'm just missing something but am out of ideas. Any help would be greatly appreciated.
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.