Comments (6)

WarcraftYax commented on August 26, 2024

WarcraftYax@aadb7eb

That's a crude sketch of how I imagine it could work (I skipped the part of actually uploading/downloading the S3 file, though that's fairly trivial). It would also need to be configurable via options (enable/disable, and a byte threshold for falling back to S3?), plus tests, which I haven't got running yet.

Not sure if I'll have the time to finish it off though.

from rebus.amazonsqs.

WarcraftYax commented on August 26, 2024

Compare branches

Getting a bit further. It should upload to and read from S3 now, though I have yet to actually run it...

Still need to configure the dependency injection (should I add the AmazonS3Client and TransferUtility objects to the TransactionContext?).

Will also need to write some tests. Might have time tomorrow.

WarcraftYax commented on August 26, 2024

Compare branches

I'm stuck now. I've added tests and tied up some loose ends, and everything seems to work fine under medium load, but after a while I just get this error thrown repeatedly:

2018-07-27 17:54:19.855 +01:00 [Warning] Unhandled exception 1 while handling message with ID "7046c64e-63f1-4566-86df-3845b52c0d9b"
System.ArgumentException: An item with the same key has already been added.
   at System.ThrowHelper.ThrowArgumentException(ExceptionResource resource)
   at System.Collections.Generic.Dictionary`2.Insert(TKey key, TValue value, Boolean add)
   at Rebus.AmazonSQS.AmazonSQSTransport.<>c__DisplayClass20_0.<SendOutgoingMessages>b__2(OutgoingMessage message)
   at System.Linq.Enumerable.WhereSelectEnumerableIterator`2.MoveNext()
   at System.Collections.Generic.List`1..ctor(IEnumerable`1 collection)
   at System.Linq.Enumerable.ToList[TSource](IEnumerable`1 source)
   at Rebus.AmazonSQS.AmazonSQSTransport.<>c__DisplayClass20_0.<<SendOutgoingMessages>b__1>d.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Rebus.AmazonSQS.AmazonSQSTransport.<SendOutgoingMessages>d__20.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Rebus.Transport.TransactionContext.<Invoke>d__24.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Rebus.Transport.TransactionContext.<Commit>d__17.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Rebus.Retry.Simple.SimpleRetryStrategyStep.<DispatchWithTrackerIdentifier>d__8.MoveNext()
2018-07-27 17:54:19.855 +01:00 [Error] An error occurred when attempting to complete the transaction context
System.OperationCanceledException: The operation was canceled.
   at System.Threading.CancellationToken.ThrowOperationCanceledException()
   at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout, CancellationToken cancellationToken)
   at Rebus.AmazonSQS.AmazonSQSTransport.<>c__DisplayClass23_0.<Receive>b__1()
   at Rebus.Transport.TransactionContext.Invoke(ConcurrentQueue`1 actions)
   at Rebus.Transport.TransactionContext.RaiseAborted()
   at Rebus.Transport.TransactionContext.<Complete>d__19.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Rebus.Workers.ThreadPoolBased.ThreadPoolWorker.<ProcessMessage>d__18.MoveNext()

I thought it might be because I was calling context.GetOrAdd() in parallel and the underlying Dictionary isn't thread-safe, but I moved the offending lines elsewhere (see my last 2 commits to my fork) and I'm still getting the issue.

Everything looks promising and it seems to work fine for quite a while, but I'm pretty much stuck now! @mookid8000

mookid8000 commented on August 26, 2024

Hi @JoshYaxley, sorry for being so late to comment on this issue – I read your suggestion and thought "that's fine", but only now do I think I've realized what you're actually trying to build 😐

The thing is, Rebus already has a "data bus", but it works in a slightly different way than what you're trying to implement.

If I understand correctly, you are thinking of a mechanism that kicks in for message sizes above a certain threshold (say 250 kB) and then transparently uses S3 to transfer the payload.

Rebus' existing data bus is more explicit, in that it works as a simple implementation of the claim check pattern – the sender "checks in the luggage" like this:

var dataBus = bus.Advanced.DataBus;
var attachment = await dataBus.CreateAttachment(sourceStream);

and then attachment is a DataBusAttachment that simply wraps an attachment ID of type string, so you can pass the attachment or the string along in an event like this:

await bus.Publish(new SomethingBigHappened(attachment.Id));

On the receiving end, the attachment can be downloaded like this:

var attachmentId = receivedEvent.AttachmentId;
using (var source = await DataBusAttachment.OpenRead(attachmentId))
{
    // do something to the source stream in here
}

This has advantages and disadvantages compared to your solution, of course.

What I like about your solution is that it can work transparently as a "shim" of sorts, evening out the differences between transports. If, for example, I am moving a system that runs on MSMQ (where the max message size is 4 MB) to SQS, your mechanism would kick in from time to time and save the day.

What I don't like is that it's not a good solution as a generic data bus, because there's no way (at least none that I can see) to avoid receiving the full payload on the receiving end. This can end up consuming a large amount of RAM, and maybe it isn't even necessary – for example, one of my recent usages of Rebus' data bus involved events with four pretty large attachments, each in the 1-4 GB range of raw data, and only one subscriber needed all four; the 2-3 other subscribers could do their work by downloading just one of the attachments.

This way, subscribers can selectively download the data they need, and they can do so in a streaming memory- and CPU-efficient way.

This of course comes at the cost of being more involved to work with, because the developer needs to decide up front that a chunk of the message contents must be represented as an attachment.

So.....

But you know what?! There's no reason why the functionality you're proposing could not be implemented – I just think it should not live at the transport level, but somewhere higher up in the stack, where it could simply use Rebus' already-existing data bus to do the heavy lifting.

As far as I can tell, the following things would be needed to make this work:

  • an implementation of IDataBusStorage for S3 should be made (silly me – there is one already)
  • optional outgoing/incoming pipeline filters should be made, implementing the logic that kicks in when a message is deemed too large
  • Rebus' data bus should be extended with the ability to delete an attachment, so that attachments don't lie around after a big message has been properly handled

The only thing that worries me a little bit is that Rebus' current IDataBusStorage interface probably needs to be extended with a Task Delete(string id) method, and that is a change that breaks all current implementations (Azure Blobs, Amazon S3, SQL Server, file share – those are the ones I can remember right now).
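The outgoing filter mentioned above could look something like this. This is a minimal sketch only – the step name, the 250 kB threshold, and the claim-check header key are all hypothetical, not part of Rebus' actual API:

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using Rebus.DataBus;
using Rebus.Messages;
using Rebus.Pipeline;

[StepDocumentation("Replaces the body of outgoing messages above a size threshold with a data bus attachment.")]
public class BigMessageClaimCheckStep : IOutgoingStep
{
    // hypothetical threshold – would come from the options mentioned earlier
    const int ThresholdBytes = 250 * 1024;

    readonly IDataBusStorage _dataBusStorage;

    public BigMessageClaimCheckStep(IDataBusStorage dataBusStorage)
    {
        _dataBusStorage = dataBusStorage;
    }

    public async Task Process(OutgoingStepContext context, Func<Task> next)
    {
        var message = context.Load<TransportMessage>();

        if (message.Body.Length > ThresholdBytes)
        {
            // "check in the luggage": store the oversized body via the data bus
            var attachmentId = Guid.NewGuid().ToString();

            using (var source = new MemoryStream(message.Body))
            {
                await _dataBusStorage.Save(attachmentId, source);
            }

            // send a small stand-in message carrying only the attachment ID
            // ("rbs2-claim-check-id" is a made-up header key for this sketch)
            var headers = new Dictionary<string, string>(message.Headers)
            {
                ["rbs2-claim-check-id"] = attachmentId
            };

            context.Save(new TransportMessage(headers, new byte[0]));
        }

        await next();
    }
}
```

A matching incoming step would then detect the header, fetch the attachment from the data bus, and restore the original body before the message is dispatched to handlers.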

WarcraftYax commented on August 26, 2024

Thanks for the reply, and apologies myself for taking time to get back to you.

My initial thought was to use the data bus (which I have used before), but I wanted to make it as transparent as possible. I added it to the SQS transport simply because AWS recommends falling back to S3 when messages are too large.

However, I agree with you that it would be more beneficial to move it up the stack into the pipeline filters. This way we can keep the transparency of the functionality, but also enable it for all transports and all data bus storages.

Modifying the IDataBusStorage interface is effectively a breaking change, so would a major version increment be suitable? As a transition, a possibility could be to add "empty" Task Delete(string id) methods to all the existing data bus extensions that just log a warning that the attachment hasn't actually been deleted (would it be possible to generate a compiler warning using #warning?). Then the extensions can be updated one by one over time.
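Such a transitional stub might look like this (a hypothetical sketch – _log stands in for whatever logger the storage implementation already has):

```csharp
using System.Threading.Tasks;

// Transitional no-op Delete for an existing IDataBusStorage implementation:
// keeps the code compiling against the extended interface while real
// deletion support is added one storage at a time.
public Task Delete(string id)
{
#warning Delete is not actually implemented for this data bus storage yet
    _log.Warn("Delete was called for attachment {id}, but this storage does not support deletion yet – the attachment was NOT deleted", id);

    return Task.CompletedTask;
}
```

One caveat with #warning: it fires when the storage library itself is compiled, not in the consuming project, so the runtime warning log is what end users would actually see.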

As this direction is completely detached from the Rebus.AmazonSQS project, do you plan on creating new issues in the suitable projects for the new tasks?

Unfortunately, I'm not sure if/when I'll be able to find time to work on them.

mookid8000 commented on August 26, 2024

Closing this one – it should be solved in a generic fashion so that it works with all transport/data bus configurations, so I've moved the task to this issue.
