
schema_registry_converter's Introduction

schema_registry_converter


This library provides a way of using the Confluent Schema Registry that is compliant with the Java client. Since Karapace is API compatible, it can also be used with this library. The release notes can be found on GitHub. Consuming/decoding and producing/encoding are supported. It's also possible to provide the schema to use when decoding, and references can be included when decoding. Without a schema provided, the latest schema with the same subject will be used.

It's supposed to be feature complete compared to the Java version. If anything is missing or not working as expected, please create an issue or start a discussion on GitHub Discussions. An example of using this library asynchronously with Protobuf to produce data to Kafka can be found in ksqlDB-GraphQL-poc. A blog post with a bit of background on this library is titled Confluent Schema Registry and Rust.

Getting Started

schema_registry_converter is available on crates.io. It is recommended to look there for the newest and most elaborate documentation. The crate has a couple of feature flags, so be sure to set them correctly.

To use it for asynchronous Avro conversion, use:

[dependencies]
schema_registry_converter = { version = "4.0.0", features = ["avro"] }

For simplicity there are easy variants that internally hold an Arc, making them easier to use at the price of some overhead. To use the easy variants, add the easy feature and use the structs whose names start with Easy to do the conversions.

[dependencies]
schema_registry_converter = { version = "4.0.0", features = ["easy", "avro"] }

...and see the docs for how to use it.
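
For example, with the easy and avro features enabled, decoding could look roughly like the sketch below. This is a minimal sketch only; it assumes the async easy Avro decoder lives in the async_impl::easy_avro module and exposes the same decode method as the regular decoder, so check docs.rs for the exact paths.

use schema_registry_converter::async_impl::easy_avro::EasyAvroDecoder;
use schema_registry_converter::async_impl::schema_registry::SrSettings;

// Sketch: the Easy* structs wrap the regular converters (internally using an Arc),
// so they can be shared without juggling mutable references yourself.
async fn print_decoded(payload: Option<&[u8]>) {
    let sr_settings = SrSettings::new(String::from("http://localhost:8081"));
    let decoder = EasyAvroDecoder::new(sr_settings);
    match decoder.decode(payload).await {
        Ok(result) => println!("decoded value: {:?}", result.value),
        Err(e) => eprintln!("decode error: {}", e),
    }
}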

All the converters also have a blocking (non-async) version; in that case use something like:

[dependencies]
schema_registry_converter = { version = "4.0.0", default-features = false, features = ["avro", "blocking"] }

If you need to use both in one project you can use something like the following, but be wary to import the correct paths depending on your use.

[dependencies]
schema_registry_converter = { version = "4.0.0", features = ["avro", "blocking"] }

Consumer

For consuming messages encoded with the schema registry, you need to fetch the correct schema from the schema registry to transform it into a record. For clarity, error handling is omitted from the diagram.

Consumer activity flow
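
As a minimal sketch of that flow (assuming the async Avro decoder from the async_impl::avro module, mirroring the example in the crate documentation): the decoder reads the schema id from the first bytes of the payload, fetches and caches that schema, and returns the decoded value.

use rdkafka::message::{BorrowedMessage, Message};
use apache_avro::types::Value;
use schema_registry_converter::async_impl::avro::AvroDecoder;

// Sketch: decode the payload of a consumed message into an Avro Value.
async fn get_value(
    msg: &'_ BorrowedMessage<'_>,
    decoder: &'_ mut AvroDecoder<'_>,
) -> Value {
    match decoder.decode(msg.payload()).await {
        Ok(r) => r.value,
        Err(e) => panic!("Error getting value: {}", e),
    }
}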

Producer

For producing messages which can be properly consumed by other clients, the proper id needs to be encoded with the message. To get the correct id, it might be necessary to register a new schema. For clarity, error handling is omitted from the diagram.

Producer activity flow
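
A minimal sketch of that flow, assuming the async Avro encoder mirrors the blocking one used in the example below (the import paths, and whether the strategy takes a borrowed or owned topic name, differ between versions): the encoder looks up or registers the schema for the subject derived from the strategy, and prepends the resulting id to the encoded bytes.

use apache_avro::types::Value;
use schema_registry_converter::async_impl::avro::AvroEncoder;
use schema_registry_converter::async_impl::schema_registry::SubjectNameStrategy;

// Sketch: encode the fields of a record as the value for topic "hb"; with
// TopicNameStrategy and is_key = false the subject becomes "hb-value".
async fn encode_beat(encoder: &mut AvroEncoder<'_>, beat: i64) -> Vec<u8> {
    let strategy = SubjectNameStrategy::TopicNameStrategy(String::from("hb"), false);
    match encoder.encode(vec![("beat", Value::Long(beat))], &strategy).await {
        Ok(bytes) => bytes,
        Err(e) => panic!("Error encoding payload: {}", e),
    }
}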

Example with consumer and producer using Avro (blocking)

An example that does both consuming/decoding and producing/encoding. To use structs with Avro, they must implement either the serde::Deserialize or serde::Serialize trait. The example is especially useful when updating from the 1.x.x version; when starting fresh you probably want to use the async versions.

use rdkafka::message::{Message, BorrowedMessage};
use rdkafka::producer::FutureRecord;
use apache_avro::from_value;
use apache_avro::types::Value;
use schema_registry_converter::blocking::{Decoder, Encoder};
use schema_registry_converter::blocking::schema_registry::{SrSettings, SubjectNameStrategy};

fn main() {
    let decoder = Decoder::new(SrSettings::new(String::from("http://localhost:8081")));
    let encoder = Encoder::new(SrSettings::new(String::from("http://localhost:8081")));
    // `msg` is a BorrowedMessage from a consumer, `producer` an rdkafka producer, and
    // `Heartbeat` a struct deriving serde::Serialize/Deserialize that matches the schema.
    let hb = get_heartbeat(msg, &decoder);
    let record = get_future_record_from_struct("hb", Some("id"), hb, &encoder);
    producer.send(record);
}

fn get_value<'a>(
    msg: &'a BorrowedMessage,
    decoder: &'a Decoder,
) -> Value {
    match decoder.decode(msg.payload()) {
        Ok(v) => v,
        Err(e) => panic!("Error getting value: {}", e),
    }
}

fn get_heartbeat<'a>(
    msg: &'a BorrowedMessage,
    decoder: &'a Decoder,
) -> Heartbeat {
    match decoder.decode_with_name(msg.payload()) {
        Ok((name, value)) => {
            match name.name.as_str() {
                "Heartbeat" => {
                    match name.namespace {
                        Some(namespace) => {
                            match namespace.as_str() {
                                "nl.openweb.data" => from_value::<Heartbeat>(&value).unwrap(),
                                ns => panic!("Unexpected namespace {}", ns),
                            }
                        }
                        None => panic!("No namespace in schema, while expected"),
                    }
                }
                name => panic!("Unexpected name {}", name),
            }
        }
        Err(e) => panic!("error getting heartbeat: {}", e),
    }
}

fn get_future_record<'a>(
    topic: &'a str,
    key: Option<&'a str>,
    values: Vec<(&'static str, Value)>,
    encoder: &'a Encoder,
) -> FutureRecord<'a> {
    let subject_name_strategy = SubjectNameStrategy::TopicNameStrategy(topic, false);
    let payload = match encoder.encode(values, &subject_name_strategy) {
        Ok(v) => v,
        Err(e) => panic!("Error getting payload: {}", e),
    };
    FutureRecord {
        topic,
        partition: None,
        payload: Some(&payload),
        key,
        timestamp: None,
        headers: None,
    }
}

fn get_future_record_from_struct<'a>(
    topic: &'a str,
    key: Option<&'a str>,
    heartbeat: Heartbeat,
    encoder: &'a Encoder,
) -> FutureRecord<'a> {
    let subject_name_strategy = SubjectNameStrategy::TopicNameStrategy(topic, false);
    let payload = match encoder.encode_struct(heartbeat, &subject_name_strategy) {
        Ok(v) => v,
        Err(e) => panic!("Error getting payload: {}", e),
    };
    FutureRecord {
        topic,
        partition: None,
        payload: Some(&payload),
        key,
        timestamp: None,
        headers: None,
    }
}

Direct interaction with schema registry

Some functions have been opened up so this library can be used to directly get all the subjects, all the versions of a subject, or the raw schema for a subject and version. For these, see either the async or the blocking version of the integration tests.
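
A rough sketch of what that direct interaction can look like; the function names below are assumptions for illustration only, so check the integration tests (or docs.rs) for the real names and signatures.

use schema_registry_converter::blocking::schema_registry::{
    get_all_subjects, get_schema_by_subject, SrSettings, SubjectNameStrategy,
};

// Hypothetical sketch: list all subjects, then fetch the registered schema for one.
fn main() {
    let sr_settings = SrSettings::new(String::from("http://localhost:8081"));
    for subject in get_all_subjects(&sr_settings).unwrap() {
        println!("subject: {}", subject);
    }
    let strategy = SubjectNameStrategy::TopicNameStrategy("hb", false);
    let registered = get_schema_by_subject(&sr_settings, &strategy).unwrap();
    println!("raw schema: {}", registered.schema);
}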

Example of posting a schema to the schema registry

use schema_registry_converter::blocking::schema_registry::{
    post_schema,
    SchemaType,
    SuppliedSchema
};

fn main() {
    let schema = SuppliedSchema {
        name: String::from("nl.openweb.data.Heartbeat"),
        schema_type: SchemaType::Avro,
        schema: String::from(r#"{"type":"record","name":"Heartbeat","namespace":"nl.openweb.data","fields":[{"name":"beat","type":"long"}]}"#),
        references: vec![],
    };
    let result = post_schema("http://localhost:8081/subjects/test-value/versions", schema);
}

Relation to related libraries

The Avro part of the conversion is handled by avro-rs. As such, I don't include tests for every possible schema. While I used rdkafka to successfully consume from and produce to Kafka, and while it's used in the example, this crate has no direct dependency on it. All this crate does is convert [u8] <-> Some Value (based on the converter used). With JSON and Protobuf, some other dependencies are pulled in by using said features. I have tried to encapsulate all the errors in the SRCError type, so even when you get a panic/error that's an SRCError, it could be an error from one of the dependencies. Please make sure you are using the library correctly, and that the error is not caused by a dependency, before creating an issue.

Integration test

The integration tests require a Kafka cluster running on the default ports. They will create topics, register schemas, and produce and consume some messages. They are only included when compiled with the kafka_test feature, so to include them in testing, cargo +stable test --all-features --color=always -- --nocapture needs to be run. The 'prepare_integration_test.sh' script can be used to create the three topics needed for the tests. To ensure Java compatibility, it's also needed to run the schema-registry-test-app docker image.

License

This project is licensed under either of

  • Apache License, Version 2.0
  • MIT license

at your option.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in Schema Registry Converter by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.

schema_registry_converter's People

Contributors

benmanns, cbzehner, dependabot[bot], gklijs, johnhalbert, kitsuneninetails, kujeger, licenser, mariellhoversholm-paf, marioloko, sergestrashko


schema_registry_converter's Issues

Have a way to leave out the full_name for the Proto Encoder

Is your feature request related to a problem? Please describe.
Currently the full_name is always needed in the raw proto encoder. This is additional hassle, prone to errors, and it also causes a little additional overhead.

Describe the solution you'd like
There should be an encode variant without the full_name, but it can only be used if the message used is the first message defined. With references this is very much possible, and it is also how I figured out this might be the default use.
Possibly we could return an error if the method without the full name is used while the resolved proto file contains multiple messages.

Describe alternatives you've considered
Just using it with the name, but it's a hassle when the proto file only contains one message.

Additional context
As you can see in https://github.com/gklijs/ksqlDB-GraphQL-poc/blob/3670101fd36f2223dfed094601f525bb48a09b5b/rust-data-creator/src/kafka_producer.rs#L17 it's quite some hassle.

Doubt regarding deserialization

Is your feature request related to a problem? Please describe.
In the decode process that exists in async_impl/avro, there is the "decode" function.
This function receives the message payload and parses it, splitting the schema id from the message,
where this id is the one the producer used to generate the message.
The decode function uses this id to reach the schema registry and load that schema into memory, but I may be using an old DTO given that my producer can evolve by adding new optional fields; because of this I got errors during deserialization.

Describe the solution you'd like
Be able to provide the schema version I would like to use to parse the message, instead of using the one used to produce it.

Describe alternatives you've considered
Updating the DTO before updating the schema; however, this would generate a lot of work for events with a lot of consumers.

Additional context
I did not find a better way to do it; if one exists, please forgive my mistake.

Can this crate be made to support hyphenated subject names?

Is your feature request related to a problem? Please describe.
We have an Avro schema registry set to be permissive about hyphens in its subject names. The Python module has supported this for years now, as have the Debezium source connector, ksqlDB and all of the sinks we've used. This crate doesn't seem to be able to handle subject names with hyphens.

Describe the solution you'd like
I would like this crate to allow hyphens in subject names of permissive Avro schema registries.

Describe alternatives you've considered
The alternative is that we re-emit several topics using underscores instead. This is a lot of work, and would only be necessary because of this crate. I'm not even sure ultimately that we can use this crate if we can't use hyphenated subject names.

Additional context
You might know this already, but just in case, here's the feature that made Avro allow for hyphens in subject names. As you can see, there are use cases where it's not realistic to be restrictive about something like a hyphen in a subject name, since often the input is not within the control of the user writing the consumer.

Move caches to dashmap::DashMap

Is your feature request related to a problem? Please describe.
Faster caching at the price of an additional dependency.

Describe the solution you'd like
Use it for everything to keep maintainability good. There are already a lot of features.

Describe alternatives you've considered
Having it as a feature, but that increases complexity, especially as it's a complex, integrated piece.

Additional context
See discussion #67

Add support for json schema

Is your feature request related to a problem? Please describe.
Add support for json schema, as it's not currently supported

Describe the solution you'd like
Use valico to validate the schema for producing/encoding, and use the JSON coercer from the library for consuming/decoding.

Remove or replace failure dependency

Describe the issue
The Cargo.toml file defines a dependency on the failure crate. According to this PR, the library is unmaintained, and additionally there is a RUSTSEC advisory for it.

Additional context
To be honest, I'm not sure if the schema_registry_converter is affected at all. Searching for the term failure in the GitHub search returned just the entry in the Cargo.toml (plus hits in docker-compose and in a license file). Maybe it was removed/replaced in the past and the entry in the Cargo.toml wasn't removed?

Nevertheless, I would appreciate it if the (unused) dependency could be removed or replaced.

post_schema example documentation

SuppliedSchema is referenced in the example for using post_schema in the README.md, but the example doesn't show how it is brought into scope like it does for the post_schema function.

I am creating a pull request that addresses this issue.

[Proposal] Enhancements to improve DevEX and align crate with Confluent Schema Registry java library

Feature Description

I'd like to propose a few changes aimed at improving the developer experience and ensuring the crate is in line with the Java library standards.

These changes are intended to make development and testing more straightforward and enhance the overall usability of the crate.

Here are the proposed modifications:

Introduce SchemaRegistryClient trait:

Using a trait, we can significantly facilitate testing in applications that use this crate. It will be easier to mock and test components that depend on it.

Refactor SrSettingsBuilder:

Rename to SchemaRegistryClientConfig and make methods return Self. This will improve the configuration setup in applications that use this crate.

Refactor SrSettings:

  • Rename it to CachedSchemaRegistryClient, and implement SchemaRegistryClient for it.
  • Move caches from AvroEncoder and AvroDecoder to the new Client implementation. This will make us able to share the cached schemas between the encoding and decoding process.

These changes will make this crate consistent with the Confluent Schema Registry library, and improve the code structure.

Use dynamic dispatch in AvroEncoder and AvroDecoder.

This will enable users to provide any implementation of SchemaRegistryClient to encoders and decoders

MessageResolver and IndexResolver Public

Is your feature request related to a problem? Please describe.
It would be great if the proto_resolver module would be a public module for things such as the MessageResolver and IndexResolver. This is useful when pulling down a schema from a registry and validating producer data with the pulled down schema if the schema has multiple proto messages in it.

Describe the solution you'd like
Define the proto_resolver module as pub in lib.rs, or move the IndexResolver and MessageResolver to another public module so that Rust apps can access them.

Additional context
I am pulling proto schemas from a schema registry by subject, and some of these schemas have multiple message definitions in them. It would be great to use the resolvers to handle this.

Support using "blocking" and "async_impl" features at the same time

Is your feature request related to a problem? Please describe.
I have a cargo workspace with several projects that use the schema_registry_converter. Some of them are tools that use the blocking implementation (i.e., no need for async_impl), but some are server apps, which need to use async_impl.

Because the workspace builds dependencies configured with the union of all features listed across all projects, I end up not being able to use async_impl anywhere as long as at least one of the projects uses blocking.

Describe the solution you'd like
I would like to be able to select both async_impl as well as blocking features without one disabling the other.

Describe alternatives you've considered
For now, I get around it by including the kafka_test feature, which ends up allowing blocking alongside async_impl, but this seems like a hack.

Additional context
Is there a specific reason why blocking automatically precludes async_impl?

https://github.com/gklijs/schema_registry_converter/blob/master/src/lib.rs#L24

I realize one must have at least one of them enabled, but the mechanism that you describe in an example (i.e., default-features = false, features = ["blocking"]) seems to provide that.

Thank you!

Error using encode_with_struct with fixed value in schema

Describe the bug
When using a fixed value in the schema, it will create a value of Value::Array, which will give an error. Ideally this should be fixed in the avro-rs library, but we can work around it.

To Reproduce
Steps to reproduce the behavior:
Having a schema like

r#"{"type":"record","name":"ConfirmAccountCreation","namespace":"nl.openweb.data","fields":[{"name":"id","type":{"type":"fixed","name":"Uuid","size":16}},{"name":"a_type","type":{"type":"enum","name":"Atype","symbols":["AUTO","MANUAL"]}}]}"#;

Move to reqwest and support proxy and api key for the schema registry calls

Is your feature request related to a problem? Please describe.
Leaving curl behind for better support of other compilation targets like WASM. It also might reduce some complexity,
because reqwest has JSON support, https://crates.io/crates/reqwest.

Describe the solution you'd like
Replace the dependency and its use; for configuration, try to stay close to the names used in https://github.com/confluentinc/schema-registry/blob/master/client/src/main/java/io/confluent/kafka/schemaregistry/client/SchemaRegistryClientConfig.java where possible. It will likely mean the current schema_registry_url that is passed around becomes a struct instead, with the url as the only mandatory property.

Describe alternatives you've considered
Adding the features to curl is possible, but that makes it more work to move to reqwest later. So it seems better to do both at the same time, before releasing 2.0.0.

Additional context
Not applicable

New maintenance release with at least Tokio 1.0

Is your feature request related to a problem? Please describe.
rdkafka is about to release a new version with a dependency on Tokio 1.0 (fede1024/rust-rdkafka#299); this might cause compatibility issues, or at least might make the binaries bigger than they need to be.

Describe the solution you'd like
Update dependencies to the latest versions when doing so doesn't cause problems. There is also a new release of avro-rs, and there might be others.

Describe alternatives you've considered
Best to fix this in the library.

Additional context
Not applicable

Switch to apache-avro

Hi, so based on this: flavray/avro-rs#99 (comment), avro-rs is not really maintained anymore and the code was contributed to apache-avro, which has added support for named/recursive types (features from the avro spec that current avro-rs does not support).

If this project could switch to the maintained and more fully featured avro library that would be awesome. Thanks.

Move Ci to Github Actions

Travis CI is slow and shutting down, at least travis-ci.org. Moving to GitHub Actions is likely the best way forward.
It also seems to be slower than before, and therefore the build currently fails on master when rerun.

encode string as key?

I am unable to determine how to encode a String value as the key. Ideally, the steps would be similar to:

fn get_string_primitive_schema() -> Box<SuppliedSchema> {
    Box::from(SuppliedSchema::new(r#"{"type": "string"}"#.into()))
}

let key_strategy = SubjectNameStrategy::TopicNameStrategyWithSchema(
    "test".into(),
    true,
    cp::get_string_primitive_schema(),
);
let k = Value::String("hello".to_string());
let key = self.encoder.encode(k, &key_strategy)?;

Am I missing a feature in the library or is encoding a primitive value not implemented?

Support Fetching Protobuf Field Names

I was wondering if there is any way to get the field names when decoding a protobuf. It seems that protofish only returns a Value object with a Message which has field numbers and values.

It seems like the Message may also have a MessageRef which (with a protofish context) can be converted to a MessageInfo, on which you could call message_info.get_field(number).name to get the field name.

That seems to be the supported way to get field names from protofish, but it requires the protofish context, which schema_registry_converter doesn't expose.

Is there some way to get at that context, or a better way to get field names out of a protobuf?

Improve readme

Is your feature request related to a problem? Please describe.
There should be enough in the readme to get started quickly, and it should represent the latest version.

Describe the solution you'd like
A better readme.

Failing to decode record type referencing an enum

My schema looks like this

[
  {
    "type": "enum",
    "name": "myflag",
    "namespace": "myns",
    "symbols": [
      "A",
      "B",
      "C"
    ]
  },
  {
    "type": "record",
    "name": "mymsg",
    "namespace": "myns",
    "fields": [
      {
        "name": "myid",
        "type": "int"
      },
      {
        "name": "c1",
        "type": "myflag"
      },
      {
        "name": "c2",
        "type": "myflag"
      }
    ]
  }
]

I am getting the following error from the decoder.decode function:

thread '' panicked at 'called Result::unwrap() on an Err value: SRCError { error: "Could not parse schema", side: Some("Failed to parse schema: Unknown type: myflag"), retriable: false, cached: true }', src/receiver.rs:96:23

I would appreciate any help with this

Have an easy way to use the async variants

Is your feature request related to a problem? Please describe.
Because of the shared cache, the converters are mutable; this can be made easier by putting the correct types around them, making them much easier to use. This should be especially convenient combined with https://docs.rs/rdkafka/*/rdkafka/producer/base_producer/struct.ThreadedProducer.html.

Describe the solution you'd like
Another way to create the Encoder, one that handles the mutability better while still ensuring the cache is used.

Describe alternatives you've considered
Users of the library could do it themselves, but it seems better to just do it properly once. This will especially make the library more useful for people less familiar with Rust.

Additional context
The solution could be used in https://github.com/gklijs/ksqlDB-GraphQL-poc/tree/main/rust-data-creator, it might make sense to also add a link to this project in the main readme.

Working through error with schema_registry_converter

Hey @gklijs, I moved over to this repo as you mentioned. The code I have been testing is linked here. The error that you mentioned is correct: Could not get avro bytes, was cause by Decoding error: value does not match schema, it's retriable: false, it's cached: false', src/kafka_producer.rs:21:23. Thanks for all the help!

E0308 mismatched types: perhaps two different versions of a crate are being used

Describe the bug
My team ran into an issue with having schema_registry_converter and avro-rs as dependencies where if the versions don't match our build fails. I opened an issue with cargo, so you can find more information here: rust-lang/cargo#8178

To Reproduce
Steps to reproduce the behavior:

  1. Add schema_registry_converter and avro-rs as dependencies, but set the avro-rs version to something before the latest: avro-rs = "0.7.0"
  2. Try to build.

Notes
This comment adds some more details to the situation.

API authorization

Is your feature request related to a problem? Please describe.
Confluent Cloud requires schema api clients to authorize requests.

Describe the solution you'd like
API for providing username and password.

Support additional local protobuf definitions

The protobuf definitions in our Schema Registry contain some references to the Google common protos which are not in the registry, but rather shipped locally with our applications.

It would be helpful if this library allowed loading additional proto definitions to be used when decoding with ProtoDecoder.

It may be possible to do this by hand with protofish and the raw proto decoder, but it would be helpful if this were baked in.

Please switch SuppliedSchema to an owned string

Using a &'static str means that the string supplied for the schema must be known at compile time and created at program start (akin to a static const char* in C). This disallows reading a schema in from a file.

Looking at how SuppliedSchema uses its string, it's not obvious that a static string is needed or why an owned string should not be used. In terms of performance, this is a one-time operation or thereabouts, not a repeated call, and it usually happens at program initialization, meaning super-high performance isn't really a factor.

Using an owned string is more usable and transparent and allows for more flexibility in the use of SuppliedSchema

A proposed fix is in an upcoming PR.

Implement std::error::Error for SRCError

Is your feature request related to a problem? Please describe.

The error type does not implement the standard Error trait.

Describe the solution you'd like

Implement std::error::Error, optionally behind a feature-gate.

Describe alternatives you've considered

None.

Additional context

This was discovered while attempting to wrap the error in a snafu enum variant, only to get an error as it does not implement the now fairly old error trait.
To get around this, we opted for a new extension trait to wrap the error with the given type and return SchemaRegistryErr { source: src_error }.fail(), where SchemaRegistryErr is a variant in the snafu declaration with source tagged #[snafu(source(false))].

Support for schema reference validation

Is your feature request related to a problem? Please describe.
I have a use case:

  1. Creating schema with references
  2. Validating the schema

I have a user schema with "id" and "role_name", and a user_command schema which is used as part of a microservice but also uses the user schema as a reference.

1. User and User Command schemas:

pub fn user_schema() -> SuppliedReference {
    let schema_raw = r#"
    {
        "type":"object",
        "properties":{
            "id":{"type":"string"},
            "role_name":{"type":"string"}
        }
    }
    "#;

    SuppliedReference {
        name: String::from("com.example.user"),
        subject: String::from("com.example.user"),
        schema: String::from(schema_raw),
        references: vec![],
    }
}

pub fn user_command_schema() -> SuppliedSchema {
    SuppliedSchema { 
        name: Some(String::from("com.example.user_command")), 
        schema_type: SchemaType::Json, 
        schema: r#"
        {
            "properties":{
                "id":{"type":"string"},
                "name":{"type":"string"},
                "metadata":{"type":"string"},
                "creator_service_name":{"type":"string"},
                "created_on":{"type":"integer"},
                "data": {
                    "$ref": "{}"
                } 
            }
        }"#.to_string().replace("{}", &user_schema().subject), 
        references: vec![user_schema()]
    }
}

2. Registering the schemas into the schema registry:

let schema_registry_url = "localhost:9001".to_string();
let subject= "com.example.user_command".to_string();
let result = post_schema(
            &SrSettings::new(schema_registry_url),
            subject,
            user_command_schema(),
        )
        .await
        .unwrap();

println!("result: {:?}", result);
println!("Schema registry creation is done");

Up to here there is no problem, and the schema will be registered in the schema registry.

The problem arises when I need to validate + produce to Kafka: how can I use my user_command_schema to validate my data before it is produced to Kafka?

The code will fail if I try to encode the data using the schema; it fails when running this code:

// source at: https://github.com/gklijs/schema_registry_converter/blob/master/src/async_impl/json.rs#L237

fn reference_url(rr: &RegisteredReference) -> Result<Url, SRCError> {
    match Url::from_str(&*rr.name) {
        Ok(v) => Ok(v),
        Err(e) => Err(SRCError::non_retryable_with_cause(e, &*format!("reference schema with subject {} and version {} has invalid id {}, it has to be a fully qualified url", rr.subject, rr.version, rr.name)))
    }
}

Describe the solution you'd like
In my example using JSON, I need producer.send_json to do the validation before sending the data to Kafka; also, in the case of a transactional producer/consumer, to validate the data before sending (for the producer) and after receiving (for the consumer).

multiple values encoding/decoding

At times it is convenient to pass multiple events at once, to avoid inconsistent state if, for example, 1 out of 10 events causes some kind of error on its way.

I guess it would be great to have some API like (encode|decode)_many which would pack events together and then unpack them successfully. avro-rs actually works this way.

QUESTION: How to pas decoder: &'_ mut AvroDecoder <'_> from async move block

Hello! I'm using this example from the decode docs in my app. I'm trying to pass a &mut Decoder
into a function but am getting:

move occurs because `decoder` has type `schema_registry_converter::async_impl::avro::AvroDecoder<'_>`, which does not implement the `Copy` trait
    | |                                                                               move occurs due to use in generator

Has anyone seen this before?

/// Decodes bytes into a value.
    /// The choice to use Option<&[u8]> as the type is made so it plays nice with the BorrowedMessage
    /// struct from rdkafka, for example if we have m: &'a BorrowedMessage and decoder: &'a mut
    /// Decoder we can use decoder.decode(m.payload()) to decode the payload or
    /// decoder.decode(m.key()) to get the decoded key.
    ///
    /// ```no_run
    /// use rdkafka::message::{Message, BorrowedMessage};
    /// use avro_rs::types::Value;
    /// use schema_registry_converter::async_impl::avro::AvroDecoder;
    /// async fn get_value (
    ///     msg: &'_ BorrowedMessage <'_>,
    ///     decoder: &'_ mut AvroDecoder <'_>,
    /// ) -> Value{
    ///     match decoder.decode(msg.payload()).await{
    ///         Ok(r) => r.value,
    ///         Err(e) => panic!("Error getting value: {}", e),
    ///     }
    /// }
    /// ```

panicked when working with tokio async

When using reqwest to request the schema registry with tokio async, as in my code below, running it in the warp main thread (or using tokio::spawn to create an async block) will panic and exit:

use warp::Filter;
use schema_registry_converter::schema_registry::{SrSettings};

#[tokio::main]
async fn main() {
     let hello = warp::path!("hello" / String)
        .map(|name| format!("Hello, {}!", name));
     let sr_settings = SrSettings::new(String::from("http://localhost:8081/"));

    warp::serve(hello)
        .run(([127, 0, 0, 1], 3030))
        .await;
}
thread 'main' panicked at 'Cannot drop a runtime in a context where blocking is not allowed. This happens when a runtime is dropped from within an asynchronous context.', ~/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.22/src/runtime/blocking/shutdown.rs:49:21
stack backtrace:
   0:        0x1083da9ee - <std::sys_common::backtrace::_print::DisplayBacktrace as core::fmt::Display>::fmt::h72782cdbf82d2e78
   1:        0x1083fc1ac - core::fmt::write::h0f2c225c157771c1
   2:        0x1083d52d9 - std::io::Write::write_fmt::h6d219fc26cb45a24
   3:        0x1083dc865 - std::panicking::default_hook::{{closure}}::hde29d026f53869b1
   4:        0x1083dc5a2 - std::panicking::default_hook::h5de23f27de9ce8ce
   5:        0x1083dcdc5 - std::panicking::rust_panic_with_hook::h720143ee15fc80ba
   6:        0x10840664e - std::panicking::begin_panic::h80d64999a84f0366
   7:        0x108297874 - tokio::runtime::blocking::shutdown::Receiver::wait::hc15bfd5c68b76ca0
   8:        0x1082cbfc5 - tokio::runtime::blocking::pool::BlockingPool::shutdown::h702cd79db6adf80b
   9:        0x1082cc07d - <tokio::runtime::blocking::pool::BlockingPool as core::ops::drop::Drop>::drop::h0ab515030d41ffa8
  10:        0x1082d6215 - core::ptr::drop_in_place::h1df0de4e6c94e3fe
  11:        0x1082d8252 - core::ptr::drop_in_place::h94c49217ab7681ef
  12:        0x10801a738 - reqwest::blocking::wait::enter::he7b4b5e4e35343bf
  13:        0x108019bd7 - reqwest::blocking::wait::timeout::h7234a2ddbbcb0534
  14:        0x10803b703 - reqwest::blocking::client::ClientHandle::new::hd976e58a3aac69e4
  15:        0x10803ae1d - reqwest::blocking::client::ClientBuilder::build::h18af5d75937efe22
  16:        0x10803aece - reqwest::blocking::client::Client::new::h09aac586c8993277
  17:        0x107eff6ea - schema_registry_converter::schema_registry::SrSettings::new::h3b74a91814783f96
  18:        0x107d0a3f3 - my_web::main::{{closure}}::hbb912dd7df4aeb4d
  19:        0x107ca4120 - <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll::ha2815f8e7aca5dc1
  20:        0x107c8865d - tokio::runtime::enter::Enter::block_on::{{closure}}::h6b5e9b23e5c4d655
  21:        0x107c8e506 - tokio::coop::with_budget::{{closure}}::hb18b9d07d0e693a7
  22:        0x107cebdee - std::thread::local::LocalKey<T>::try_with::ha9811a7840d57628
  23:        0x107ceb4ac - std::thread::local::LocalKey<T>::with::hcb4e45ba7e69edaa
  24:        0x107c884ad - tokio::runtime::enter::Enter::block_on::h8dea6c389ed18880
  25:        0x107d11315 - tokio::runtime::thread_pool::ThreadPool::block_on::h274b9930f19cc162
  26:        0x107ca76e8 - tokio::runtime::Runtime::block_on::{{closure}}::hf60396b6d86ee761
  27:        0x107cc90b8 - tokio::runtime::context::enter::hcb90097146788eb6
  28:        0x107c653db - tokio::runtime::handle::Handle::enter::h7963a1ebbf377697
  29:        0x107ca763d - tokio::runtime::Runtime::block_on::h61ecdca8fd87f99b
  30:        0x107cb24cc - my_web::main::h372f3d2471b960b7
  31:        0x107d1171e - std::rt::lang_start::{{closure}}::h6532f1318acae0d5
  32:        0x1083dd14f - std::rt::lang_start_internal::hbbd10965adc92ae7
  33:        0x107d11701 - std::rt::lang_start::ha5adc3b371471675
  34:        0x107cb2532 - main

Please switch to owned strings in SubjectNameStrategy

Using slices in this enum makes it harder to use, as the lifetime of whatever string gets passed in must outlive the SubjectNameStrategy enum that gets created, meaning this enum cannot be passed around to other functions or stored in long-lived structs (as the owned string the slice comes from is not likely to live that long). The only way around this is to use lifetime specifiers everywhere.

As an alternative, just changing this to an owned string makes it take ownership of the string, and then it can get passed around as much as it wants without lifetime specifiers. Furthermore, upon use, this slice always gets cloned into an owned string anyway, which means there isn't really a performance loss here. Plus, it's just a lot more transparent and easy to use in my opinion.
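
To illustrate the proposal with a hypothetical sketch (using the import path from the README example above): with an owned String in the variant, a strategy built from a locally computed topic name can be returned or stored without any lifetime annotations.

use schema_registry_converter::blocking::schema_registry::SubjectNameStrategy;

// Hypothetical after-the-change usage: the strategy owns its topic name,
// so it can outlive the function that built it.
fn value_strategy_for(env: &str) -> SubjectNameStrategy {
    let topic = format!("{}-heartbeat", env);
    SubjectNameStrategy::TopicNameStrategy(topic, false)
}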

A fix for this is proposed in an upcoming PR.
