rawrabbit's Introduction

Looking for documentation of 1.x? Click here

RawRabbit

Quick introduction

RawRabbit is a modern .NET framework for communication over RabbitMQ. The modular design and middleware-oriented architecture make the client highly customizable while providing sensible defaults for topology, routing and more. Documentation for version 2.x of the client is currently found under /docs.

Configure, enrich and extend

RawRabbit is configured with RawRabbitOptions, an options object that makes it possible to register client configuration and plugins, as well as override internal services.

var client = RawRabbitFactory.CreateSingleton(new RawRabbitOptions
{
  ClientConfiguration = new ConfigurationBuilder()
    .SetBasePath(Directory.GetCurrentDirectory())
    .AddJsonFile("rawrabbit.json")
    .Build()
    .Get<RawRabbitConfiguration>(),
  Plugins = p => p
    .UseProtobuf()
    .UsePolly(c => c
        .UsePolicy(queueBindPolicy, PolicyKeys.QueueBind)
        .UsePolicy(queueDeclarePolicy, PolicyKeys.QueueDeclare)
        .UsePolicy(exchangeDeclarePolicy, PolicyKeys.ExchangeDeclare)
    ),
  DependencyInjection = ioc => ioc
    .AddSingleton<IChannelFactory, CustomChannelFactory>()
});

Publish/Subscribe

Set up strongly typed publish/subscribe in just a few lines of code.

var client = RawRabbitFactory.CreateSingleton();
await client.SubscribeAsync<BasicMessage>(async msg =>
{
  Console.WriteLine($"Received: {msg.Prop}.");
});

await client.PublishAsync(new BasicMessage { Prop = "Hello, world!"});

Request/Response

RawRabbit's request/response (RPC) implementation uses the direct reply-to feature for better performance and lower resource allocation.

var client = RawRabbitFactory.CreateSingleton();
await client.RespondAsync<BasicRequest, BasicResponse>(async request =>
{
  return new BasicResponse();
});

var response = await client.RequestAsync<BasicRequest, BasicResponse>();

Ack, Nack, Reject and Retry

Unlike in many other clients, basic.ack, basic.nack and basic.reject are first-class citizens in the message handler.

var client = RawRabbitFactory.CreateSingleton();
await client.SubscribeAsync<BasicMessage>(async msg =>
{
  if(UnableToProcessMessage(msg))
  {
    return new Nack(requeue: true);
  }
  ProcessMessage(msg);
  return new Ack();
});

In addition to the basic acknowledgements, RawRabbit also supports delayed retries.

var client = RawRabbitFactory.CreateSingleton();
await client.SubscribeAsync<BasicMessage>(async msg =>
{
  try
  {
    ProcessMessage(msg);
    return new Ack();
  }
  catch (Exception e)
  {
    return Retry.In(TimeSpan.FromSeconds(30));
  }
});

Granular control for each call

Add or change properties in the IPipeContext to tailor calls for specific types of messages. This makes it possible to modify the topology features for calls, publish confirm timeout, consumer concurrency and much more.

await subscriber.SubscribeAsync<BasicMessage>(received =>
{
  receivedTcs.TrySetResult(received);
  return Task.FromResult(true);
}, ctx => ctx
  .UseSubscribeConfiguration(cfg => cfg
    .Consume(c => c
      .WithRoutingKey("custom_key")
      .WithConsumerTag("custom_tag")
      .WithPrefetchCount(2)
      .WithNoLocal(false))
    .FromDeclaredQueue(q => q
      .WithName("custom_queue")
      .WithAutoDelete()
      .WithArgument(QueueArgument.DeadLetterExchange, "dlx"))
    .OnDeclaredExchange(e=> e
      .WithName("custom_exchange")
      .WithType(ExchangeType.Topic))
));

rawrabbit's People

Contributors

ankurkh1, brettnagy, cemremengu, cortex93, eugene-g, jayrulez, johnbaker, lordmike, nengberg, pardahlman, ritasker, sharparam, videege, wrathza, zidad

rawrabbit's Issues

More unit testing

There are a lot of integration tests, but we should add some unit tests for some of the more important classes.

Add default attributes

If not provided, these defaults should be used:
Port: 5672
Username: guest
Password: guest
VirtualHost: /

For example it should be possible to just pass localhost as a connection string which should be treated as guest:guest@localhost:5672/
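The defaults listed above could be applied roughly as follows. This is a minimal sketch rather than RawRabbit's actual parser: the BrokerAddress and BrokerAddressParser names, and the accepted shape [user:pass@]host[:port][/vhost], are assumptions made for illustration.

```csharp
using System;
using System.Text.RegularExpressions;

// Hypothetical holder for the parsed values; not a RawRabbit type.
// The field initialisers are the defaults requested in the issue.
public sealed class BrokerAddress
{
    public string Username = "guest";
    public string Password = "guest";
    public string Host = "localhost";
    public int Port = 5672;
    public string VirtualHost = "/";

    public override string ToString() =>
        $"{Username}:{Password}@{Host}:{Port}{VirtualHost}";
}

public static class BrokerAddressParser
{
    // Assumed shape: [user:pass@]host[:port][/vhost]. Only the host is mandatory.
    private static readonly Regex Pattern = new Regex(
        @"^(?:(?<user>[^:@/]+):(?<pass>[^@/]*)@)?(?<host>[^:/@]+)(?::(?<port>\d+))?(?<vhost>/.*)?$");

    public static BrokerAddress Parse(string input)
    {
        var match = Pattern.Match(input.Trim());
        if (!match.Success)
            throw new FormatException($"Unrecognised broker address: '{input}'");

        // Start from the defaults and overwrite only what the input provides.
        var address = new BrokerAddress { Host = match.Groups["host"].Value };
        if (match.Groups["user"].Success) address.Username = match.Groups["user"].Value;
        if (match.Groups["pass"].Success) address.Password = match.Groups["pass"].Value;
        if (match.Groups["port"].Success) address.Port = int.Parse(match.Groups["port"].Value);
        if (match.Groups["vhost"].Success) address.VirtualHost = match.Groups["vhost"].Value;
        return address;
    }
}

public static class Program
{
    public static void Main()
    {
        // prints guest:guest@localhost:5672/
        Console.WriteLine(BrokerAddressParser.Parse("localhost"));
        // prints user:secret@rabbit.local:5672/orders
        Console.WriteLine(BrokerAddressParser.Parse("user:secret@rabbit.local/orders"));
    }
}
```

Keeping the defaults as field initialisers means an omitted part never needs special handling in the parsing code.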

KeyNotFoundException during Request/Response

When doing RPC calls in quick succession the consumer is removed and a new one is not created and the KeyNotFoundException is thrown.

I've pinpointed the problem to RawRabbit.Operations.Requester.cs in the GetOrCreateConsumerForType method. There is a check to verify that _typeToConsumer contains the correct response type but the _disposeConsumerTimer is triggered before it is actually used.

This may be related to Issue #36

Support Microsoft.Framework.Logging in RawRabbit

RawRabbit needs logging - and the future of .NET logging lies in Microsoft.Framework.Logging.

The ambition is to keep all pre-release dependencies (like those for vNext beta8) in RawRabbit.vNext, so that's where the wire-up for the logging goes. However, RawRabbit can get a temporary interface IRawLogger that is one-to-one with the ILogger interface. In a vNext world, you just resolve the ILogger and inject it into the IRawLogger adapter; otherwise you implement IRawLogger with your favourite logging framework and register it in BusClientFactory.

Update QueueArgument with LazyQueue

It should be simple to use Lazy Queues. Therefore we should

  • Add QueueArgument for lazy queue
  • Add <summary> comments on arguments
  • Add section in documentation

Make connection string parser more robust

In the Configuration section of README.md, there are some instructions on what the connectionString should look like. However, the parser is really strict: if you, for example, don't add a semicolon (;) after the last broker, that broker won't be matched. The port number should also be optional and default to :5672, and the virtual host should default to /. Below is a list of what should be considered valid connectionStrings:

  • brokers=guest:guest@localhost (no port number or vhost)
  • brokers=guest:guest@localhost:5672/ (port number and vhost)
  • brokers=user1:pass1@host/vhost,user2:pass2@host2:5672 (vhost on first, port on second)
  • requestTimeout=10;brokers=guest:guest@localhost (requestTimeout before brokers)
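One way to get the tolerance described above is to split the string into key/value segments first and only then look at the brokers. The sketch below assumes segments are separated by semicolons and brokers by commas, as in the examples; ConnectionStringSplitter and its methods are hypothetical names, not part of RawRabbit.

```csharp
using System;
using System.Collections.Generic;

public static class ConnectionStringSplitter
{
    // Splits "key=value;key=value" into a dictionary. Empty segments are dropped,
    // so a trailing semicolon is optional and the order of the keys does not matter.
    public static Dictionary<string, string> SplitSegments(string connectionString)
    {
        var result = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
        var segments = connectionString.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries);
        foreach (var segment in segments)
        {
            var separator = segment.IndexOf('=');
            if (separator <= 0)
                throw new FormatException($"Expected 'key=value' but found '{segment}'");
            result[segment.Substring(0, separator).Trim()] = segment.Substring(separator + 1).Trim();
        }
        return result;
    }

    // Returns the raw broker entries; parsing each entry is left to a later step.
    public static List<string> GetBrokers(string connectionString)
    {
        var segments = SplitSegments(connectionString);
        if (!segments.TryGetValue("brokers", out var brokers))
            throw new FormatException("The connection string contains no 'brokers' segment.");

        var entries = new List<string>();
        foreach (var entry in brokers.Split(new[] { ',' }, StringSplitOptions.RemoveEmptyEntries))
            entries.Add(entry.Trim());
        return entries;
    }
}

public static class Program
{
    public static void Main()
    {
        // Both calls print 1: the trailing semicolon makes no difference.
        Console.WriteLine(ConnectionStringSplitter.GetBrokers("brokers=guest:guest@localhost").Count);
        Console.WriteLine(ConnectionStringSplitter.GetBrokers("brokers=guest:guest@localhost;").Count);
        // prints 2
        Console.WriteLine(ConnectionStringSplitter.GetBrokers(
            "requestTimeout=10;brokers=user1:pass1@host/vhost,user2:pass2@host2:5672").Count);
    }
}
```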

Append GlobalMessageId to routingkey

In some scenarios, it makes sense to track a Message Sequence through its GlobalMessageId. It should be possible to do so with #.[globalmessageid].

In order for this to work, we need to

  • Change default type of exchange to Topic to leverage wild card routing
  • Add a configuration option, RouteWithGlobalId, so that we don't break compatibility with previous versions

Add logger packages

We should give some options to the default ConsoleLogger, like adding an adapter to Serilog

Guard against KeyNotFound in RequestTimer dictionary

Currently, in Requester.cs, there's a line of code where we expect a dictionary to contain a certain key

_requestTimerDictionary[args.BasicProperties.CorrelationId]?.Dispose();

If the response timer has expired, or adding a new entry to the dictionary failed, that key may not exist. This should be handled more gracefully.
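A guarded lookup could look like the sketch below. It assumes the dictionary is a ConcurrentDictionary keyed by correlation id and that the entry can be removed once its timer is disposed; neither detail is confirmed by the issue, and the RequestTimers class is a hypothetical stand-in for the code in Requester.cs.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class RequestTimers
{
    // Stand-in for _requestTimerDictionary (the real field's type is an assumption).
    private static readonly ConcurrentDictionary<string, Timer> Timers =
        new ConcurrentDictionary<string, Timer>();

    public static bool Register(string correlationId, Timer timer) =>
        Timers.TryAdd(correlationId, timer);

    // Returns true when a timer was found and disposed, false when the key was
    // missing (for example because the timer already expired). Never throws
    // KeyNotFoundException.
    public static bool TryDisposeTimer(string correlationId)
    {
        if (correlationId == null) return false; // ConcurrentDictionary rejects null keys
        if (!Timers.TryRemove(correlationId, out var timer)) return false;
        timer?.Dispose();
        return true;
    }
}

public static class Program
{
    public static void Main()
    {
        var timer = new Timer(_ => { }, null, Timeout.Infinite, Timeout.Infinite);
        RequestTimers.Register("abc", timer);

        Console.WriteLine(RequestTimers.TryDisposeTimer("abc"));     // prints True
        Console.WriteLine(RequestTimers.TryDisposeTimer("abc"));     // prints False
        Console.WriteLine(RequestTimers.TryDisposeTimer("missing")); // prints False
    }
}
```

If the entry has to stay in the dictionary, TryGetValue gives the same protection without removing it.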

Automatic tests for connection lost scenarios

Up until this point, testing involving unstable connections has been done manually. It would be neat to add this to our integration test suite. A few suggestions for tests:

  • Make sure that connection and topology is properly restored after connection shutdown
  • Other?

Improve handling multiple RPC Requests

There's one line of code that has been bugging me for a couple of days, where we do .Wait() on the message handler for Responder. By replacing this with a chained ContinueWith, we can achieve higher performance for Responder when under heavy workload.
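The difference between the two approaches can be sketched without any RabbitMQ types. In the example below, the handler and reply delegates are hypothetical stand-ins for the Responder's message handler and the code that publishes the response.

```csharp
using System;
using System.Threading.Tasks;

public static class ResponderSketch
{
    // Blocking variant: the calling thread is parked until the handler finishes.
    public static void HandleBlocking(
        Func<string, Task<string>> handler, string request, Action<string> reply)
    {
        var task = handler(request);
        task.Wait();
        reply(task.Result);
    }

    // Chained variant: the caller returns immediately and the reply is sent from a
    // continuation. With OnlyOnRanToCompletion the continuation is skipped (and the
    // returned task is cancelled) if the handler faults.
    public static Task HandleChained(
        Func<string, Task<string>> handler, string request, Action<string> reply)
    {
        return handler(request).ContinueWith(
            t => reply(t.Result),
            TaskContinuationOptions.OnlyOnRanToCompletion);
    }
}

public static class Program
{
    public static void Main()
    {
        Func<string, Task<string>> handler = async request =>
        {
            await Task.Delay(10); // simulated work
            return request.ToUpperInvariant();
        };

        // Waiting here is only to keep the demo process alive until the reply is written.
        ResponderSketch.HandleChained(handler, "ping", Console.WriteLine).Wait(); // prints PING
    }
}
```

A real implementation would also need a continuation for the faulted case, which this sketch leaves out.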

Message Type serialization without assembly version

Generic types like GenericType<TFirst, TSecond> have a fully qualified name like

RawRabbit.IntegrationTests.TestMessages.GenericResponse`2[[RawRabbit.IntegrationTests.TestMessages.First, RawRabbit.IntegrationTests, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null],[RawRabbit.IntegrationTests.TestMessages.Second, RawRabbit.IntegrationTests, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null]]

The type name is sent in the basic properties of the message, and then used to create a Type instance that will be used when deserializing the message. The problem here is that Version, Culture and PublicKeyToken are present. If the version is wrong, we might not be able to create the type instance (even though the class itself is identical between versions). However, it is enough to have type name and assembly name to create the type instance. That is, this string (should) be enough:

RawRabbit.IntegrationTests.TestMessages.GenericResponse`2[[RawRabbit.IntegrationTests.TestMessages.First, RawRabbit.IntegrationTests],[RawRabbit.IntegrationTests.TestMessages.Second, RawRabbit.IntegrationTests]]

In BasicPropertiesProvider.cs, we should have a strategy to create type such that

  • It does not include version, culture and public key token for main class or generic arguments
  • It should do this recursively, GenericMessage<GenericArgument<NestedGeneric<NestedAgain>>> should contain no version, culture or public key token
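Because the version, culture and public key token always appear as ", Key=value" fragments, one possible strategy is a single regular expression applied to the whole name, which covers nested generic arguments without explicit recursion. This is a sketch of that idea, not the code in BasicPropertiesProvider.cs; TypeNameSketch is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Text.RegularExpressions;

public static class TypeNameSketch
{
    // Matches ", Version=...", ", Culture=..." and ", PublicKeyToken=..." wherever
    // they occur, including inside the [[...],[...]] generic argument list.
    private static readonly Regex AssemblyDetails =
        new Regex(@",\s*(?:Version|Culture|PublicKeyToken)=[^,\]]+");

    public static string StripAssemblyDetails(string typeName) =>
        AssemblyDetails.Replace(typeName, string.Empty);
}

public static class Program
{
    public static void Main()
    {
        // A nested generic type; the exact assembly names depend on the runtime.
        var name = typeof(Dictionary<string, List<int>>).AssemblyQualifiedName;
        Console.WriteLine(TypeNameSketch.StripAssemblyDetails(name));
    }
}
```

Applied to the GenericResponse`2 name quoted above, this yields the shortened string shown after it.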

Correct order of arguments passed to Assert.Equal

The tests that use Assert.Equal often pass the arguments in the wrong order.

The call should be done like this:
Assert.Equal(expected, actual)
But in most places it is like this:
Assert.Equal(actual, expected)

This is not noticed when all tests are green but it complicates debugging when tests are failing.

Unexpected connection close when publishing multiple messages

When publishing a lot of messages (1'000'000+), the connection is sometimes closed unexpectedly with an exception like this:

RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=504, text="CHANNEL_ERROR - expected 'channel.open'", classId=60, methodId=40, cause=
   at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
   at RabbitMQ.Client.Impl.ModelBase.ModelRpc(MethodBase method, ContentHeaderBase header, Byte[] body)
   at RabbitMQ.Client.Framing.Impl.Model._Private_ChannelOpen(String outOfBand)
   at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.CreateNonRecoveringModel()
   at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.CreateModel()
   at RawRabbit.Common.ChannelFactory.<>c.<CreateChannelAsync>b__12_0(Task`1 c)

This should be fixed.

Refactor Operations: Publisher

In the pursuit of high performance, we're looking at refactoring operations so that they can efficiently handle multiple operations on different threads. Publisher.cs will be the first operation to be refactored, with the following changes:

  • TopologyProvider is used for creating exchanges, queues and bindings
  • CreateChannelAsync is used rather than the sync method
  • GetAckTask takes the channel as an argument rather than working on one channel at a time

Subscribers are terminated

A bug was introduced in 1.8.9: in Subscriber.cs the ConsumerFactory is called without the IModel argument, which uses GetChannel (rather than CreateChannel). This is wrong.

Corner case: PublishAcknowledge times out and receives ack in parallel

My system had a slight hiccup and the timer for publish acknowledge expired on one thread ([12]) while another thread ([27]) received an ack for the same message. This caused the application to throw a KeyNotFoundException.

This happens very seldom, but should be fixed nevertheless. See logs below

2016-01-21 09:25:47,554 [27] [Information]  Recieved ack for 1 with multiple set to 'False'
2016-01-21 09:25:48,640 [12] [Warning]  Ack for 1 has timed out.
2016-01-21 09:25:49,258 [27] [Debug]  Disposed ack timer for 1
2016-01-21 09:25:50,881 [27] [Debug]  Message with delivery tag 1 has been acknowledged by broker.
2016-01-21 09:25:52,159 [12] [Fatal]  Unhandled exception: "System.Collections.Generic.KeyNotFoundException: The given key was not present in the dictionary.
   at System.Collections.Concurrent.ConcurrentDictionary`2.get_Item(TKey key)
   at RawRabbit.Common.PublishAcknowledger.<>c__DisplayClass8_0.<GetAckTask>b__0(Object state)
   at System.Threading.TimerQueueTimer.CallCallbackInContext(Object state)
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
   at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
   at System.Threading.TimerQueueTimer.CallCallback()
   at System.Threading.TimerQueueTimer.Fire()
   at System.Threading.TimerQueue.FireNextTimers()
   at System.Threading.TimerQueue.AppDomainTimerCallback()"
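A race like the one in the log can be made harmless by letting the ack path and the timeout path compete for the same atomic removal, so that only the winner touches the entry. The sketch below assumes pending acks are tracked per delivery tag in a ConcurrentDictionary; AckTracker is a hypothetical class, not the actual PublishAcknowledger.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public sealed class AckTracker
{
    private readonly ConcurrentDictionary<ulong, TaskCompletionSource<bool>> _pending =
        new ConcurrentDictionary<ulong, TaskCompletionSource<bool>>();

    // Registers a published message and returns a task that completes on ack.
    public Task Track(ulong deliveryTag)
    {
        var tcs = new TaskCompletionSource<bool>();
        _pending[deliveryTag] = tcs;
        return tcs.Task;
    }

    // Called when the broker acknowledges the message.
    public bool OnAck(ulong deliveryTag)
    {
        // TryRemove is atomic: if the timeout already removed the entry, do nothing.
        if (!_pending.TryRemove(deliveryTag, out var tcs)) return false;
        return tcs.TrySetResult(true);
    }

    // Called when the acknowledge timer expires.
    public bool OnTimeout(ulong deliveryTag)
    {
        // If the ack already removed the entry, the timeout is simply ignored.
        if (!_pending.TryRemove(deliveryTag, out var tcs)) return false;
        return tcs.TrySetException(new TimeoutException($"Ack for {deliveryTag} has timed out."));
    }
}

public static class Program
{
    public static void Main()
    {
        var tracker = new AckTracker();
        var ackTask = tracker.Track(1);

        Console.WriteLine(tracker.OnAck(1));     // prints True
        Console.WriteLine(tracker.OnTimeout(1)); // prints False, no exception thrown
        Console.WriteLine(ackTask.Status);       // prints RanToCompletion
    }
}
```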

Improve extraction of Application Name

There's a private method in NamingConvention called GetApplicationName. The algorithm today isn't too refined. Scenarios that should be handled:

  • "C:\\Projects\\Temp\\My.Console.App\\bin\\Debug\\RawException.vshost.exe" (console app)
  • "C:\\Projects\\Temp\\My.Application\\My.Application.Web\\bin" (hosted in IIS)
  • "C:\\Application\\PRODUCTION\\My.Windows.Service\\2.2.1.916\\" (windows service, deployed with version number)

We expect the Application Name to be MyConsoleApp, MyApplicationWeb and MyWindowsService.

This method could be public and unit tested.
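A heuristic that satisfies the three scenarios above is to walk the path from the end and skip anything that looks like an executable, a build folder, a version number or a drive. The rules below are assumptions chosen to fit those examples, not the existing NamingConvention logic, and ApplicationNameSketch is a hypothetical name.

```csharp
using System;
using System.Text.RegularExpressions;

public static class ApplicationNameSketch
{
    // Folder names treated as build output rather than application names (assumed list).
    private static readonly string[] BuildFolders = { "bin", "debug", "release" };
    private static readonly Regex VersionNumber = new Regex(@"^\d+(\.\d+)*$");

    public static string GetApplicationName(string path)
    {
        var segments = path.Split(new[] { '\\', '/' }, StringSplitOptions.RemoveEmptyEntries);

        // Walk backwards and use the first segment that is not skipped.
        for (var i = segments.Length - 1; i >= 0; i--)
        {
            if (IsNameCandidate(segments[i]))
                return segments[i].Replace(".", string.Empty);
        }
        return string.Empty;
    }

    private static bool IsNameCandidate(string segment)
    {
        if (segment.EndsWith(".exe", StringComparison.OrdinalIgnoreCase)) return false; // executable
        if (segment.EndsWith(":", StringComparison.Ordinal)) return false;              // drive, e.g. "C:"
        if (Array.IndexOf(BuildFolders, segment.ToLowerInvariant()) >= 0) return false;  // bin, Debug, Release
        return !VersionNumber.IsMatch(segment);                                          // e.g. 2.2.1.916
    }
}

public static class Program
{
    public static void Main()
    {
        // prints MyConsoleApp
        Console.WriteLine(ApplicationNameSketch.GetApplicationName(
            "C:\\Projects\\Temp\\My.Console.App\\bin\\Debug\\RawException.vshost.exe"));
        // prints MyApplicationWeb
        Console.WriteLine(ApplicationNameSketch.GetApplicationName(
            "C:\\Projects\\Temp\\My.Application\\My.Application.Web\\bin"));
        // prints MyWindowsService
        Console.WriteLine(ApplicationNameSketch.GetApplicationName(
            "C:\\Application\\PRODUCTION\\My.Windows.Service\\2.2.1.916\\"));
    }
}
```

Since the method is a pure function of the path, each scenario maps directly onto a unit test.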

IoC: Honour registered IConnectionFactory

In ChannelFactory.cs a ConnectionFactory is created in the ctor

public ChannelFactory(RawRabbitConfiguration config, IClientPropertyProvider propsProvider)
{
    _connectionFactory = new ConnectionFactory
    {
        VirtualHost =  config.VirtualHost,
        UserName = config.Username,
        Password = config.Password,
        Port = config.Port,
        AutomaticRecoveryEnabled = config.AutomaticRecovery,
        TopologyRecoveryEnabled = config.TopologyRecovery,
        NetworkRecoveryInterval = config.RecoveryInterval,
        ClientProperties = propsProvider.GetClientProperties(config)
    };
}

The ConnectionFactory class implements the interface IConnectionFactory, so theoretically any implementation of IConnectionFactory would suffice. Therefore, we should make sure that the ChannelFactory takes an IConnectionFactory as a constructor argument.

Extension method for message sequences

The ambition with RawRabbit is to provide a neat api for publish/subscribe pattern as well as request/response (a.k.a. RPC). However, it makes sense to have support for chained messages, and the RawRabbit.Extensions framework is just the right place for it.

vNext & default ChannelFactory

The vNext IServiceCollection extension AddRawRabbit does not seem to set up RawRabbit correctly for use in an ASP.NET 5 web context.

  1. it registers ChannelFactory as a singleton.
  2. IBusClient is registered as Transient with concrete type BaseBusClient.

Since the client is registered as transient it will be constructed and disposed of once per request for each service dependency of that request which has a dependency on IBusClient.

BaseBusClient in turn disposes of its ChannelFactory instance.

The ChannelFactory is however, in fact, never actually disposed and will be reused for subsequent activation of another IBusClient. Calling Dispose on it does render the ChannelFactory useless for this purpose, as it disposes of its _threadChannels.

I'm not sure what the intended behavior is here, but I'm guessing ChannelFactory should actually be used/registered as a singleton.

The use case for generating a new BusClient for each service requesting it makes sense, mostly in environments which are intended for message processing. (Maybe not so much in web-based environments where the BusClient might be used to push something on the queue.) However, as soon as the first BusClient is disposed of, all other existing instances will fall flat. (Issue 1?)

Actual issue:
BaseBusClient should not dispose of its dependencies.
I think it would be safe to delegate this to the DI container, i.e. with the default configuration, they should never be disposed.

Switch to direct RPC

As RawRabbit is all about speed (?), I'd really like to see that we rely on RabbitMQ 3.4.0 or later and make use of direct RPC to avoid the overhead of creating a separate response queue per RPC request.

Update documentation

There are several sections that need to be added or updated, including:

  • Building locally
  • Contributing
  • Examples

And possibly more. Since we're using a readthedocs wiki, we might want to reference it from the README.

Add extension for re-defining topology features

In the current version, the default exchange type is Direct. However, in order to use wild card routing with global message id (added in #73) we need to use Topic exchanges. Since exchanges cannot be updated, only deleted and re-declared, it would make sense to have an extension that, given an exchange,

  • Gathers information about bindings
  • Delete the exchange and re-declare it according to configuration
  • Re-bind queues to exchange

The API could look something like

client.UpdateTopologyAsync(t => t
    .ForExchange("my_exchange")
    .UseConfiguration(e => e
        .WithType(ExchangeType.Topic)
        .WithDurability(false))
);

Backward compatible Message Serialization

The BasicProperty header was changed in commit 6fe8897 in order to support long typenames (i.e. generic types). However, it removed the Type header from the basic props, breaking backwards compatibility with earlier versions. This should be fixed.

Add method to gracefully shut down client

The IBusClient<TContext> interface should have a method Shutdown() that

  • Closes all consumers so that no new messages are consumed
  • Gracefully wait until all prefetch messages have been processed then close connection

Update MessageSequence Extension

One weakness in how the MessageSequence extension works is that it listens to messages with the same routing key as any other subscriber for that message. This means that if the message it is listening for is published a lot, it will receive it multiple times while only being interested in a couple of those. When #73 is done, we should make sure that the extension binds for a specific message id ([routingkey].[globalmessageid]). This way, only messages for the specific message sequence are routed to the queue. Once a message sequence is finished, it should remove the bindings for that sequence. As a consequence, we will have a slight performance hit when creating a sequence, as it will have to make more roundtrips to the broker.

Support more parameters to connectionString

Today, the only (optional) parameter supported in RawRabbit is requestTimeout. The connectionString should support all the parameters available on the RawRabbitConfiguration configuration object.

A few examples of connectionStrings that should be supported

  • guest:guest:localhost:5672/?autoCloseConnection=true
  • guest:guest:localhost:5672/?autoCloseConnection=true&requestTimeout=10
  • guest:guest:localhost:5672/?requestTimeout=10&autoCloseConnection=true
  1. The order of the parameters should not matter
  2. Default values should be used for parameters omitted in the connectionString
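The two requirements above (order independence and defaults for omitted parameters) can be sketched as follows. ClientSettings is a hypothetical stand-in for RawRabbitConfiguration, the default values are placeholders, and treating requestTimeout as a number of seconds is an assumption.

```csharp
using System;
using System.Globalization;

// Hypothetical settings object; the default values are placeholders, not RawRabbit's.
public sealed class ClientSettings
{
    public TimeSpan RequestTimeout = TimeSpan.FromSeconds(10);
    public bool AutoCloseConnection = true;
}

public static class ParameterParser
{
    // Reads the "?key=value&key=value" tail of a connection string. Parameters may
    // appear in any order; omitted ones keep their defaults.
    public static ClientSettings Parse(string connectionString)
    {
        var settings = new ClientSettings();
        var queryStart = connectionString.IndexOf('?');
        if (queryStart < 0) return settings; // no parameters at all: defaults only

        var pairs = connectionString.Substring(queryStart + 1)
            .Split(new[] { '&' }, StringSplitOptions.RemoveEmptyEntries);
        foreach (var pair in pairs)
        {
            var parts = pair.Split(new[] { '=' }, 2);
            if (parts.Length != 2)
                throw new FormatException($"Expected 'key=value' but found '{pair}'");

            switch (parts[0].Trim().ToLowerInvariant())
            {
                case "requesttimeout": // assumed to be expressed in seconds
                    settings.RequestTimeout = TimeSpan.FromSeconds(
                        int.Parse(parts[1], CultureInfo.InvariantCulture));
                    break;
                case "autocloseconnection":
                    settings.AutoCloseConnection = bool.Parse(parts[1]);
                    break;
                default:
                    throw new FormatException($"Unknown parameter '{parts[0]}'");
            }
        }
        return settings;
    }
}

public static class Program
{
    public static void Main()
    {
        var a = ParameterParser.Parse("guest:guest:localhost:5672/?autoCloseConnection=false&requestTimeout=30");
        var b = ParameterParser.Parse("guest:guest:localhost:5672/?requestTimeout=30&autoCloseConnection=false");

        // prints True: the order of the parameters does not matter
        Console.WriteLine(a.RequestTimeout == b.RequestTimeout
                          && a.AutoCloseConnection == b.AutoCloseConnection);
    }
}
```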
