Comments (8)
Hey @teodor-pripoae, I'm not 100% sure I get what you want to do ;) do you need something like that:
class MyController < Karafka::BaseController
self.parser = lambda do |raw_message|
dynamic_parser.parse(raw_message)
end
end
If so, it can be done ;) ofc with lambda it will work a bit slower but you will get what you need. Also there's one more way to achieve that (dynamic proxy building) - will show you in couple minutes
from karafka.
@teodor-pripoae
I made some adjustments (lambdas not yet there) so for now you can use something like that:
class ParserProxy
class ParserError < Karafka::Errors::ParserError; end
attr_accessor :parser
def self.proxy(proto_klass)
new(proto_klass)
end
def initialize(klass)
@klass = klass
end
def parse(msg)
@klass.decode(msg)
rescue => e
raise ParserError
end
end
from karafka.
Well, I tested this first by manually creating a parser proxy with hardcoded protobuf class and I've got into a couple of errors using sidekiq serialization.
# using parser proxy above
class LandingInvitationController < Karafka::BaseController
self.topic = 'mail.landing_invitation'
self.parser = ParserProxy.new(Yam::LandingInvitation)
def perform
"foo"
end
end
Sending kafka message:
echo -e "\n\[email protected]\x12\x06barbaz" | kafkacat -b localhost:9092 -t mail.landing_invitation -P
Logs:
# karafka:run
Enqueuing LandingInvitationController - {"message"=>"\[email protected]\x12\x06barbaz", "controller"=>LandingInvitationController, "worker"=>LandingInvitationWorker, "parser"=>#<ParserProxy:0x007fa32a7dc268 @klass=Yam::LandingInvitation>, "topic"=>"mail.landing_invitation", "received_at"=>2015-11-21 16:33:05 +0200} into LandingInvitationWorker
# sidekiq
{"class"=>"LandingInvitationWorker", "args"=>[{"message"=>"\[email protected]\u0012\u0006barbaz", "controller"=>"LandingInvitationController", "worker"=>"LandingInvitationWorker", "parser"=>"#<ParserProxy:0x007fa32a7dc268>", "topic"=>"mail.landing_invitation", "received_at"=>"2015-11-21 16:33:05 +0200"}], "retry"=>true, "queue"=>"default", "jid"=>"ab6a7f34dfcb19076fedaf37", "created_at"=>1448116385.489842, "enqueued_at"=>1448116385.489919}
You can see that message received in sidekiq is different than the message received in the kafka job. I think sidekiq messes with it when encoding to JSON.
from karafka.
If I send the message as base64 encoded protobuf, and change the decoding line to:
@klass.decode(Base64.strict_decode64(msg))
I can see on the logs:
# karafka:run
Enqueuing LandingInvitationController - {"email"=>"[email protected]", "token"=>"barbaz", "controller"=>LandingInvitationController...}
# sidekiq
{"class"=>"LandingInvitationWorker", "args"=>[{"email"=>"[email protected]", "token"=>"barbaz", "controller"=>"LandingInvitationController", "worker"=>"LandingInvitationWorker"...}
I guess sidekiq is trying to encode my object to json. I still want to receive protobuf message and decode it in worker.
from karafka.
I managed to get it working without serializer proxy class, but it requires a little boilerplate:
class LandingInvitationController < Karafka::BaseController
self.topic = 'mail.landing_invitation'
self.worker = LandingInvitationJob
def perform
end
end
class LandingInvitationJob < BaseJob
sidekiq_options :queue => :landing_invitation
serializer Yam::LandingInvitation
Contract Yam::LandingInvitation => Any
def process(payload)
Karafka.logger.info(payload)
end
def on_failure
#do_something
end
end
# and the custom worker
class BaseJob < ::SidekiqGlass::Worker
include Sidekiq::Worker
include Contracts
attr_accessor :params
def self.serializer(klass)
define_method :parse do |message|
klass.decode(Base64.strict_decode64(message))
end
end
# @param args [Array] controller params and controller topic
# @note Arguments are provided in Karafka::BaseController enqueue
def execute(*args)
self.params = args.first
Karafka.logger.info("#{self.class}#execute for #{params}")
if self.respond_to?(:parse)
process(parse(params["message"]))
else
process(params["message"])
end
end
# What action should be taken when execute method fails
# With after_failure we can provide reentrancy to this worker
# @param args [Array] controller params and controller topic
def after_failure(*args)
self.params = args.first
unless self.respond_to?(:on_failure)
Karafka.logger.warn("#{self.class}#on_failure missing for #{params}")
return
end
Karafka.logger.warn("#{self.class}#on_failure for #{args}")
self.on_failure
end
end
from karafka.
Yeah - it seems sidekiq is fu**king up data when #to_json
created a separate issue for that #31 - we will fix it soon
from karafka.
I've been able to reproduce:
WaterDrop::Message.new('channel_video_event', "\[email protected]\x12\x06barbaz").send!
Enqueuing CountersController - {"message"=>"\[email protected]\x12\x06barbaz", "controller"=>CountersController, "worker"=>CountersWorker, "parser"=>JSON, "topic"=>:channel_video_event, "received_at"=>2015-11-26 15:53:32 +0100} into CountersWorker
2015-11-26T14:53:50.652Z 16770 TID-cpeig CountersWorker JID-c43181e4748a487a01a74476 INFO: start
I, [2015-11-26T15:53:50.653021 #16770] INFO -- : CountersWorker#execute for {"message"=>"\[email protected]\u0012\u0006barbaz", "controller"=>"CountersController", "worker"=>"CountersWorker", "parser"=>"JSON", "topic"=>"channel_video_event", "received_at"=>"2015-11-26 15:53:50 +0100"}
Work in progress...
from karafka.
Hey @teodor-pripoae we've fixed your issue. Please use custom interchangers with data that cannot be json parsed via sidekiq:
class TestController < Karafka::BaseController
self.interchanger = Base64Interchanger
end
class Base64Interchanger
class << self
def load(params)
Base64.encode64(Marshal.dump(params))
end
def parse(params)
Marshal.load(Base64.decode64(params))
end
end
end
this should be merged and in master today. I've tested that on your example - seems to work fine :) Will add example like that also to the example app. Thanks!
from karafka.
Related Issues (20)
- Improve critical error shutdown procedure HOT 1
- Proper way to use independent option for DLQ HOT 3
- Provide context-based offset movement and earliest support HOT 1
- Support migrating via aliases and plan with aliases usage HOT 2
- Add base64 to gems for Ruby 3.4
- Backport #1767 to karafka v2.3 for supporting applications running on ruby <3.0 HOT 1
- Fix string vs. symbol inconsistency in config
- Active with default set to `false` cannot be overwritten
- [v2.4.0] Unable to consume messages (only on ConfluentCloud) HOT 11
- Alias consumer group from the topic HOT 1
- Add integration tests for dynamic multiplexing with routing pattern that discovers topics after subscription is running
- Add spec illustrating support of compression types in one topic
- Add a pristine spec for only waterdrop to ensure its internal system gems are all as expected
- Inheritance of `karafka_options` can cause job setting modification
- Support variants when using dispatcher for active jobs
- Align docs with changes in 2.4.1
- Use configs API to generate docs for topics and cluster configs list
- Add spec illustrating compacted topic error on dispatch of non-key messages
- Validate `producer` `karafka_options`
- Internal seek does not resolve the offset correctly for time based lookup
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from karafka.