Is your feature request related to a problem? Please describe.
I have a use case,
- Creating schema with references
- Validating the schema
I have a user schema with "id", and "role_name", and user_command schema which is used as a part of a micro service, but also it uses the user schema as a reference
1. User and User Command schemas:
pub fn user_schema() -> SuppliedReference {
let schema_raw = r#"
{
"type":"object",
"properties":{
"id":{"type":"string"},
"role_name":{"type":"string"}
}
}
"#;
SuppliedReference {
name: String::from("com.example.user"),
subject: String::from("com.example.user"),
schema: String::from(schema_raw),
references: vec![],
}
}
pub fn user_command_schema() -> SuppliedSchema {
SuppliedSchema {
name: Some(String::from("com.example.user_command")),
schema_type: SchemaType::Json,
schema: r#"
{
"properties":{
"id":{"type":"string"},
"name":{"type":"string"},
"metadata":{"type":"string"},
"creator_service_name":{"type":"string"},
"created_on":{"type":"integer"},
"data": {
"$ref": "{}"
}
}
}"#.to_string().replace("{}", &user_schema().subject),
references: vec![user_schema()]
}
}
2. Registering the schemas into the schema registry:
let schema_registry_url = "localhost:9001".to_string();
let subject= "com.example.user_command".to_string();
let result = post_schema(
&SrSettings::new(schema_registry_url),
subject,
user_command_schema(),
)
.await
.unwrap();
println!("result: {:?}", result);
println!("Schema registry creation is done");
Until here there there is not problem, and the schema will be registered into the schema registry.
The problem arises when I need to validate + produce it to Kafka, how can I use my user_command_schema
to validate my data before is produced into Kafka
The code will fail if I try to encode the data using the schema and it will fail by running this code:
// source at: https://github.com/gklijs/schema_registry_converter/blob/master/src/async_impl/json.rs#L237
fn reference_url(rr: &RegisteredReference) -> Result<Url, SRCError> {
match Url::from_str(&*rr.name) {
Ok(v) => Ok(v),
Err(e) => Err(SRCError::non_retryable_with_cause(e, &*format!("reference schema with subject {} and version {} has invalid id {}, it has to be a fully qualified url", rr.subject, rr.version, rr.name)))
}
}
Describe the solution you'd like
I need in my example using json that producer.send_json
to do the validation before sending the data to kafka, also in case transactional producer/consumer to validate the data before sending (for producer) and after receiving (for consumer).