Code Monkey home page Code Monkey logo

Comments (8)

mensfeld avatar mensfeld commented on May 20, 2024

Hey @teodor-pripoae, I'm not 100% sure I get what you want to do ;) do you need something like that:

class MyController < Karafka::BaseController
     self.parser = lambda do |raw_message|
       dynamic_parser.parse(raw_message)
     end
end

If so, it can be done ;) ofc with lambda it will work a bit slower but you will get what you need. Also there's one more way to achieve that (dynamic proxy building) - will show you in couple minutes

from karafka.

mensfeld avatar mensfeld commented on May 20, 2024

@teodor-pripoae
I made some adjustments (lambdas not yet there) so for now you can use something like that:

class ParserProxy
  class ParserError < Karafka::Errors::ParserError; end

  attr_accessor :parser

  def self.proxy(proto_klass)
    new(proto_klass)
  end

  def initialize(klass)
    @klass = klass
  end

  def parse(msg)
    @klass.decode(msg)
  rescue => e
    raise ParserError
  end
end

from karafka.

teodor-pripoae avatar teodor-pripoae commented on May 20, 2024

Well, I tested this first by manually creating a parser proxy with hardcoded protobuf class and I've got into a couple of errors using sidekiq serialization.

# using parser proxy above
class LandingInvitationController < Karafka::BaseController
  self.topic = 'mail.landing_invitation'
  self.parser = ParserProxy.new(Yam::LandingInvitation)

  def perform
    "foo"
  end
end

Sending kafka message:

echo -e "\n\[email protected]\x12\x06barbaz" | kafkacat -b localhost:9092 -t mail.landing_invitation -P

Logs:

# karafka:run
Enqueuing LandingInvitationController - {"message"=>"\[email protected]\x12\x06barbaz", "controller"=>LandingInvitationController, "worker"=>LandingInvitationWorker, "parser"=>#<ParserProxy:0x007fa32a7dc268 @klass=Yam::LandingInvitation>, "topic"=>"mail.landing_invitation", "received_at"=>2015-11-21 16:33:05 +0200} into LandingInvitationWorker

# sidekiq
{"class"=>"LandingInvitationWorker", "args"=>[{"message"=>"\[email protected]\u0012\u0006barbaz", "controller"=>"LandingInvitationController", "worker"=>"LandingInvitationWorker", "parser"=>"#<ParserProxy:0x007fa32a7dc268>", "topic"=>"mail.landing_invitation", "received_at"=>"2015-11-21 16:33:05 +0200"}], "retry"=>true, "queue"=>"default", "jid"=>"ab6a7f34dfcb19076fedaf37", "created_at"=>1448116385.489842, "enqueued_at"=>1448116385.489919}

You can see that message received in sidekiq is different than the message received in the kafka job. I think sidekiq messes with it when encoding to JSON.

from karafka.

teodor-pripoae avatar teodor-pripoae commented on May 20, 2024

If I send the message as base64 encoded protobuf, and change the decoding line to:

@klass.decode(Base64.strict_decode64(msg))

I can see on the logs:

# karafka:run
Enqueuing LandingInvitationController - {"email"=>"[email protected]", "token"=>"barbaz", "controller"=>LandingInvitationController...}
# sidekiq
{"class"=>"LandingInvitationWorker", "args"=>[{"email"=>"[email protected]", "token"=>"barbaz", "controller"=>"LandingInvitationController", "worker"=>"LandingInvitationWorker"...}

I guess sidekiq is trying to encode my object to json. I still want to receive protobuf message and decode it in worker.

from karafka.

teodor-pripoae avatar teodor-pripoae commented on May 20, 2024

I managed to get it working without serializer proxy class, but it requires a little boilerplate:

class LandingInvitationController < Karafka::BaseController
  self.topic = 'mail.landing_invitation'
  self.worker = LandingInvitationJob

  def perform
  end
end

class LandingInvitationJob < BaseJob
  sidekiq_options :queue => :landing_invitation
  serializer Yam::LandingInvitation

  Contract Yam::LandingInvitation => Any
  def process(payload)
    Karafka.logger.info(payload)
  end

  def on_failure
   #do_something
  end
end

# and the custom worker

class BaseJob < ::SidekiqGlass::Worker
  include Sidekiq::Worker
  include Contracts

  attr_accessor :params

  def self.serializer(klass)
    define_method :parse do |message|
      klass.decode(Base64.strict_decode64(message))
    end
  end

  # @param args [Array] controller params and controller topic
  # @note Arguments are provided in Karafka::BaseController enqueue
  def execute(*args)
    self.params = args.first
    Karafka.logger.info("#{self.class}#execute for #{params}")

    if self.respond_to?(:parse)
      process(parse(params["message"]))
    else
      process(params["message"])
    end
  end

  # What action should be taken when execute method fails
  # With after_failure we can provide reentrancy to this worker
  # @param args [Array] controller params and controller topic
  def after_failure(*args)
    self.params = args.first

    unless self.respond_to?(:on_failure)
      Karafka.logger.warn("#{self.class}#on_failure missing for #{params}")
      return
    end

    Karafka.logger.warn("#{self.class}#on_failure for #{args}")
    self.on_failure
  end
end

from karafka.

mensfeld avatar mensfeld commented on May 20, 2024

Yeah - it seems sidekiq is fu**king up data when #to_json
created a separate issue for that #31 - we will fix it soon

from karafka.

mensfeld avatar mensfeld commented on May 20, 2024

I've been able to reproduce:

WaterDrop::Message.new('channel_video_event', "\[email protected]\x12\x06barbaz").send!
Enqueuing CountersController - {"message"=>"\[email protected]\x12\x06barbaz", "controller"=>CountersController, "worker"=>CountersWorker, "parser"=>JSON, "topic"=>:channel_video_event, "received_at"=>2015-11-26 15:53:32 +0100} into CountersWorker
2015-11-26T14:53:50.652Z 16770 TID-cpeig CountersWorker JID-c43181e4748a487a01a74476 INFO: start
I, [2015-11-26T15:53:50.653021 #16770]  INFO -- : CountersWorker#execute for {"message"=>"\[email protected]\u0012\u0006barbaz", "controller"=>"CountersController", "worker"=>"CountersWorker", "parser"=>"JSON", "topic"=>"channel_video_event", "received_at"=>"2015-11-26 15:53:50 +0100"}

Work in progress...

from karafka.

mensfeld avatar mensfeld commented on May 20, 2024

Hey @teodor-pripoae we've fixed your issue. Please use custom interchangers with data that cannot be json parsed via sidekiq:

class TestController < Karafka::BaseController
  self.interchanger = Base64Interchanger
end

class Base64Interchanger
  class << self
    def load(params)
      Base64.encode64(Marshal.dump(params))
    end

    def parse(params)
      Marshal.load(Base64.decode64(params))
    end
  end
end

this should be merged and in master today. I've tested that on your example - seems to work fine :) Will add example like that also to the example app. Thanks!

from karafka.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.