Code Monkey home page Code Monkey logo

deimos's Introduction


CircleCI Gem Version

A Ruby framework for marrying Kafka, a schema definition like Avro, and/or ActiveRecord and provide a useful toolbox of goodies for Ruby-based Kafka development. Built on Phobos and hence Ruby-Kafka.

Additional Documentation

Please see the following for further information not covered by this readme:

Installation

Add this line to your application's Gemfile:

gem 'deimos-ruby'

And then execute:

$ bundle

Or install it yourself as:

$ gem install deimos-ruby

Versioning

We use a version of semver for this gem. Any change in previous behavior (something works differently or something old no longer works) is denoted with a bump in the minor version (0.4 -> 0.5). Patch versions are for bugfixes or new functionality which does not affect existing code. You should be locking your Gemfile to the minor version:

gem 'deimos-ruby', '~> 1.1'

Configuration

For a full configuration reference, please see the configuration docs .

Schemas

Deimos was originally written only supporting Avro encoding via a schema registry. This has since been expanded to a plugin architecture allowing messages to be encoded and decoded via any schema specification you wish.

Currently we have the following possible schema backends:

  • Avro Local (use pure Avro)
  • Avro Schema Registry (use the Confluent Schema Registry)
  • Avro Validation (validate using an Avro schema but leave decoded - this is useful for unit testing and development)
  • Mock (no actual encoding/decoding).

Note that to use Avro-encoding, you must include the avro_turf gem in your Gemfile.

Other possible schemas could include Protobuf, JSONSchema, etc. Feel free to contribute!

To create a new schema backend, please see the existing examples here.

Producers

Producers will look like this:

class MyProducer < Deimos::Producer

  class << self
  
    # Optionally override the default partition key logic, which is to use
    # the payload key if it's provided, and nil if there is no payload key.
    def partition_key(payload)
      payload[:my_id]
    end
   
    # You can call publish / publish_list directly, or create new methods
    # wrapping them.
    
    def send_some_message(an_object)
      payload = {
        'some-key' => an_object.foo,
        'some-key2' => an_object.bar
      }
      # You can also publish an array with self.publish_list(payloads)
      # You may specify the topic here with self.publish(payload, topic: 'my-topic')
      # You may also specify the headers here with self.publish(payload, headers: { 'foo' => 'bar' })
      self.publish(payload)
    end
    
  end
  
  
end

Auto-added Fields

If your schema has a field called message_id, and the payload you give your producer doesn't have this set, Deimos will auto-generate a message ID. It is highly recommended to give all schemas a message_id so that you can track each sent message via logging.

You can also provide a field in your schema called timestamp which will be auto-filled with the current timestamp if not provided.

Coerced Values

Deimos will do some simple coercions if you pass values that don't exactly match the schema.

  • If the schema is :int or :long, any integer value, or a string representing an integer, will be parsed to Integer.
  • If the schema is :float or :double, any numeric value, or a string representing a number, will be parsed to Float.
  • If the schema is :string, if the value implements its own to_s method, this will be called on it. This includes hashes, symbols, numbers, dates, etc.

Instrumentation

Deimos will send ActiveSupport Notifications. You can listen to these notifications e.g. as follows:

  Deimos.subscribe('produce') do |event|
    # event is an ActiveSupport::Notifications::Event
    # you can access time, duration, and transaction_id
    # payload contains :producer, :topic, and :payloads
    data = event.payload
  end

The following events are produced (in addition to the ones already produced by Phobos and RubyKafka):

  • produce_error - sent when an error occurs when producing a message.
    • producer - the class that produced the message
    • topic
    • exception_object
    • payloads - the unencoded payloads
  • encode_messages - sent when messages are being schema-encoded.
    • producer - the class that produced the message
    • topic
    • payloads - the unencoded payloads
  • db_producer.produce - sent when the DB producer sends messages for the DB backend. Messages that are too large will be caught with this notification - they will be deleted from the table and this notification will be fired with an exception object.
    • topic
    • exception_object
    • messages - the batch of messages (in the form of Deimos::KafkaMessages) that failed - this should have only a single message in the batch.
  • batch_consumption.valid_records - sent when the consumer has successfully upserted records. Limited by max_db_batch_size.
    • consumer: class of the consumer that upserted these records
    • records: Records upserted into the DB (of type ActiveRecord::Base)
  • batch_consumption.invalid_records - sent when the consumer has rejected records returned from filtered_records. Limited by max_db_batch_size.
    • consumer: class of the consumer that rejected these records
    • records: Rejected records (of type Deimos::ActiveRecordConsume::BatchRecord)

Similarly:

  Deimos.subscribe('produce_error') do |event|	
    data = event.payloads 	
    Mail.send("Got an error #{event.exception_object.message} on topic #{data[:topic]} with payloads #{data[:payloads]}")	
  end	
      
  Deimos.subscribe('encode_messages') do |event|	
    # ... 
  end	

Kafka Message Keys

Topics representing events rather than domain data don't need keys. However, best practice for domain messages is to schema-encode message keys with a separate schema.

This enforced by requiring producers to define a key_config directive. If any message comes in with a key, the producer will error out if key_config is not defined.

There are three possible configurations to use:

  • key_config none: true - this indicates that you are not using keys at all for this topic. This must be set if your messages won't have keys - either all your messages in a topic need to have a key, or they all need to have no key. This is a good choice for events that aren't keyed - you can still set a partition key.
  • key_config plain: true - this indicates that you are not using an encoded key. Use this for legacy topics - new topics should not use this setting.
  • key_config schema: 'MyKeySchema-key' - this tells the producer to look for an existing key schema named MyKeySchema-key in the schema registry and to encode the key using it. Use this if you've already created a key schema or the key value does not exist in the existing payload (e.g. it is a compound or generated key).
  • key_config field: 'my_field' - this tells the producer to look for a field named my_field in the value schema. When a payload comes in, the producer will take that value from the payload and insert it in a dynamically generated key schema. This key schema does not need to live in your codebase. Instead, it will be a subset of the value schema with only the key field in it.

If your value schema looks like this:

{
  "namespace": "com.my-namespace",
  "name": "MySchema",
  "type": "record",
  "doc": "Test schema",
  "fields": [
    {
      "name": "test_id",
      "type": "string",
      "doc": "test string"
    },
    {
      "name": "some_int",
      "type": "int",
      "doc": "test int"
    }
  ]
}

...setting key_config field: 'test_id' will create a key schema that looks like this:

{
  "namespace": "com.my-namespace",
  "name": "MySchema_key",
  "type": "record",
  "doc": "Key for com.my-namespace.MySchema",
  "fields": [
    {
      "name": "test_id",
      "type": "string",
      "doc": "test string"
    }
  ]
}

If you publish a payload { "test_id" => "123", "some_int" => 123 }, this will be turned into a key that looks like { "test_id" => "123"} and schema-encoded before being sent to Kafka.

If you are using plain or schema as your config, you will need to have a special payload_key key to your payload hash. This will be extracted and used as the key (for plain, it will be used directly, while for schema it will be encoded first against the schema). So your payload would look like { "test_id" => "123", "some_int" => 123, payload_key: "some_other_key"}. Remember that if you're using schema, the payload_key must be a hash, not a plain value.

Consumers

Here is a sample consumer:

class MyConsumer < Deimos::Consumer

  # Optionally overload this to consider a particular exception
  # "fatal" only for this consumer. This is considered in addition
  # to the global `fatal_error` configuration block. 
  def fatal_error?(exception, payload, metadata)
    exception.is_a?(MyBadError)
  end

  def consume(payload, metadata)
    # Same method as Phobos consumers.
    # payload is an schema-decoded hash.
    # metadata is a hash that contains information like :key and :topic.
    # In general, your key should be included in the payload itself. However,
    # if you need to access it separately from the payload, you can use
    # metadata[:key]
  end
end

Fatal Errors

The recommended configuration is for consumers not to raise errors they encounter while consuming messages. Errors can be come from a variety of sources and it's possible that the message itself (or what downstream systems are doing with it) is causing it. If you do not continue on past this message, your consumer will essentially be stuck forever unless you take manual action to skip the offset.

Use config.consumers.reraise_errors = false to swallow errors. You can use instrumentation to handle errors you receive. You can also specify "fatal errors" either via global configuration (config.fatal_error) or via overriding a method on an individual consumer (def fatal_error).

Batch Consumption

Instead of consuming messages one at a time, consumers can receive a batch of messages as an array and then process them together. This can improve consumer throughput, depending on the use case. Batch consumers behave like other consumers in regards to key and payload decoding, etc.

To enable batch consumption, ensure that the delivery property of your consumer is set to inline_batch.

Batch consumers will invoke the consume_batch method instead of consume as in this example:

class MyBatchConsumer < Deimos::Consumer

  def consume_batch(payloads, metadata)
    # payloads is an array of schema-decoded hashes.
    # metadata is a hash that contains information like :keys, :topic, 
    # and :first_offset.
    # Keys are automatically decoded and available as an array with
    # the same cardinality as the payloads. If you need to iterate
    # over payloads and keys together, you can use something like this:
 
    payloads.zip(metadata[:keys]) do |_payload, _key|
      # Do something 
    end
  end
end

Saving data to Multiple Database tables

This feature is implemented and tested with MySQL database ONLY.

Sometimes, the Kafka message needs to be saved to multiple database tables. For example, if a User topic provides you metadata and profile image for users, we might want to save it to multiple tables: User and Image.

  • Return associations as keys in record_attributes to enable this feature.
  • The bulk_import_id_column config allows you to specify column_name on record_class which can be used to retrieve IDs after save. Defaults to bulk_import_id. This config is required if you have associations but optional if you do not.

You must override the record_attributes (and optionally column and key_columns) methods on your consumer class for this feature to work.

  • record_attributes - This method is required to map Kafka messages to ActiveRecord model objects.
  • columns(klass) - Should return an array of column names that should be used by ActiveRecord klass during SQL insert operation.
  • key_columns(messages, klass) - Should return an array of column name(s) that makes a row unique.
class User < ApplicationRecord
  has_many :images
end

class MyBatchConsumer < Deimos::ActiveRecordConsumer

  record_class User

  def record_attributes(payload, _key)
    {
      first_name: payload.first_name,
      images: [
                {
                  attr1: payload.image_url
                },
                {
                  attr2: payload.other_image_url
                }
              ]
    }
  end
  
  def key_columns(klass)
    case klass
    when User
      nil # use default
    when Image
      ["image_url", "image_name"]
    end
  end

  def columns(klass)
    case klass
    when User
      nil # use default
    when Image
      klass.columns.map(&:name) - [:created_at, :updated_at, :id]
    end
  end
end

Rails Integration

Producing

Deimos comes with an ActiveRecordProducer. This takes a single or list of ActiveRecord objects or hashes and maps it to the given schema.

An example would look like this:

class MyProducer < Deimos::ActiveRecordProducer

  # The record class should be set on every ActiveRecordProducer.
  # By default, if you give the producer a hash, it will re-fetch the
  # record itself for use in the payload generation. This can be useful
  # if you pass a list of hashes to the method e.g. as part of a 
  # mass import operation. You can turn off this behavior (e.g. if you're just
  # using the default functionality and don't need to override it) 
  # by setting `refetch` to false. This will avoid extra database fetches.
  record_class Widget, refetch: false

  # Optionally override this if you want the message to be 
  # sent even if fields that aren't in the schema are changed.
  def watched_attributes
    super + ['a_non_schema_attribute']
  end

  # If you want to just use the default functionality you can leave this
  # method out entirely. You only need to use it if you want to massage
  # the payload in some way, e.g. adding fields that don't exist on the
  # record itself.
  def generate_payload(attributes, record)
    super # generates payload based on the record and schema
  end
  
end

# or `send_event` with just one Widget
MyProducer.send_events([Widget.new(foo: 1), Widget.new(foo: 2)])
MyProducer.send_events([{foo: 1}, {foo: 2}])

Disabling Producers

You can disable producers globally or inside a block. Globally:

Deimos.config.producers.disabled = true

For the duration of a block:

Deimos.disable_producers do
  # code goes here
end

For specific producers only:

Deimos.disable_producers(Producer1, Producer2) do
  # code goes here
end

KafkaSource

There is a special mixin which can be added to any ActiveRecord class. This will create callbacks which will automatically send messages to Kafka whenever this class is saved. This even includes using the activerecord-import gem to import objects (including using on_duplicate_key_update). However, it will not work for update_all, delete or delete_all, and naturally will not fire if using pure SQL or Arel.

Note that these messages are sent during the transaction, i.e. using after_create, after_update and after_destroy. If there are questions of consistency between the database and Kafka, it is recommended to switch to using the DB backend (see next section) to avoid these issues.

When the object is destroyed, an empty payload with a payload key consisting of the record's primary key is sent to the producer. If your topic's key is from another field, you will need to override the deletion_payload method.

class Widget < ActiveRecord::Base
  include Deimos::KafkaSource

  # Class method that defines an ActiveRecordProducer(s) to take the object
  # and turn it into a payload.
  def self.kafka_producers
    [MyProducer]
  end
  
  def deletion_payload
    { payload_key: self.uuid }
  end

  # Optional - indicate that you want to send messages when these events
  # occur.
  def self.kafka_config
    {
      :update => true,
      :delete => true,
      :import => true,
      :create => true
    }
  end
  
end

Controller Mixin

Deimos comes with a mixin for ActionController which automatically encodes and decodes schema payloads. There are some advantages to encoding your data in e.g. Avro rather than straight JSON, particularly if your service is talking to another backend service rather than the front-end browser:

  • It enforces a contract between services. Solutions like OpenAPI do this as well, but in order for the client to know the contract, usually some kind of code generation has to happen. Using schemas ensures both sides know the contract without having to change code. In addition, OpenAPI is now a huge and confusing format, and using simpler schema formats can be beneficial.
  • Using Avro or Protobuf ensures both forwards and backwards compatibility, which reduces the need for versioning since both sides can simply ignore fields they aren't aware of.
  • Encoding and decoding using Avro or Protobuf is generally faster than straight JSON, and results in smaller payloads and therefore less network traffic.

To use the mixin, add the following to your WhateverController:

class WhateverController < ApplicationController
  include Deimos::Utils::SchemaControllerMixin

  request_namespace 'my.namespace.requests'
  response_namespace 'my.namespace.responses'
  
  # Add a "schemas" line for all routes that should encode/decode schemas.
  # Default is to match the schema name to the route name.
  schemas :index
  # will look for: my.namespace.requests.Index.avsc
  #                my.namespace.responses.Index.avsc 
  
  # Can use mapping to change the schema but keep the namespaces,
  # i.e. use the same schema name across the two namespaces
  schemas create: 'CreateTopic'
  # will look for: my.namespace.requests.CreateTopic.avsc
  #                my.namespace.responses.CreateTopic.avsc 

  # If all routes use the default, you can add them all at once
  schemas :index, :show, :update

  # Different schemas can be specified as well
  schemas :index, :show, request: 'IndexRequest', response: 'IndexResponse'

  # To access the encoded data, use the `payload` helper method, and to render it back,
  # use the `render_schema` method.
  
  def index
    response = { 'response_id' => payload['request_id'] + 'hi mom' }
    render_schema(response)
  end
end

To make use of this feature, your requests and responses need to have the correct content type. For Avro content, this is the avro/binary content type.

Database Backend

Deimos provides a way to allow Kafka messages to be created inside a database transaction, and send them asynchronously. This ensures that your database transactions and Kafka messages related to those transactions are always in sync. Essentially, it separates the message logic so that a message is first validated, encoded, and saved in the database, and then sent on a separate thread. This means if you have to roll back your transaction, it also rolls back your Kafka messages.

This is also known as the Transactional Outbox pattern.

To enable this, first generate the migration to create the relevant tables:

rails g deimos:db_backend

You can now set the following configuration:

config.producers.backend = :db

This will save all your Kafka messages to the kafka_messages table instead of immediately sending to Kafka. Now, you just need to call

Deimos.start_db_backend!

You can do this inside a thread or fork block. If using Rails, you can use a Rake task to do this:

rails deimos:db_producer

This creates one or more threads dedicated to scanning and publishing these messages by using the kafka_topics table in a manner similar to Delayed Job. You can pass in a number of threads to the method:

Deimos.start_db_backend!(thread_count: 2) # OR
THREAD_COUNT=5 rails deimos:db_producer

If you want to force a message to send immediately, just call the publish_list method with force_send: true. You can also pass force_send into any of the other methods that publish events, like send_event in ActiveRecordProducer.

A couple of gotchas when using this feature:

  • This may result in high throughput depending on your scale. If you're using Rails < 5.1, you should add a migration to change the id column to BIGINT. Rails >= 5.1 sets it to BIGINT by default.
  • This table is high throughput but should generally be empty. Make sure you optimize/vacuum this table regularly to reclaim the disk space.
  • Currently, threads allow you to scale the number of topics but not a single large topic with lots of messages. There is an issue opened that would help with this case.

For more information on how the database backend works and why it was implemented, please see Database Backends.

Consuming

Deimos provides an ActiveRecordConsumer which will take a payload and automatically save it to a provided model. It will take the intersection of the payload fields and the model attributes, and either create a new record or update an existing record. It will use the message key to find the record in the database.

To delete a record, simply produce a message with the record's ID as the message key and a null payload.

Note that to retrieve the key, you must specify the correct key encoding configuration.

A sample consumer would look as follows:

class MyConsumer < Deimos::ActiveRecordConsumer
  record_class Widget

  # Optional override of the way to fetch records based on payload and
  # key. Default is to use the key to search the primary key of the table.
  # Only used in non-batch mode.
  def fetch_record(klass, payload, key)
    super
  end

  # Optional override on how to set primary key for new records. 
  # Default is to set the class's primary key to the message's decoded key. 
  # Only used in non-batch mode.
  def assign_key(record, payload, key)
    super
  end

  # Optional override of the default behavior, which is to call `destroy`
  # on the record - e.g. you can replace this with "archiving" the record
  # in some way. 
  # Only used in non-batch mode.
  def destroy_record(record)
    super
  end
 
  # Optional override to change the attributes of the record before they
  # are saved.
  def record_attributes(payload, key)
    super.merge(:some_field => 'some_value')
  end

  # Optional override to change the attributes used for identifying records
  def record_key(payload)
    super
  end

  # Optional override, returns true by default.
  # When this method returns true, a record corresponding to the message
  # is created/updated.
  # When this method returns false, message processing is skipped and a
  # corresponding record will NOT be created/updated.
  def process_message?(payload)
    super
  end
end

Generating Tables and Models

Deimos provides a generator that takes an existing schema and generates a database table based on its fields. By default, any complex sub-types (such as records or arrays) are turned into JSON (if supported) or string columns.

Before running this migration, you must first copy the schema into your repo in the correct path (in the example above, you would need to have a file {SCHEMA_ROOT}/com/my-namespace/MySchema.avsc).

To generate a model and migration, run the following:

rails g deimos:active_record TABLE_NAME FULL_SCHEMA_NAME

Example:

rails g deimos:active_record my_table com.my-namespace.MySchema

...would generate:

db/migrate/1234_create_my_table.rb
app/models/my_table.rb

Batch Consumers

Deimos also provides a batch consumption mode for ActiveRecordConsumer which processes groups of messages at once using the ActiveRecord backend.

Batch ActiveRecord consumers make use of the activerecord-import to insert or update multiple records in bulk SQL statements. This reduces processing time at the cost of skipping ActiveRecord callbacks for individual records. Deleted records (tombstones) are grouped into delete_all calls and thus also skip destroy callbacks.

Batch consumption is used when the delivery setting for your consumer is set to inline_batch.

Note: Currently, batch consumption only supports only primary keys as identifiers out of the box. See the specs for an example of how to use compound keys.

By default, batches will be compacted before processing, i.e. only the last message for each unique key in a batch will actually be processed. To change this behaviour, call compacted false inside of your consumer definition.

A sample batch consumer would look as follows:

class MyConsumer < Deimos::ActiveRecordConsumer
  schema 'MySchema'
  key_config field: 'my_field'
  record_class Widget

  # Controls whether the batch is compacted before consuming.
  # If true, only the last message for each unique key in a batch will be
  # processed.
  # If false, messages will be grouped into "slices" of independent keys
  # and each slice will be imported separately.
  #
  # compacted false


  # Optional override of the default behavior, which is to call `delete_all`
  # on the associated records - e.g. you can replace this with setting a deleted
  # flag on the record. 
  def remove_records(records)
    super
  end
 
  # Optional override to change the attributes of the record before they
  # are saved.
  def record_attributes(payload, key)
    super.merge(:some_field => 'some_value')
  end
end

Database Poller

Another method of fetching updates from the database to Kafka is by polling the database (a process popularized by Kafka Connect). Deimos provides a database poller, which allows you the same pattern but with all the flexibility of real Ruby code, and the added advantage of having a single consistent framework to talk to Kafka.

One of the disadvantages of polling the database is that it can't detect deletions. You can get over this by configuring a mixin to send messages only on deletion, and use the poller to handle all other updates. You can reuse the same producer for both cases to handle joins, changes/mappings, business logic, etc.

To enable the poller, generate the migration:

rails g deimos:db_poller

Run the migration:

rails db:migrate

Add the following configuration:

Deimos.configure do
  db_poller do
    producer_class 'MyProducer' # an ActiveRecordProducer
  end
  db_poller do
    producer_class 'MyOtherProducer'
    run_every 2.minutes
    delay 5.seconds # to allow for transactions to finish
    full_table true # if set, dump the entire table every run; use for small tables
  end
end

All the information around connecting and querying the database lives in the producer itself, so you don't need to write any additional code. You can define one additional method on the producer:

class MyProducer < Deimos::ActiveRecordProducer
  ...
  def poll_query(time_from:, time_to:, column_name:, min_id:)
    # Default is to use the timestamp `column_name` to find all records
    # between time_from and time_to, or records where `updated_at` is equal to
    # `time_from` but its ID is greater than `min_id`. This is called
    # successively as the DB is polled to ensure even if a batch ends in the
    # middle of a timestamp, we won't miss any records.
    # You can override or change this behavior if necessary.
  end
end

To run the DB poller:

rake deimos:db_poller

Note that the DB poller creates one thread per configured poller, and is currently designed not to be scaled out - i.e. it assumes you will only have one process running at a time. If a particular poll takes longer than the poll interval (i.e. interval is set at 1 minute but it takes 75 seconds) the next poll will begin immediately following the first one completing.

To Post-Process records that are sent to Kafka:

You need to define one additional method in your producer class to post-process the messages sent to Kafka.

class MyProducer < Deimos::ActiveRecordProducer
  ...
  def post_process(batch)
    # If you need to do some extra actions with
    # the collection of elements you just sent to Kafka
    # write some code here
  end
end

Note that the poller will retry infinitely if it encounters a Kafka-related error such as a communication failure. For all other errors, it will retry once by default.

State-based pollers

By default, pollers use timestamps and IDs to determine the records to publish. However, you can set a different mode whereby it will include all records that match your query, and when done, will update a state and/or timestamp column which should remove it from that query. With this algorithm, you can ignore the updated_at and id columns.

To configure a state-based poller:

db_poller do
  mode :state_based
  state_column :publish_state # the name of the column to update state to
  publish_timestamp_column :published_at # the column to update when publishing succeeds
  published_state 'published' # the value to put into the state_column when publishing succeeds
  failed_state 'publish_failed' the value to put into the state_column when publishing fails
end

Running consumers

Deimos includes a rake task. Once it's in your gemfile, just run

rake deimos:start

This will automatically set an environment variable called DEIMOS_RAKE_TASK, which can be useful if you want to figure out if you're inside the task as opposed to running your Rails server or console. E.g. you could start your DB backend only when your rake task is running.

Generated Schema Classes

Deimos offers a way to generate classes from Avro schemas. These classes are documented with YARD to aid in IDE auto-complete, and will help to move errors closer to the code.

Add the following configurations for schema class generation:

config.schema.generated_class_path 'path/to/generated/classes' # Defaults to 'app/lib/schema_classes'

Run the following command to generate schema classes in your application. It will generate classes for every configured consumer or producer by Deimos.configure:

bundle exec rake deimos:generate_schema_classes

Add the following configurations to start using generated schema classes in your application's Consumers and Producers:

config.schema.use_schema_classes true

Additionally, you can enable or disable the usage of schema classes for a particular consumer or producer with the use_schema_classes config. See Configuration.

Note that if you have a schema in your repo but have not configured a producer or consumer, the generator will generate a schema class without a key schema.

One additional configuration option indicates whether nested records should be generated as top-level classes or would remain nested inside the generated class for its parent schema. The default is to nest them, as a flattened structure can have one sub-schema clobber another sub-schema defined in a different top-level schema.

config.schema.nest_child_schemas = false # Flatten all classes into one directory

You can generate a tombstone message (with only a key and no value) by calling the YourSchemaClass.tombstone(key) method. If you're using a :field key config, you can pass in just the key scalar value. If using a key schema, you can pass it in as a hash or as another schema class.

Consumer

The consumer interface uses the decode_message method to turn JSON hash into the Schemas generated Class and provides it to the consume/consume_batch methods for their use.

Examples of consumers would look like this:

class MyConsumer < Deimos::Consumer
  def consume(payload, metadata)
    # Same method as Phobos consumers but payload is now an instance of Deimos::SchemaClass::Record
    # rather than a hash. metadata is still a hash that contains information like :key and :topic.
    # You can interact with the schema class instance in the following way: 
    do_something(payload.test_id, payload.some_int)
    # The original behaviour was as follows:
    do_something(payload[:test_id], payload[:some_int])
  end
end
class MyActiveRecordConsumer < Deimos::ActiveRecordConsumer
  record_class Widget
  # Any method that expects a message payload as a hash will instead
  # receive an instance of Deimos::SchemaClass::Record.
  def record_attributes(payload, key)
    # You can interact with the schema class instance in the following way:
    super.merge(:some_field => "some_value-#{payload.test_id}")
    # The original behaviour was as follows:
    super.merge(:some_field => "some_value-#{payload[:test_id]}")
  end
end

Producer

Similarly to the consumer interface, the producer interface for using Schema Classes in your app relies on the publish/publish_list methods to convert a provided instance of a Schema Class into a hash that can be used freely by the Kafka client.

Examples of producers would look like this:

class MyProducer < Deimos::Producer
  class << self 
    # @param test_id [String]
    # @param some_int [Integer]
    def self.send_a_message(test_id, some_int)
      # Instead of sending in a Hash object to the publish or publish_list method,      
      # you can initialize an instance of your schema class and send that in.
      message = Schemas::MySchema.new(
        test_id: test_id,
        some_int: some_int
      )
      self.publish(message)
      self.publish_list([message])
    end
  end
end
class MyActiveRecordProducer < Deimos::ActiveRecordProducer
  record_class Widget
  # @param payload [Deimos::SchemaClass::Record]
  # @param _record [Widget]
  def self.generate_payload(attributes, _record)
    # This method converts your ActiveRecord into a Deimos::SchemaClass::Record. You will be able to use super
    # as an instance of Schemas::MySchema and set values that are not on your ActiveRecord schema.
    res = super
    res.some_value = "some_value-#{res.test_id}"
    res
  end
end

Metrics

Deimos includes some metrics reporting out the box. It ships with DataDog support, but you can add custom metric providers as well.

The following metrics are reported:

  • consumer_lag - for each partition, the number of messages it's behind the tail of the partition (a gauge). This is only sent if config.consumers.report_lag is set to true.
  • handler - a count of the number of messages received. Tagged with the following:
    • topic:{topic_name}
    • status:received
    • status:success
    • status:error
    • time:consume (histogram)
      • Amount of time spent executing handler for each message
    • Batch Consumers - report counts by number of batches
      • status:batch_received
      • status:batch_success
      • status:batch_error
      • time:consume_batch (histogram)
        • Amount of time spent executing handler for entire batch
    • time:time_delayed (histogram)
      • Indicates the amount of time between the timestamp property of each payload (if present) and the time that the consumer started processing the message.
  • publish - a count of the number of messages received. Tagged with topic:{topic_name}
  • publish_error - a count of the number of messages which failed to publish. Tagged with topic:{topic_name}
  • pending_db_messages_max_wait - the number of seconds which the oldest KafkaMessage in the database has been waiting for, for use with the database backend. Tagged with the topic that is waiting. Will send a value of 0 with no topics tagged if there are no messages waiting.
  • db_producer.insert - the number of messages inserted into the database for publishing. Tagged with topic:{topic_name}
  • db_producer.process - the number of DB messages processed. Note that this is not the same as the number of messages published if those messages are compacted. Tagged with topic:{topic_name}

Configuring Metrics Providers

See the metrics field under Configuration. View all available Metrics Providers here

Custom Metrics Providers

Using the above configuration, it is possible to pass in any generic Metrics Provider class as long as it exposes the methods and definitions expected by the Metrics module. The easiest way to do this is to inherit from the Metrics::Provider class and implement the methods in it.

See the Mock provider as an example. It implements a constructor which receives config, plus the required metrics methods.

Also see deimos.rb under Configure metrics to see how the metrics module is called.

Tracing

Deimos also includes some tracing for kafka consumers. It ships with DataDog support, but you can add custom tracing providers as well.

Trace spans are used for when incoming messages are schema-decoded, and a separate span for message consume logic.

Configuring Tracing Providers

See the tracing field under Configuration. View all available Tracing Providers here

Custom Tracing Providers

Using the above configuration, it is possible to pass in any generic Tracing Provider class as long as it exposes the methods and definitions expected by the Tracing module. The easiest way to do this is to inherit from the Tracing::Provider class and implement the methods in it.

See the Mock provider as an example. It implements a constructor which receives config, plus the required tracing methods.

Also see deimos.rb under Configure tracing to see how the tracing module is called.

Testing

Deimos comes with a test helper class which provides useful methods for testing consumers.

In spec_helper.rb:

RSpec.configure do |config|
  config.include Deimos::TestHelpers
end

Test Configuration

# The following can be added to a rpsec file so that each unit 
# test can have the same settings every time it is run
around(:each) do |example|
  Deimos::TestHelpers.unit_test!
  example.run
  Deimos.config.reset!
end

# Similarly you can use the Kafka test helper
around(:each) do |example|
  Deimos::TestHelpers.kafka_test!
  example.run
  Deimos.config.reset!
end

# Kakfa test helper using schema registry
around(:each) do |example|
  Deimos::TestHelpers.full_integration_test!
  example.run
  Deimos.config.reset!
end

With the help of these helper methods, rspec examples can be written without having to tinker with Deimos settings. This also prevents Deimos setting changes from leaking in to other examples.

This does not take away the ability to configure Deimos manually in individual examples. Deimos can still be configured like so:

    it 'should not fail this random test' do
      
      Deimos.configure do |config|
        config.consumers.fatal_error = proc { true }
        config.consumers.reraise_errors = false
      end
      ...
      expect(some_object).to be_truthy
      ...
      Deimos.config.reset!
    end

If you are using one of the test helpers in an around(:each) block and want to override few settings for one example, you can do it like in the example shown above. These settings would only apply to that specific example and the Deimos config should reset once the example has finished running.

Test Usage

In your tests, you now have the following methods available:

# Pass a consumer class (not instance) to validate a payload against it.
# This will fail if the payload does not match the schema the consumer
# is set up to consume.
test_consume_message(MyConsumer, 
                    { 'some-payload' => 'some-value' }) do |payload, metadata|
      # do some expectation handling here
end

# You can also pass a topic name instead of the consumer class as long
# as the topic is configured in your Deimos configuration:
test_consume_message('my-topic-name',
                    { 'some-payload' => 'some-value' }) do |payload, metadata|
      # do some expectation handling here
end

# Alternatively, you can test the actual consume logic:
test_consume_message(MyConsumer, 
                    { 'some-payload' => 'some-value' }, 
                    call_original: true)
                    
# Test that a given payload is invalid against the schema:
test_consume_invalid_message(MyConsumer, 
                            { 'some-invalid-payload' => 'some-value' })
                            
# For batch consumers, there are similar methods such as:
test_consume_batch(MyBatchConsumer,
                   [{ 'some-payload' => 'some-value' },
                    { 'some-payload' => 'some-other-value' }]) do |payloads, metadata|
  # Expectations here
end

## Producing
                            
# A matcher which allows you to test that a message was sent on the given
# topic, without having to know which class produced it.                         
expect(topic_name).to have_sent(payload, key=nil, partition_key=nil, headers=nil)

# Inspect sent messages
message = Deimos::Backends::Test.sent_messages[0]
expect(message).to eq({
  message: {'some-key' => 'some-value'},
  topic: 'my-topic',
  headers: { 'foo' => 'bar' },
  key: 'my-id'
})

Test Utilities

There is also a helper method that will let you test if an existing schema would be compatible with a new version of it. You can use this in your Ruby console but it would likely not be part of your RSpec test:

require 'deimos/test_helpers'
# Can pass a file path, a string or a hash into this:
Deimos::TestHelpers.schemas_compatible?(schema1, schema2)

You can use the InlineConsumer class to help with integration testing, with a full external Kafka running.

If you have a consumer you want to test against messages in a Kafka topic, use the consume method:

Deimos::Utils::InlineConsumer.consume(
  topic: 'my-topic', 
  frk_consumer: MyConsumerClass,
  num_messages: 5
  )

This is a synchronous call which will run the consumer against the last 5 messages in the topic. You can set num_messages to a number like 1_000_000 to always consume all the messages. Once the last message is retrieved, the process will wait 1 second to make sure they're all done, then continue execution.

If you just want to retrieve the contents of a topic, you can use the get_messages_for method:

Deimos::Utils::InlineConsumer.get_messages_for(
  topic: 'my-topic',
  schema: 'my-schema',
  namespace: 'my.namespace',
  key_config: { field: 'id' },
  num_messages: 5
)

This will run the process and simply return the last 5 messages on the topic, as hashes, once it's done. The format of the messages will simply be

{
  payload: { key: value }, # payload hash here
  key: "some_value" # key value or hash here
}

Both payload and key will be schema-decoded as necessary according to the key config.

You can also just pass an existing producer or consumer class into the method, and it will extract the necessary configuration from it:

Deimos::Utils::InlineConsumer.get_messages_for(
  topic: 'my-topic',
  config_class: MyProducerClass,
  num_messages: 5
)

Utilities

You can use your configured schema backend directly if you want to encode and decode payloads outside of the context of sending messages.

backend = Deimos.schema_backend(schema: 'MySchema', namespace: 'com.my-namespace')
encoded = backend.encode(my_payload)
decoded = backend.decode(my_encoded_payload)
coerced = backend.coerce(my_payload) # coerce to correct types
backend.validate(my_payload) # throws an error if not valid
fields = backend.schema_fields # list of fields defined in the schema

You can also do an even faster encode/decode:

encoded = Deimos.encode(schema: 'MySchema', namespace: 'com.my-namespace', payload: my_payload)
decoded = Deimos.decode(schema: 'MySchema', namespace: 'com.my-namespace', payload: my_encoded_payload)

Contributing

Bug reports and pull requests are welcome on GitHub at https://github.com/flipp-oss/deimos .

You can/should re-generate RBS types when methods or classes change by running the following:

rbs collection install # if you haven't done it 
rbs collection update
bundle exec sord --hide-private --no-sord-comments sig/defs.rbs --tags 'override:Override'

Linting

Deimos uses Rubocop to lint the code. Please run Rubocop on your code before submitting a PR. The GitHub CI will also run rubocop on your pull request.


Sponsored by

deimos's People

Contributors

2rba avatar alemdomarco avatar angad-singh avatar ariana-flipp avatar colinmroberts avatar csingh avatar dependabot[bot] avatar dkharytonenko-blackbird avatar dorner avatar dvvrt avatar eduardopoleoflipp avatar harsha-flipp avatar imactia avatar jloustel avatar jmccance avatar lionelpereira avatar michaelcondo avatar mwolfeflipp avatar newbray avatar rwilsonncsa avatar yannp avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

deimos's Issues

Disable producers in development if not configured

Currently, Deimos assumes that if Kafka is configured, it should send messages to it (with the exception of RSpecs when producers are stubbed). In development mode, it should detect on startup if Kafka is actually running as configured, and if not, it should run in test mode where messages get sent to an array. Currently, it crashes, meaning devs would have to manually change the configuration or that they must run Kafka even if they aren't actively testing the Kafka parts of their app.

ActiveRecordConsumer does not replicate the behaviour of ActiveRecord::Persistence#touch

The ActiveRecordConsumer does not update records when a producer produces a message where the only change to the record is a change to the updated_at field (ex. when triggered by the touch method).

As seen on line 74, the updated_at and created_at fields are not included in the return hash used by the consume method to update the record.

This becomes problematic as the record's updated_at timestamp will not reflect the timestamp associated with the original record and does not replicate the behaviour of the ActiveRecord touch method. As shown here, the timestamps of the record will retain their original values.

Extract Executor and relevant classes into a separate gem

There are other use cases for this pattern (a looping construct that checks if it should exit after each loop, using a thread pool). It was largely copied from Phobos but there are at least two or three uses just in Deimos and I think it could be helpful elsewhere.

DB backend errors when using Avro-encoded key

When using the DB backend and an Avro-encoded key, the DB backend errors out when trying to persist the KafkaMessage.

TypeError: can't quote ActiveSupport::HashWithIndifferentAccess
from /Users/jmccance/.asdf/installs/ruby/2.6.3/lib/ruby/gems/2.6.0/gems/activerecord-6.0.3.4/lib/active_record/connection_adapters/abstract/quoting.rb:231:in `_quote'

I think the issue is here in lib/deimos/backends/db.rb, when m.partition_key is nil and m.key is a Hash.

partition_key: m.partition_key || m.key

Encoded Error Logging while consuming messages

While testing the new deimos upgrade on Godfather, found an issue where error logging is not human readable in the metadata and data fields.
Here is the link to a cloudwatch log showing the error logging.

Remove monkey patches

These have been there for a long time but may not be necessary. If they are, consider adding a PR to the gems they're patching instead of leaving them in here.

This includes the activerecord-import patch in KafkaSource. Especially with the new insert_all method, it's flaky and won't always be what people want.

Crash in `produce_error` method

In Deimos::Instrumentation, it tries accessing the metadata of the message. This is related to an old monkey patch which we've removed and always crashes:

undefined method `metadata' for #<Kafka::PendingMessage:0x0000000bb714b0>
/lib/deimos/instrumentation.rb:51:in `block in send_produce_error'

We need to rewrite this so this doesn't happen.

Add DummyProducer utility class

Have a single simple method to produce a string or hash to a Kafka topic, without having to define a producer class. This is useful when debugging connectivity issues. Schema can be optional.

Support bulk imports in import_without_validations_or_callbacks

When dealing with bulk imports for DB Backend records, we should add better support for refreshing records from the DB. Specifically, in import_without_validations_or_callbacks in kafka_source.rb, we currently do a single DB query for each record that is being imported. When there is a bulk import, especially with thousands of records, this is extremely slow.

If we could bulk-query the list of IDs and match them up with the records being imported, this would make this import much more efficient

Schema registry does not use topic prefix for its subject

The schema registry registers the topic name as the subject of the schema. However, this does not take the configured producer topic prefix into account. This can result in clobbering of one topic against another, or schema evolution errors.

Introduce concept of "fatal errors"

Allow on a global or per-consumer level the ability to define an error as a "fatal error" and reraise it even if config.reraise_errors is set to false.

We can provide a block in the configuration and a method to override in the consumer which would be passed the exception object and the payloads.

For example, the global block could check to see if the database is down (via connection.active?) and reraise the error if so. This would prevent consumers from consuming messages nonstop when a critical resource is missing.

Better way to handle deletes in `generate_payload`

Currently generate_payload is called even for deletion events via KafkaSource. This forces implementing code to check to see if the only key is the payload key and return otherwise. We should intentionally not call generate_payload on deletion events.

Add DDL for configuration

Currently configuration is split up between the producer/consumer class, a configure app-wide block, and phobos.yml. We should strike our own way for this and add a DDL similar to Rails routes where we can specify overall configuration and producer/consumer configs in a single place.

Truth is, even if we can combine the configure block and the .yml file together that would be a win. I'm not 100% sure if the producer/consumer config should live with the individual class or centralized.

Don't resend messages when splitting up batches

DB Producer has a feature where, when the batch is too large and gets rejected by Kafka, it splits the batch up and tries again. This is fine when going from the full batch to smaller batches, but when splitting it up further, it may be resending messages unnecessarily:

  • Batch of 1000 gets split up into 10 batches of 100 of varying size
  • The first 5 batches get sent, but the 6th one is too large
  • DB Producer splits the original batch back up into 100 batches of 10 each, including the first 500 messages

These messages shouldn't be resent.

Lag calculation can be incorrect

Currently, Deimos's lag reporting is calculated based on the current offset and latest offset. Latest offset is only updated when a message comes in.

If no messages are being consumed (e.g. some bug in the client library or a crash), the lag calculation will not try to re-retrieve the latest offset. This means that lag is increasing but the lag reporter won't realize.

Change this so latest offset is always recalculated every heartbeat.

Upgrade to Avro 1.9

We use the avro-patches gem because when we first made this, Avro hadn't updated its official gem in years. Now that 1.9 is finally released, we should remove avro-patches and update the avro gem.

Use Datadog APM for Kafka

As of version 0.38, ddtrace-rb supports ruby-kafka out of the box:
DataDog/dd-trace-rb#1070

We should look at the existing DD metrics / tracing to see if it makes sense for us to just use the existing ruby-kafka integration or if we should keep our custom things. The integration is a bit chatty so we might need to do some filtering first: https://github.com/DataDog/dd-trace-rb/blob/master/docs/GettingStarted.md#processing-pipeline

The other thing to think about is that we have configurable backends, and we might need a way to suppress particular metrics/tracing for Kafka but allow it for other things.

Print configuration values on bootup

Deimos has a number of default configurations, such as not using Avro encoding and decoding when run in development mode. Most of these configurations are implicit and not easily found without digging quite a bit into the code, but can cause cryptic errors when for example, an Avro encoded message is being consumed but not decoded with the correct decoder setting.

Printing configured settings on bootup would add some much needed clarity to avoid these problems.

Fail explicitly when no topic given

Currently, if a producer is configured to have a blank topic (e.g. an environment variable that has no value), Deimos doesn't error out until it actually tries to produce to the topic. RubyKafka also doesn't error out, it just says "can't assign partitions". This is super confusing. Deimos should fail immediately when it tries to produce and no topic is configured.

cc: @kentkhlui

Add Rake task to auto-send ActiveRecordProducer events

Currently, our workflow is that we send events on callbacks. This works for most situations, but for codebases that need to make heavy use of import / insert_all / update_all, it becomes tricky and cumbersome to remember to send events every time.

For these use cases, a better pattern is to implement a separate task which polls the relevant tables and sends events for records whose updated_at column is recent. This ensures that no changes are missed, and also has the advantage of batching all updates at once.

Downside of this pattern is that there may be many events being sent unnecessarily (if columns the producer doesn't care about are changed) as well as more, possibly significant DB reads. It also introduces at least some delay between when the DB is updated and when Kafka is notified.

Another downside is that it can't handle deletes. There is unfortunately no easy workaround for this. App code can use a combination of KafkaSource (for deletes only) and this pattern.

This feature should add another table, e.g. kafka_source_updates to store the most recently seen updated_at column. The Rake task should use an Executor to continually poll the database and use ActiveRecordProducer to send relevant events. There should be some overlap (1 second? 5 seconds?) between the most recently seen updated_at and the first updated_at it searches for.

KafkaSource crashes on bulk-import if import hooks are disabled

If you have a KafkaSource with import hooks disabled:

  def self.kafka_config
    {
      update: true,
      delete: true,
      import: false,
      create: true
    }
  end

Doing a bulk import that only updates (no inserts) on that class will fail with the following error:

NoMethodError: undefined method `num_inserts' for nil:NilClass

This seems to be because the early return in the import_without_validations_or_callbacks monkey patch does not return the results from super

        results = super
        return unless self.kafka_config[:import] #### Here
        return if array_of_attributes.empty? #### And here

The activerecord-import gem later tries to use the results of the monkey patched method like this and crashes (because it's nil). See import.rb:735

      return_obj.num_inserts = 0 if return_obj.num_inserts.nil?

Generate models and migrations based on schema

This is for the use case where we use Deimos essentially as a database sink via ActiveRecordConsumer.

It would be super neat if we could just feed Deimos a schema and tell it to create Rails migrations, models and ActiveRecordConsumers for it. Devs could go in and make changes to them, but the defaults would be there for them.

Note that I do not want to do it the other way around because schemas should be carefully created and not just based on whatever's in the database.

DB Producer can fail if all messages in batch were unique

In the DB producer, if a compacted topic has a batch with all-unique keys, it will fail when calling compact_messages with this error:

NoMethodError:
       undefined method `reverse!' for nil:NilClass

E.g. consider this sample test which should pass:

      it 'should no make any changes when there are no duplicates' do
        Deimos.configure { |c| c.db_producer.compact_topics = :all }
        expect(producer.compact_messages(deduped_batch)).to eq(deduped_batch)
      end

This appears to be because compact batch is calling reverse! on uniq!. uniq! will return nil when no changes were made to the array, i.e. it was unique already. This method should use uniq instead

Use insert_all if on Rails 6

Currently we're using activerecord-import for the DB producer. We should detect if the app is on Rails 6 and use insert_all - this will remove the dependency for activerecord-import for newer apps.

Cache schemas across execution

With the release of AvroTurf 1.0 and this PR dasch/avro_turf#111 we should now be able to cache schemas across the full execution rather than having one schema cached per producer/consumer. We should be able to have a singleton encoder/decoder now.

The previous problem is that we'd have multiple schemas defining the same subschema (e.g. "Status") and AvroTurf would crash.

Match matchers on diffing messages

Currently, Deimos::TestHelpers tries to be smart about reporting the diff between the closest message sent and the reported one. However, in its matching and reporting, it treats matchers as a value rather than something to match against a value - i.e. anything is always treated as different from the other hash rather than the same.

See if we can make this smarter so our error messages are more useful and there isn't a lot of noise about AnyArgMatcher being different from the given value.

Unscope find request for ActiveRecordConsumer

Currently, ActiveRecordConsumer will try to find the record by doing a where query. If there is a default scope, this query may not return the object and it will crash when trying to insert it again.

Add accessor for custom schema stores

This will allow more functionality when working with local (cached) schemas. All we really need to do is expose the schema store as an attr_accessor and pass it into the AvroDataDecoder and AvroDataEncoder.

TestHelpers might clobber messages sent in before_each

Currently, TestHelpers will clear out the sent messages in the test backend before each test. However, it uses RSpec's before hook to do this. Depending on how the test files are loaded, this may blow away messages that were actually sent in a test's before hook.

This should be replaced with prepend_before to make sure it doesn't exhibit this behavior.

Shut down sync producers if persistent_connections is set

Currently Deimos shuts down async producers on app shutdown. Phobos 1.8.3 adds a persistent_connections setting so that even sync producers require shutdown. We should call that manually the same way we call async shutdown if that setting is set.

Add more unit and integration tests

Unit test coverage leaves a lot to be desired as it was mainly tested in the wild. We need to increase our coverage and add integration tests actually talking to Kafka and the database. Preferably a docker-compose for running integration tests locally.

Add ConsumerProducer feature

A common use case for Kafka is to take in one topic and output a different one, either filtering (use a different topic, same schema), mapping (different schema) or both.

We should create a class that acts as both a producer and consumer, which can provide not only its own schema but also the "target" topic and schema. The API should include a include_payload? method as well as a generate_payload method.

For the mapping case, the class should be smart enough to map the source to the target payload based on the schemas, meaning we could theoretically make a mapper that needs zero code.

Do not call generate_payload when payload is nil

In ActiveRecordConsumer, it will call generate_payload even when deleting a record and the payload consists of nothing but a payload_key. This should not call the method since there should be no payload to generate.

Use associations with nested records in ActiveRecordConsumer

We can make the default behavior smarter when dealing with nested records by checking to see if the field matches an association. If so, we can create / update the associated records as well with the same logic. We've already seen this pattern a few times.

Extract configuration to a gem

I think the configuration method we use here is a reusable pattern and there may be other gem authors who might be interested in using it. I thought we could call it Optimal - it would be a relatively small gem but it would be nice to have it out!

Support method sending in generate_payload if they match schema field names

When an ActiveRecord class has a method with the same name as a field in the schema, the generate_payload method should be able to populate the payload with that field.

An example of this is when initial is a field in the kafka schema, there should be no additional code to add to generate_payload

class Widget < ApplicationRecord
  # Quite literally the first letter of the name
  def initial
    name.first
  end
end
# == Schema Information
#
# Table name: widgets
#
#  name   :string(6)        not null, primary key

Display diff with most recent message as well as closest message

Deimos::TestHelpers by default will print a diff of the closest message to the expected message. This often gets confused by differing fields. Often the thing we really want to see is a diff against the most recent message, as that tends to be the one tested. We should print the diffs for both of them (if they aren't the same message).

DB Producer will resend messages if deletion fails

If the DB producer tries to delete a set of messages and it fails for some reason (e.g. deadlock), there is no retry logic; it just fails out. This means another producer can pick it up and will send it again.

We should add retries while deleting messages (up to 5? Infinite? Only on lock wait timeout errors?) to increase the likelihood that the messages don't get sent more than once.

Deimos::Producer#publish unable to consolidate BigDecimal and float types for sub-records

Deimos::Producer#publish unable to consolidate BigDecimal and float types for sub-records.

Currently:
calling publish returns the following error after item was converted into a sub-record:
Avro::SchemaValidator::ValidationError:
at .item.price expected type float, got BigDecimal with value 0.37999e3
at .item.original_price expected type float, got BigDecimal with value 0.39999e3

      # @param job [Bullwhip::Job] The Bullwhip job object
      # @param item [Item] The item object
      # return [Hash] Returns a ItemScraperResponse payload
      def generate_payload(job, item)
        # Possible defect in Deimos causes SchemaValidator to be unable to
        # consolidate BigData and float types for subtype records

        { external_id: job.external_id,
          merchant_name_identifier: job.parameters[:merchant_name_identifier],
          url: job.parameters[:url],
          language: job.parameters[:language],
          item: item.attributes }
      end

PREVIOUSLY:
No errors when item attributes were sent within the payload directly.

      def generate_payload(job, item)
        { external_id: job.external_id,
          merchant_name_identifier: job.parameters[:merchant_name_identifier],
          url: job.parameters[:url],
          language: job.parameters[:language] }.merge(item)

Change InlineConsumer to use fetch_messages

Currently we have a rather complex process to consume messages dynamically via the InlineConsumer. I recently discovered that the Kafka client can fetch messages directly via the fetch_messages API call. This can hugely simplify the implementation and also make it way faster (it can take several seconds right now because it actually starts and stops a consumer).

Validate that producer is defined when including KafkaSource

When a class includes KafkaSource but doesn't define a producer (by overriding kafka_producers or kafka_producer), we get a SystemStackError: stack level too deep. We should instead add a check to see if both methods are nil and if so, raise an error.

ActiveRecordConsumer assumes key is id when deleting or updating

Currently, ActiveRecordConsumer will delete or update a record by finding it based on the key. This assumes that the key is always the primary key of the record, which isn't always true. We should refactor this into a fetch_record method which can be overridden similar to other methods.

Even better, use the key schema to fetch records.

send_produce_error crashes

This is a relic of old code which was removed and doesn't work (metadata isn't a thing). We should replace this by instantiating a Deimos::KafkaMessage with the given data and calling decoded_message, which doesn't require us to store the decoded message or producer on the message itself.

timestamp-millis and timestamp-micros fields are serialized incorrectly

When serializing an Avro message with fields that use the timestamp-millis or timestamp-micros logical types, the AvroSchemaCoercerer truncates Time types down to seconds since epoch instead of millis.

  • Ruby 2.6.3
  • Rails 6.0.3.4
  • Deimos 1.8.3

Example Schema

{
  "namespace": "my.company",
  "name": "Payload",
  "type": "record",
  "fields": [
    {
      "name": "timestamp-millis-field",
      "type": "long",
      "logicalType": "timestamp-millis"
    }
  ]
}

Message Hash

{
  'timestamp-millis-field' => Time.utc(2020, 11, 12, 13, 14, 15, 999000)
}

Expected Output

{
  "timestamp-millis-field": 1605186855999 // 2020-11-12T13:14:15.999Z
}

Actual Output

{
  "timestamp-millis-field": 1605186855 // 1970-01-19T07:53:06.000Z
}

Add support for Phobos 1.9+

Phobos will be deprecating and then changing its behavior starting with 2.0. Deimos should work with 1.9.x and use the new interface, allowing us to upgrade to 2.0.

Send DB backend messages by partition instead of partition_key

Currently, DB backend producers can scale out the number of topics but not a single large topic with lots of partitions. To ensure order, only a single thread can send messages from a topic right now.

We should be changing this so that we save the partition of a message instead of a partition_key (using RubyKafka's Partitioner class to generate the partition). We should then add a configuration to allow locking particular topics (or all topics) by partition instead of just by topic. (For small topics this would be overkill.)

This will require a PR to Phobos to allow sending messages with a partition, which it currently does not support, as well as changing the save logic, the read logic, and the migration template.

Inline consumer should use retries when seeking

Currently, InlineConsumer will try to seek to the necessary offset, and if it can't (e.g. LeaderNotAvailable error), it'll just ignore the seek. This may cause integration tests to fail. We should replace this logic with sleep + retries (up to a max of 3), similar to what RubyKafka does in this case.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.