Code Monkey home page Code Monkey logo

conduit_sqs's Introduction

ConduitSQS

CircleCI Coveralls Hex.pm Hex.pm Hex.pm Gratipay

A Conduit adapter for SQS.

Installation

The package can be installed by adding conduit_sqs to your list of dependencies in mix.exs:

def deps do
  [{:conduit_sqs, "~> 0.3.0"}]
end

Configuring the Adapter

config :my_app, MyApp.Broker,
  adapter: ConduitSQS,
  access_key_id: [{:system, "AWS_ACCESS_KEY_ID"}, :instance_role],
  secret_access_key: [{:system, "AWS_SECRET_ACCESS_KEY"}, :instance_role]

Options

  • :adapter - The adapter to use, should be ConduitSQS.
  • :access_key_id - The AWS access key ID. See what ExAWS supports.
  • :secret_access_key - The AWS secret access key. See what ExAWS supports.
  • :worker_pool_size - The default number of worker for each subscription. Defaults to 5 when not specified. Can also be overridden on each subscribe.
  • :max_attempts - How many times to retry publishing a message. Defaults to 10 when not specified. Can also be overriden on each publish.
  • :base_backoff_in_ms - The base factor used to exponentially backoff when a request fails. Defaults to 10 when not specified. Can also be overriden on queue, publish, and subscribe.
  • :max_backoff_in_ms - The maximum backoff when a request fails. Defaults to 10_000 when not specified. Can also be overriden on queue, publish, and subscribe.
  • :region - The AWS region to use by default. Defaults to "us-east-1" when not specified. Can also be overriden on queue, publish, and subscribe.

Configuring Queues

Inside the configure block of a broker, you can define queues that will be created at application startup with the options you specify.

defmodule MyApp.Broker do
  configure do
    queue "my-queue"
    queue "my-other-queue.fifo", fifo_queue: true
  end
end

Options

For creating a queue, ConduitSQS supports the same options that are specified in the SQS docs. See the SQS docs for creating a queue to understand what each of the options mean. These options include:

  • :policy
  • :visibility_timeout
  • :maximum_message_size
  • :message_retention_period
  • :approximate_number_of_messages
  • :approximate_number_of_messages_not_visible
  • :created_timestamp
  • :last_modified_timestamp
  • :queue_arn
  • :approximate_number_of_messages_delayed
  • :delay_seconds
  • :receive_message_wait_time_seconds
  • :redrive_policy
  • :fifo_queue
  • :content_based_deduplication

In addition to the SQS options, you can also pass the following:

  • :base_backoff_in_ms - The base factor used to exponentially backoff when a request fails. Defaults to 10 when not specified. Can also be set globally.
  • :max_backoff_in_ms - The maximum backoff when a request fails. Defaults to 10_000 when not specified. Can also be set globally.
  • :region - The AWS region to use. Defaults to "us-east-1" when not specified. Can also be set globally.

Configuring a Subscriber

Inside an incoming block for a broker, you can define subscriptions to queues. Conduit will route messages on those queues to your subscribers.

defmodule MyApp.Broker do
  incoming MyApp do
    subscribe :my_subscriber, MySubscriber, from: "my-queue"
    subscribe :my_other_subscriber, MyOtherSubscriber,
      from: "my-other-queue",
      max_number_of_messages: 10,
      worker_pool_size: 1
  end
end

Options

ConduitSQS requires that you specify a from option, which states which queue to consume from.

For subscribing to a queue, ConduitSQS supports the same options that are specified in the SQS docs. See the SQS docs for receiving a message to understand what each of the options mean. These options include:

  • :attribute_names
  • :message_attribute_names
  • :max_number_of_messages
  • :visibility_timeout
  • :wait_time_seconds

For attribute_names and message_attributes, ConduitSQS will attempt to pull all attributes unless overriden. ConduitSQS defaults max_number_of_messages to 10, which is the largest that SQS supports.

In addition to the SQS options, you can also pass the following:

  • :worker_pool_size - The number of workers that simultaneously consume from that queue. Defaults to 5.
  • :max_demand - Max number of messages in flow. Defaults to 1000.
  • :min_demand - Minumum threshold of messages to cause pulling of more. Defaults to 500.
  • :base_backoff_in_ms - The base factor used to exponentially backoff when a request fails. Defaults to 10 when not specified. Can also be set globally.
  • :max_backoff_in_ms - The maximum backoff when a request fails. Defaults to 10_000 when not specified. Can also be set globally.
  • :region - The AWS region to use. Defaults to "us-east-1" when not specified. Can also be set globally.

Configuring a Publisher

Inside an outgoing block for a broker, you can define publications to queues. Conduit will deliver messages using the options specified. You can override these options, by passing different options to your broker's publish/3.

defmodule MyApp.Broker do
  outgoing do
    publish :something, to: "my-queue"
    publish :something_else,
      to: "my-other-queue",
      delay_seconds: 11
  end
end

Options

For publishing to a queue, ConduitSQS supports the same options that are specified in the SQS docs. See the SQS docs for publishing a message to understand what each of the options mean. These options include:

  • :delay_seconds
  • :message_deduplication_id
  • :message_group_id

These options may also be specified by setting headers on the message being published. Headers on the message will trump any options specified in an outgoing block. For example:

message =
  %Conduit.Message{}
  |> put_header("message_group_id", "22")

ConduitSQS does not allow you to set the message_attributes option. Those will be filled from properties on the message and from the message headers.

In addition to the SQS options, you can also pass the following:

  • :to - The destination queue for the message. If the message already has it's destination set, this option will be ignored.
  • :max_attempts - How many times to retry publishing a message. Defaults to 10 when not specified. Can also be set globally.
  • :base_backoff_in_ms - The base factor used to exponentially backoff when a request fails. Defaults to 10 when not specified. Can also be set globally.
  • :max_backoff_in_ms - The maximum backoff when a request fails. Defaults to 10_000 when not specified. Can also be set globally.
  • :region - The AWS region to use. Defaults to "us-east-1" when not specified. Can also be set globally.

conduit_sqs's People

Contributors

blatyo avatar xward avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar

conduit_sqs's Issues

Missing handle_info for :ssl_closed?

I'm experiencing a lot of the following entry in my log files. According to the log entries, there is a missing handle_info for :ssl_closed case.

Elixir.FunctionClauseError no function clause matches 
    lib/conduit_sqs/poller.ex:68 ConduitSQS.Poller.handle_info({:ssl_closed, {:sslsocket, {:gen_tcp, #Port<0.359>, :tls_connection, :undefined}, [#PID<0.2394.0>, #PID<0.2393.0>]}}, %ConduitSQS.Poller.State{adapter_opts: [adapter: ConduitSQS], broker: Cachalot.SQS.Broker, demand: 6, queue: "cachalot-staging-legacy-files", subscriber_opts: [from: "cachalot-staging-legacy-files", worker_pool_size: 4, max_demand: 1, min_demand: 0, base_backoff_in_ms: 100]})
    lib/gen_stage.ex:2188 GenStage.noreply_callback/3
    gen_server.erl:637 :gen_server.try_dispatch/4
    gen_server.erl:711 :gen_server.handle_msg/6
    proc_lib.erl:259 :proc_lib.wake_up/3

version 0.2.6 of conduit_sqs.

Compatibility with gen_stage v0.14+

In most of my recent project, I can't use conduit_sqs since it is using gen_stage v0.14+ and conduit forbid it using v0.13.1.

Any plan to upgrade ? Thx !

rescue from bad network issues

some time there are network issue like nxdomain
so it better to rescue from bad network issues
solution : backoff its polling until one of its requests works

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.