
zio-sqs's Issues

Upgrade to ZIO 2.0

ZIO 2.0 is at Milestone 4, with an RC expected in the next few weeks.
https://github.com/zio/zio/releases/tag/v2.0.0-M4

The API is nearly stable at this point, so any early migration work against this version should pay off towards the official 2.0 release.

The progress is being tracked here:
zio/zio#5470

The Stream Encoding work in progress is the only area where the API might still change before the RC.

We are actively working on a ScalaFix rule that will cover the bulk of the simple API changes:
https://github.com/zio/zio/blob/series/2.x/scalafix/rules/src/main/scala/fix/Zio2Upgrade.scala
We highly recommend starting with that, and then working through any remaining compilation errors :)

To assist with the rest of the migration, we have created this guide:
https://zio.dev/howto/migrate/zio-2.x-migration-guide/

If you would like assistance with the migration from myself or other ZIO contributors, please let us know!

Issue with message attributes

Hi,
I have a consume method like this:

private def consume(queueUrl: String) =
  SqsStream(
    queueUrl,
    SqsStreamSettings(
      stopWhenQueueEmpty = false,
      waitTimeSeconds = Some(3),
      visibilityTimeout = Some(sqsConfig.visibilityTimeout),
      autoDelete = false,
      messageAttributeNames = List("TEST", AWSTraceHeader.unwrap.toString)
    )
  ).mapError(AwsError.fromThrowable)

and I send a message to the queue:

aws sqs send-message --endpoint-url "https://sqs.eu-west-2.amazonaws.com/666" \
  --queue-url "https://sqs.eu-west-2.amazonaws.com/666/queue-name" \
  --message-body '{
"event_type": "create",
"user_id": "666"
}' --message-attributes "AWSTraceHeader={StringValue=TestTenant,DataType=String}"

but message.attributes gives me an empty map. What am I doing wrong? Thanks!

No way to fall back to queue's settings for visibilityTimeout and waitTimeSeconds

These two settings also exist on SQS queues themselves, and if a ReceiveMessage call doesn't include them, the queue's values are used. Currently in zio-sqs there seems to be no way to fall back to the queue's settings: you can either provide explicit values or, if you provide nothing, get the defaults of 30 for visibilityTimeout and 20 for waitTimeSeconds.

Perhaps by making these settings Options, a None value could mean that they won't be set on the ReceiveMessageRequest, and the queue's settings will be used?
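
A minimal sketch of that idea, using a plain parameter map in place of the real ReceiveMessageRequest builder. `StreamSettings` and `receiveParams` are hypothetical names chosen for illustration and are not part of zio-sqs:

```scala
object ReceiveParamsSketch {
  // Hypothetical settings type: None means "do not send this parameter,
  // so the queue's own configuration applies".
  final case class StreamSettings(
    visibilityTimeout: Option[Int] = None,
    waitTimeSeconds: Option[Int] = None
  )

  // Builds the request parameters; optional values are added only when defined.
  def receiveParams(queueUrl: String, settings: StreamSettings): Map[String, String] =
    Map("QueueUrl" -> queueUrl) ++
      settings.visibilityTimeout.map(v => "VisibilityTimeout" -> v.toString) ++
      settings.waitTimeSeconds.map(w => "WaitTimeSeconds" -> w.toString)
}
```

With this shape, `StreamSettings()` produces a request that carries only the queue URL, while `StreamSettings(visibilityTimeout = Some(45))` overrides just that one value.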

For reference, these are the only pages I've been able to find so far in the AWS docs that describe this behavior:
For visibilityTimeout: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html
For waitTimeSeconds: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html

Default value for visibilityTimeout in SqsStreamSettings should be set to None

It's misleading because, unless visibilityTimeout is overridden to None, we can't use the value configured globally on the queue. And since it has a default value, it's easy to forget that this field has been set.

I think it would avoid some future surprises if visibilityTimeout defaulted to None → https://github.com/zio/zio-sqs/blob/master/src/main/scala/zio/sqs/SqsStreamSettings.scala#L9

In particular, visibilityTimeout defaults to 30, which is also the AWS default, so the explicit default doesn't add much value.

The other default values are fine to me because they affect only the consumer behaviour.

GenericAwsError when trying to connect to SQS

Hello,

When trying to connect to an Amazon SQS queue with a call encapsulated in a ZLayer, I received this error: GenericAwsError(software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: event executor terminated)

The same call done directly works.

I attached a small test.tar.gz sbt app with both examples.

Thanks

At least once semantics

I'm looking to migrate to zio-sqs, but right now as far as I can tell there are 2 main modes of operation:

  • autoDelete=true: This is basically at most once semantics. If your handler dies or there is a hard crash, the message you were in the middle of handling is lost forever.
  • autoDelete=false: Manual mode. You have to make sure you call deleteMessage and so on yourself.

I think it would be nice if there was something similar to Alpakka's SqsAckSink:
https://doc.akka.io/docs/alpakka/current/sqs.html#updating-message-statuses

Basically, it uses the type system to ensure that you decide what to do with a message after your handler has run. This gives you at least once semantics (or, if you ensure your handler is idempotent, effectively exactly once semantics).

A sealed trait with the following cases would cover everything I believe:

MessageAction

  • Done / Delete
  • Skip / Ignore
  • RetryLater(visibilityTimeout) / ChangeMessageVisibility(visibilityTimeout)

Not sure on the specific naming to use, but that's the basic idea.
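
As a rough sketch only, the cases above could be encoded like this. The names `MessageAction`, `Delete`, `Ignore` and `sqsCall` are hypothetical and taken from the list above, not from zio-sqs; the only real names are the SQS API actions DeleteMessage and ChangeMessageVisibility:

```scala
import scala.concurrent.duration._

object MessageActionSketch {
  // What the handler decided to do with a message once it has run.
  sealed trait MessageAction
  object MessageAction {
    case object Delete extends MessageAction
    case object Ignore extends MessageAction
    final case class ChangeMessageVisibility(visibilityTimeout: FiniteDuration)
        extends MessageAction
  }

  // Names the SQS API action each case would translate to (None = no call).
  // The match is exhaustive, so the compiler warns when a case is left unhandled.
  def sqsCall(action: MessageAction): Option[String] = action match {
    case MessageAction.Delete =>
      Some("DeleteMessage")
    case MessageAction.Ignore =>
      None
    case MessageAction.ChangeMessageVisibility(t) =>
      Some(s"ChangeMessageVisibility(${t.toSeconds}s)")
  }
}
```

A handler typed as `Message => MessageAction` (or its effectful equivalent) would then have to return one of these cases for every message, which is where the at least once guarantee would come from.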

Is this something that makes sense for zio-sqs?

Release for the latest ZConfig

Hello.

I see there have been no releases since last year. The other ZIO libraries have advanced quite a bit, which makes it difficult to use the latest release of zio-sqs together with, for example, the latest ZConfig. Is there a plan to release zio-sqs with the most recent updates? Everything seems to be up to date in master thanks to Steward.

Thanks!

Migrate tests to ZIO Test

Replace ScalaTest with ZIO Test. It might also be possible to replace randomdatagenerator with ZIO Test generators.

Meta data

Is it possible to add some metadata using this library?

Missing symbol type for newer zio-aws versions

Using zio-sqs with a newer version of zio-aws (e.g. 3.17.100.3) fails with:

Symbol 'type io.github.vigoo.zioaws.sqs.model.Message.ReadOnly' is missing from the classpath.
[error] This symbol is required by 'value zio.sqs.SqsStream.msg'.
[error] Make sure that type ReadOnly is in your classpath and check for conflicting dependencies with `-Ylog-classpath`.
[error] A full rebuild may help if 'SqsStream.class' was compiled against an incompatible version of io.github.vigoo.zioaws.sqs.model.Message.

Parallel consumers from 1 queue

Is there an option to run multiple consumers from 1 queue?
(Or can it only be achieved by running multiple streams from 1 queue and setting a proper VisibilityTimeout to avoid duplicates?)

thanks!

Make the SqsStream infinite

The current signature of SqsStream.apply is

  def apply(
    queueUrl: String,
    settings: SqsStreamSettings = SqsStreamSettings()
  ): ZStream[Sqs, Throwable, Message.ReadOnly]

The stream stops after the first exception that is received in the sqs call. The exceptions can be intermittent, for example,

software.amazon.awssdk.core.exception.SdkClientException: Received an UnknownHostException when attempting to interact with a service. See cause for the exact endpoint that is failing to resolve. If this is happening on an endpoint that previously worked, there may be a network connectivity issue or your DNS cache could be storing endpoints for too long.
....

The next call to sqs after such an exception would be successful.

In most situations (at least those that I've seen) the queue is supposed to be consumed indefinitely: exceptions are logged and the call is retried.

With the current SqsStream.apply there could be a couple of solutions to achieve the infinite stream:

  1. Restart the program when the stream fails:

          SqsStream(???, ???)
            .runDrain
            .exitCode

     In environments like ElasticBeanstalk this is not ideal because the EC2 restart is quite slow.

  2. Consume a fresh copy of the stream when the current one fails:

          def infiniteStream(): ZStream[Sqs, Nothing, Message.ReadOnly] =
            SqsStream(???, ???).catchAll(_ => infiniteStream())

In both situations, some additional code is required to continue consuming the messages.

My proposal is to have an additional method in the zio-sqs SqsStream, so that the library provides a short solution for an infinite sqs stream.

Its signature would be

object SqsStream {
...

  def infinite(
    queueUrl: String,
    settings: SqsStreamSettings = SqsStreamSettings()
  ): ZStream[Sqs, Nothing, Either[Throwable, Message.ReadOnly]] = ???
}
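
One possible shape for the body, as a sketch only and assuming ZIO 1.x streams. `resumeOnError` is a hypothetical generic helper, not an existing zio-sqs function; `infinite` could then be written as `resumeOnError(SqsStream(queueUrl, settings))`:

```scala
import zio.stream.ZStream

object InfiniteStreamSketch {
  // Emits every element as Right. When the underlying stream fails, emits the
  // error as Left and then starts a fresh copy of the stream. The by-name
  // parameter is re-evaluated on every restart.
  def resumeOnError[R, A](
    mkStream: => ZStream[R, Throwable, A]
  ): ZStream[R, Nothing, Either[Throwable, A]] =
    mkStream
      .map(a => Right(a): Either[Throwable, A])
      .catchAll { e =>
        ZStream.succeed(Left(e): Either[Throwable, A]) ++ resumeOnError(mkStream)
      }
}
```

This sketch restarts immediately and without any limit; a real implementation would probably want a delay or a schedule between restarts so that a persistent failure does not turn into a tight loop.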

If this proposal sounds reasonable, I can provide a PR.

SqsProducer stops working after error from SQS

Hey. We are using zio-sqs version 0.3.2 to receive and produce sqs messages.
On the sending side we see strange behaviour when SQS is unreachable or rejects a request:

The sending fails (which is correct) but the producer is not usable afterwards.
All messages that get produced afterwards are stuck forever (fiber does not return).

Since the code is not very complex I pasted it here.
Are we doing anything wrong, or what could cause this issue?

The producer that we use looks like this:

trait SqsProducer {
  def produce(message: ProducerEvent[String]): IO[Error, ProducerEvent[String]]
}
object SqsProducer {
  def createProducer(
      queueName: String,
      sqsConfig: SqsConfig,
      producerSettings: ProducerSettings
  ): ZManaged[Clock, Throwable, SqsProducer] = {
    def requestQueueUrl(sqs: SqsAsyncClient) =
      ZIO.fromCompletionStage(sqs.getQueueUrl(GetQueueUrlRequest.builder().queueName(queueName).build()))

    for {
      sqs      <- Sqs.fromConfig(sqsConfig)
      queueUrl <- ZManaged.fromEffect(requestQueueUrl(sqs))
      producer <- Producer.make(sqs, queueUrl.queueUrl(), Serializer.serializeString, producerSettings)
    } yield new SqsProducer {
      override def produce(message: ProducerEvent[String]): IO[Error, ProducerEvent[String]] =
        producer.produce(message).mapError(Error.SqsAccessError)
    }
  }
}

val producer: SqsProducer = ... 
producer.produce(event) 
// issue with SQS --> fails with SqsAccessError
producer.produce(anotherEvent)
// fiber never returns
