wso2-extensions / esb-connector-kafka
License: Apache License 2.0
Hi Team,
Description:
This is regarding the Kafka connector [1]. The current Kafka connector implementation [2] does not support Avro schemas [3] that define a record schema inside a field's type attribute.
Please refer to the example below of an Avro schema with a nested schema in type:
{
  "type": "record",
  "name": "Customer",
  "namespace": "kafka.affanhasan.poc",
  "fields": [
    {
      "name": "contactNumber",
      "type": "string"
    },
    {
      "name": "email",
      "type": "string"
    },
    {
      "name": "name",
      "type": {
        "type": "record",
        "name": "name",
        "fields": [
          {
            "name": "first",
            "type": "string"
          },
          {
            "name": "last",
            "type": "string"
          }
        ]
      }
    }
  ]
}
It would be great if we could add this support as well, since it is highly unlikely that a single record definition will suffice for an entire Avro schema [4][5].
[1] https://store.wso2.com/store/assets/esbconnector/details/b15e9612-5144-4c97-a3f0-179ea583be88
[2] https://github.com/wso2-extensions/esb-connector-kafka/tree/v3.1.3
[3] https://apim.docs.wso2.com/en/latest/reference/connectors/kafka-connector/kafka-connector-avro-producer-example/
[4] https://avro.apache.org/docs/1.10.2/spec.html#schemas
[5] https://docs.confluent.io/platform/current/schema-registry/serdes-develop/serdes-avro.html#avro-serializer
Thank you,
Pasindu G.
Description:
In the Kafka producer connector configuration, custom message headers have to be given as parameters named <topicName>.<headerName>, which limits the ability to set the topic name dynamically in the API/Proxy configuration.
E.g.:
<kafkaTransport.publishMessages>
<topic>topic1</topic>
<partitionNo>{$ctx:partitionNo}</partitionNo>
<topic1.reProcessCount>{$ctx:initialReprocessCount}</topic1.reProcessCount>
<topic1.reportType>{$ctx:reportType}</topic1.reportType>
</kafkaTransport.publishMessages>
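A possible shape for the requested improvement, shown purely as a hypothetical configuration sketch (the dynamic <topic> expression and the <headers> element decoupling header parameters from a hard-coded topic name are NOT supported by the connector today):

```xml
<!-- Hypothetical sketch: dynamic topic plus headers not tied to a literal topic name -->
<kafkaTransport.publishMessages>
    <topic>{$ctx:topicName}</topic>
    <partitionNo>{$ctx:partitionNo}</partitionNo>
    <!-- hypothetical generic header parameters, independent of the topic value -->
    <headers>
        <header name="reProcessCount">{$ctx:initialReprocessCount}</header>
        <header name="reportType">{$ctx:reportType}</header>
    </headers>
</kafkaTransport.publishMessages>
```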
I use your extension in WSO2 ESB 5.0.0. It works fine, but when I start or restart the ESB I get the ERROR below, even though everything keeps working afterwards. Why do I get this error?
I use version 2.0.4.
TID: [-1234] [] [2019-12-14 16:27:21,549] ERROR {org.apache.synapse.deployers.LibraryArtifactDeployer} - Deployment of synapse artifact failed for synapse libray at : /opt/wso2esb-5.0.0/repository/deployment/server/synapse-libs/esb-connector-kafka-org.wso2.carbon.connector.kafkaTransport-2.0.4.zip : artifacts.xml file not found at : /opt/wso2esb-5.0.0/tmp/libs/1576328241545esb-connector-kafka-org.wso2.carbon.connector.kafkaTransport-2.0.4.zip/connector.xml {org.apache.synapse.deployers.LibraryArtifactDeployer}
org.apache.synapse.SynapseException: artifacts.xml file not found at : /opt/wso2esb-5.0.0/tmp/libs/1576328241545esb-connector-kafka-org.wso2.carbon.connector.kafkaTransport-2.0.4.zip/connector.xml
at org.apache.synapse.libraries.util.LibDeployerUtils.populateDependencies(LibDeployerUtils.java:117)
at org.apache.synapse.libraries.util.LibDeployerUtils.createSynapseLibrary(LibDeployerUtils.java:67)
at org.apache.synapse.deployers.LibraryArtifactDeployer.deploy(LibraryArtifactDeployer.java:60)
at org.apache.axis2.deployment.repository.util.DeploymentFileData.deploy(DeploymentFileData.java:136)
at org.apache.axis2.deployment.DeploymentEngine.doDeploy(DeploymentEngine.java:807)
at org.apache.axis2.deployment.repository.util.WSInfoList.update(WSInfoList.java:144)
at org.apache.axis2.deployment.RepositoryListener.update(RepositoryListener.java:377)
at org.apache.axis2.deployment.RepositoryListener.checkServices(RepositoryListener.java:254)
at org.apache.synapse.Axis2SynapseController.deployMediatorExtensions(Axis2SynapseController.java:743)
at org.apache.synapse.Axis2SynapseController.createSynapseEnvironment(Axis2SynapseController.java:388)
at org.apache.synapse.ServerManager.start(ServerManager.java:182)
at org.wso2.carbon.mediation.initializer.ServiceBusInitializer.initESB(ServiceBusInitializer.java:452)
at org.wso2.carbon.mediation.initializer.ServiceBusInitializer.activate(ServiceBusInitializer.java:196)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.eclipse.equinox.internal.ds.model.ServiceComponent.activate(ServiceComponent.java:260)
at org.eclipse.equinox.internal.ds.model.ServiceComponentProp.activate(ServiceComponentProp.java:146)
at org.eclipse.equinox.internal.ds.model.ServiceComponentProp.build(ServiceComponentProp.java:345)
at org.eclipse.equinox.internal.ds.InstanceProcess.buildComponent(InstanceProcess.java:620)
at org.eclipse.equinox.internal.ds.InstanceProcess.buildComponents(InstanceProcess.java:197)
at org.eclipse.equinox.internal.ds.Resolver.getEligible(Resolver.java:343)
at org.eclipse.equinox.internal.ds.SCRManager.serviceChanged(SCRManager.java:222)
at org.eclipse.osgi.internal.serviceregistry.FilteredServiceListener.serviceChanged(FilteredServiceListener.java:107)
at org.eclipse.osgi.framework.internal.core.BundleContextImpl.dispatchEvent(BundleContextImpl.java:861)
at org.eclipse.osgi.framework.eventmgr.EventManager.dispatchEvent(EventManager.java:230)
at org.eclipse.osgi.framework.eventmgr.ListenerQueue.dispatchEventSynchronous(ListenerQueue.java:148)
at org.eclipse.osgi.internal.serviceregistry.ServiceRegistry.publishServiceEventPrivileged(ServiceRegistry.java:819)
at org.eclipse.osgi.internal.serviceregistry.ServiceRegistry.publishServiceEvent(ServiceRegistry.java:771)
at org.eclipse.osgi.internal.serviceregistry.ServiceRegistrationImpl.register(ServiceRegistrationImpl.java:130)
at org.eclipse.osgi.internal.serviceregistry.ServiceRegistry.registerService(ServiceRegistry.java:214)
at org.eclipse.osgi.framework.internal.core.BundleContextImpl.registerService(BundleContextImpl.java:433)
at org.eclipse.osgi.framework.internal.core.BundleContextImpl.registerService(BundleContextImpl.java:451)
at org.wso2.carbon.inbound.endpoint.persistence.service.InboundEndpointPersistenceServiceDSComponent.activate(InboundEndpointPersistenceServiceDSComponent.java:49)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.eclipse.equinox.internal.ds.model.ServiceComponent.activate(ServiceComponent.java:260)
at org.eclipse.equinox.internal.ds.model.ServiceComponentProp.activate(ServiceComponentProp.java:146)
at org.eclipse.equinox.internal.ds.model.ServiceComponentProp.build(ServiceComponentProp.java:345)
at org.eclipse.equinox.internal.ds.InstanceProcess.buildComponent(InstanceProcess.java:620)
at org.eclipse.equinox.internal.ds.InstanceProcess.buildComponents(InstanceProcess.java:197)
at org.eclipse.equinox.internal.ds.Resolver.getEligible(Resolver.java:343)
at org.eclipse.equinox.internal.ds.SCRManager.serviceChanged(SCRManager.java:222)
at org.eclipse.osgi.internal.serviceregistry.FilteredServiceListener.serviceChanged(FilteredServiceListener.java:107)
at org.eclipse.osgi.framework.internal.core.BundleContextImpl.dispatchEvent(BundleContextImpl.java:861)
at org.eclipse.osgi.framework.eventmgr.EventManager.dispatchEvent(EventManager.java:230)
at org.eclipse.osgi.framework.eventmgr.ListenerQueue.dispatchEventSynchronous(ListenerQueue.java:148)
at org.eclipse.osgi.internal.serviceregistry.ServiceRegistry.publishServiceEventPrivileged(ServiceRegistry.java:819)
at org.eclipse.osgi.internal.serviceregistry.ServiceRegistry.publishServiceEvent(ServiceRegistry.java:771)
at org.eclipse.osgi.internal.serviceregistry.ServiceRegistrationImpl.register(ServiceRegistrationImpl.java:130)
at org.eclipse.osgi.internal.serviceregistry.ServiceRegistry.registerService(ServiceRegistry.java:214)
at org.eclipse.osgi.framework.internal.core.BundleContextImpl.registerService(BundleContextImpl.java:433)
at org.eclipse.osgi.framework.internal.core.BundleContextImpl.registerService(BundleContextImpl.java:451)
at org.wso2.carbon.core.init.CarbonServerManager.initializeCarbon(CarbonServerManager.java:514)
at org.wso2.carbon.core.init.CarbonServerManager.start(CarbonServerManager.java:219)
at org.wso2.carbon.core.internal.CarbonCoreServiceComponent.activate(CarbonCoreServiceComponent.java:94)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.eclipse.equinox.internal.ds.model.ServiceComponent.activate(ServiceComponent.java:260)
at org.eclipse.equinox.internal.ds.model.ServiceComponentProp.activate(ServiceComponentProp.java:146)
at org.eclipse.equinox.internal.ds.model.ServiceComponentProp.build(ServiceComponentProp.java:345)
at org.eclipse.equinox.internal.ds.InstanceProcess.buildComponent(InstanceProcess.java:620)
at org.eclipse.equinox.internal.ds.InstanceProcess.buildComponents(InstanceProcess.java:197)
at org.eclipse.equinox.internal.ds.Resolver.getEligible(Resolver.java:343)
at org.eclipse.equinox.internal.ds.SCRManager.serviceChanged(SCRManager.java:222)
at org.eclipse.osgi.internal.serviceregistry.FilteredServiceListener.serviceChanged(FilteredServiceListener.java:107)
at org.eclipse.osgi.framework.internal.core.BundleContextImpl.dispatchEvent(BundleContextImpl.java:861)
at org.eclipse.osgi.framework.eventmgr.EventManager.dispatchEvent(EventManager.java:230)
at org.eclipse.osgi.framework.eventmgr.ListenerQueue.dispatchEventSynchronous(ListenerQueue.java:148)
at org.eclipse.osgi.internal.serviceregistry.ServiceRegistry.publishServiceEventPrivileged(ServiceRegistry.java:819)
at org.eclipse.osgi.internal.serviceregistry.ServiceRegistry.publishServiceEvent(ServiceRegistry.java:771)
at org.eclipse.osgi.internal.serviceregistry.ServiceRegistrationImpl.register(ServiceRegistrationImpl.java:130)
at org.eclipse.osgi.internal.serviceregistry.ServiceRegistry.registerService(ServiceRegistry.java:214)
at org.eclipse.osgi.framework.internal.core.BundleContextImpl.registerService(BundleContextImpl.java:433)
at org.eclipse.equinox.http.servlet.internal.Activator.registerHttpService(Activator.java:81)
at org.eclipse.equinox.http.servlet.internal.Activator.addProxyServlet(Activator.java:60)
at org.eclipse.equinox.http.servlet.internal.ProxyServlet.init(ProxyServlet.java:40)
at org.wso2.carbon.tomcat.ext.servlet.DelegationServlet.init(DelegationServlet.java:38)
at org.apache.catalina.core.StandardWrapper.initServlet(StandardWrapper.java:1282)
at org.apache.catalina.core.StandardWrapper.loadServlet(StandardWrapper.java:1195)
at org.apache.catalina.core.StandardWrapper.load(StandardWrapper.java:1085)
at org.apache.catalina.core.StandardContext.loadOnStartup(StandardContext.java:5318)
at org.apache.catalina.core.StandardContext.startInternal(StandardContext.java:5610)
at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:147)
at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1572)
at org.apache.catalina.core.ContainerBase$StartChild.call(ContainerBase.java:1562)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Description:
We need to add Avro logical type support to the Kafka connector when producing messages.
A sample field of such a schema looks as shown below:
{ "name": "creation", "type": { "type": "long", "logicalType": "timestamp-millis" } },
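For reference, a complete record schema using that field could look like the following. This is an illustrative sketch only; the record name, namespace, and the id field are assumptions, not taken from the original report:

```json
{
  "type": "record",
  "name": "Event",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "creation", "type": { "type": "long", "logicalType": "timestamp-millis" } }
  ]
}
```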
Affected Product Version:
wso2mi-4.2.0
Description:
Connector versions >=3.0.0 do not have the kafkaTransport.init component in Integration Studio.
Affected Product Version:
>=3.0.0
OS, DB, other environment details and versions:
Linux
Steps to reproduce:
<kafkaTransport.init><bootstrapServers>server:9092</bootstrapServers></kafkaTransport.init>
<kafkaTransport.init/>
I use the connector from WSO2 Integration Studio 8.0.2.
<?xml version="1.0" encoding="UTF-8"?>
<api context="/publishweatherdata" name="WeatherDataPublishAPI" xmlns="http://ws.apache.org/ns/synapse">
<resource methods="POST">
<inSequence>
<kafkaTransport.init>
<bootstrapServers>localhost:9092</bootstrapServers>
<keySerializerClass>io.confluent.kafka.serializers.KafkaAvroSerializer</keySerializerClass>
<valueSerializerClass>io.confluent.kafka.serializers.KafkaAvroSerializer</valueSerializerClass>
<schemaRegistryUrl>http://localhost:8081</schemaRegistryUrl>
<acks>all</acks>
<requestTimeout>10000</requestTimeout>
<timeout>8000</timeout>
<metadataFetchTimeout>5000</metadataFetchTimeout>
</kafkaTransport.init>
<kafkaTransport.init>
</kafkaTransport.init>
<kafkaTransport.publishMessages>
<topic>weatherdatatopic</topic>
</kafkaTransport.publishMessages>
<payloadFactory media-type="json">
<format>
{"topic":"$1", "partition":"$2", "offset":"$3"}
</format>
<args>
<arg evaluator="xml" expression="$ctx:topic"/>
<arg evaluator="xml" expression="$ctx:partition"/>
<arg evaluator="xml" expression="$ctx:offset"/>
</args>
</payloadFactory>
<property name="messageType" scope="axis2" type="STRING" value="application/json"/>
<respond/>
</inSequence>
<outSequence/>
<faultSequence/>
</resource>
</api>
If I use org.apache.kafka.common.serialization.StringSerializer it works fine.
If I use io.confluent.kafka.serializers.KafkaAvroSerializer I get:
[2022-02-22 15:00:08,817] INFO {KafkaProduceConnector} - {api:WeatherDataPublishAPI} SEND : send message to Broker lists
but the topic is empty.
Also, when switching windows, <schemaRegistryUrl>http://localhost:8081</schemaRegistryUrl>
disappears.
Connector 3.1.1 & 3.1.2, on a WSO2 proxy:
<kafkaTransport.init>
<connectionType>kafka</connectionType>
<keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
<name>KafkaSSLBroker</name>
<securityProtocol>SSL</securityProtocol>
<valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
<bootstrapServers>server:9093</bootstrapServers>
<poolingEnabled>false</poolingEnabled>
<sslTruststoreLocation>D:\Temp\kfk\kafka.client.truststore.jks</sslTruststoreLocation>
<sslTruststorePassword>pass123</sslTruststorePassword>
<sslKeystoreLocation>D:\Temp\kfk\kafka.server.keystore.jks</sslKeystoreLocation>
<sslKeystorePassword>pass123</sslKeystorePassword>
<sslKeyPassword>pass123</sslKeyPassword>
<sslEndpointIdentificationAlgorithm/>
</kafkaTransport.init>
In the broker's server.properties, the parameter ssl.endpoint.identification.algorithm is set to an empty value.
wso2carbon.log:
[2021-08-20 15:47:49,565] INFO {org.apache.kafka.clients.producer.ProducerConfig} - ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [192.168.106.17:9093]
buffer.memory = 33554432
client.dns.lookup = default
client.id =
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 50
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = SSL
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = [hidden]
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = D:\Temp\kfk\kafka.server.keystore.jks
ssl.keystore.password = [hidden]
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = D:\Temp\kfk\kafka.client.truststore.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
[2021-08-20 15:47:49,599] WARN {org.apache.kafka.clients.producer.ProducerConfig} - The configuration 'metadata.fetch.timeout.ms' was supplied but isn't a known config.
[2021-08-20 15:47:49,600] WARN {org.apache.kafka.clients.producer.ProducerConfig} - The configuration 'timeout.ms' was supplied but isn't a known config.
[2021-08-20 15:47:49,600] WARN {org.apache.kafka.clients.producer.ProducerConfig} - The configuration 'schema.registry.url' was supplied but isn't a known config.
[2021-08-20 15:47:49,600] WARN {org.apache.kafka.clients.producer.ProducerConfig} - The configuration 'block.on.buffer.full' was supplied but isn't a known config.
[2021-08-20 15:47:49,601] INFO {org.apache.kafka.common.utils.AppInfoParser} - Kafka version: 2.3.0
[2021-08-20 15:47:49,602] INFO {org.apache.kafka.common.utils.AppInfoParser} - Kafka commitId: fc1aaa116b661c8a
[2021-08-20 15:47:49,602] INFO {org.apache.kafka.common.utils.AppInfoParser} - Kafka startTimeMs: 1629449269601
[2021-08-20 15:47:49,603] INFO {org.wso2.carbon.connector.KafkaProduceConnector} - {proxy:KafkaProxy} SEND : send message to Broker lists
[2021-08-20 15:47:50,085] INFO {org.apache.kafka.common.network.Selector} - [Producer clientId=producer-1] Failed authentication with /192.168.106.17 (SSL handshake failed)
[2021-08-20 15:47:50,086] ERROR {org.apache.kafka.clients.NetworkClient} - [Producer clientId=producer-1] Connection to node -1 (/192.168.106.17:9093) failed authentication due to: SSL handshake failed
[2021-08-20 15:47:50,087] ERROR {org.wso2.carbon.connector.KafkaProduceConnector} - {proxy:KafkaProxy} Kafka producer connector:Error sending the message to broker lists with connection Pool java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1269)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:933)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:856)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:743)
at org.wso2.carbon.connector.KafkaProduceConnector.send(KafkaProduceConnector.java:313)
at org.wso2.carbon.connector.KafkaProduceConnector.publishMessage(KafkaProduceConnector.java:223)
at org.wso2.carbon.connector.KafkaProduceConnector.connect(KafkaProduceConnector.java:132)
at org.wso2.carbon.connector.core.AbstractConnector.mediate(AbstractConnector.java:32)
at org.apache.synapse.mediators.ext.ClassMediator.updateInstancePropertiesAndMediate(ClassMediator.java:178)
at org.apache.synapse.mediators.ext.ClassMediator.mediate(ClassMediator.java:97)
at org.apache.synapse.mediators.AbstractListMediator.mediate(AbstractListMediator.java:109)
at org.apache.synapse.mediators.AbstractListMediator.mediate(AbstractListMediator.java:71)
at org.apache.synapse.mediators.template.TemplateMediator.mediate(TemplateMediator.java:136)
at org.apache.synapse.mediators.template.InvokeMediator.mediate(InvokeMediator.java:170)
at org.apache.synapse.mediators.template.InvokeMediator.mediate(InvokeMediator.java:93)
at org.apache.synapse.mediators.AbstractListMediator.mediate(AbstractListMediator.java:109)
at org.apache.synapse.mediators.AbstractListMediator.mediate(AbstractListMediator.java:71)
at org.apache.synapse.mediators.base.SequenceMediator.mediate(SequenceMediator.java:158)
at org.apache.synapse.core.axis2.ProxyServiceMessageReceiver.receive(ProxyServiceMessageReceiver.java:228)
at org.apache.axis2.engine.AxisEngine.receive(AxisEngine.java:180)
at org.apache.synapse.transport.passthru.ServerWorker.processNonEntityEnclosingRESTHandler(ServerWorker.java:375)
at org.apache.synapse.transport.passthru.ServerWorker.processEntityEnclosingRequest(ServerWorker.java:434)
at org.apache.synapse.transport.passthru.ServerWorker.run(ServerWorker.java:182)
at org.apache.axis2.transport.base.threads.NativeWorkerPool$1.run(NativeWorkerPool.java:172)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: No subject alternative names present
at java.base/sun.security.ssl.Alert.createSSLException(Alert.java:131)
at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:350)
at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:293)
at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:288)
at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:654)
at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.onCertificate(CertificateMessage.java:473)
at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.consume(CertificateMessage.java:369)
at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:392)
at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:444)
at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1074)
at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1061)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask.run(SSLEngineImpl.java:1008)
at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:402)
at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:484)
at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:340)
at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:265)
at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:170)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:547)
at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:331)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.security.cert.CertificateException: No subject alternative names present
at java.base/sun.security.util.HostnameChecker.matchIP(HostnameChecker.java:142)
at java.base/sun.security.util.HostnameChecker.match(HostnameChecker.java:101)
at java.base/sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:455)
at java.base/sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:429)
at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:283)
at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:141)
at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:632)
... 19 more
Please correct the sslEndpointIdentificationAlgorithm handling: the connector forcibly sets it to "https"!
Clients that I wrote in Golang and Python connect to the Kafka broker without problems using the same parameters.
I also use https://www.conduktor.io/ and connect without any problems.
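For context, in a plain Kafka client the hostname check that fails above is disabled by setting the standard producer property to an empty string; the expectation in this report is that the connector's empty <sslEndpointIdentificationAlgorithm/> element should map to this rather than defaulting to https:

```properties
# Standard Kafka client setting: an empty value disables server hostname verification
ssl.endpoint.identification.algorithm=
```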
Description:
If a field has a default value defined in the schema, then when the field is missing from a message, the default value should be applied instead of raising an exception.
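For example, with a schema like the following (an illustrative sketch; the record and field names are assumptions), a message that omits favorite_color should be serialized with the declared default rather than fail:

```json
{
  "type": "record",
  "name": "User",
  "fields": [
    { "name": "name", "type": "string" },
    { "name": "favorite_color", "type": "string", "default": "green" }
  ]
}
```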
Description:
When using an Avro schema, the message is accepted even though the value of a given field does not match the schema specification. We need to return an error stating that the message does not comply with the schema.
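To illustrate the expected behaviour, here is a minimal, hypothetical validation sketch. It is not the connector's code and only covers Avro primitive types; the real connector would delegate this to the Avro serializer. A record whose field value does not match the declared type should be rejected:

```python
# Illustrative only: a tiny type check against the primitive field types of an
# Avro record schema. Complex/nested types are out of scope for this sketch.
AVRO_PRIMITIVES = {
    "string": str,
    "boolean": bool,
    "int": int,
    "long": int,
    "float": float,
    "double": float,
}

def validate_record(record: dict, schema: dict) -> list:
    """Return the names of fields whose values violate the schema."""
    errors = []
    for field in schema["fields"]:
        name, ftype = field["name"], field["type"]
        expected = AVRO_PRIMITIVES.get(ftype)
        if expected is None:
            continue  # skip complex/nested types in this sketch
        value = record.get(name)
        # bool is a subclass of int in Python, so reject True/False for int fields
        if not isinstance(value, expected) or (expected is int and isinstance(value, bool)):
            errors.append(name)
    return errors

schema = {
    "type": "record",
    "name": "Customer",
    "fields": [
        {"name": "email", "type": "string"},
        {"name": "age", "type": "int"},
    ],
}

print(validate_record({"email": "a@b.com", "age": 30}, schema))   # []
print(validate_record({"email": 123, "age": "thirty"}, schema))   # ['email', 'age']
```

With this behaviour, the second record would produce an error response instead of being silently accepted.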
I am trying to publish a message to Kafka with a very simple configuration as follows:
<localEntry key="KAFKA_CONNECTION_1" xmlns="http://ws.apache.org/ns/synapse">
<kafkaTransport.init>
<connectionType>kafka</connectionType>
<keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
<name>KAFKA_CONNECTION_1</name>
<valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
<bootstrapServers>localhost:9092</bootstrapServers>
<poolingEnabled>false</poolingEnabled>
</kafkaTransport.init>
</localEntry>
<kafkaTransport.publishMessages configKey="KAFKA_CONNECTION_1">
<topic>ei-output</topic>
<partitionNo>1</partitionNo>
</kafkaTransport.publishMessages>
When my sequence publishes a message, it always gets an error like this:
2022-01-24 11:12:12,561 [DEBUG ] clients.producer.KafkaProducer - [Producer clientId=] Exception occurred during message send:
org.apache.kafka.common.errors.TimeoutException: Topic ei-output not present in metadata after 60000 ms.
2022-01-24 11:12:12,602 [ERROR ] carbon.connector.KafkaProduceConnector - Kafka producer connector : Error sending the message to broker
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Topic ei-output not present in metadata after 60000 ms.
at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1317) ~[kafka-clients-2.7.1.jar:?]
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:986) ~[kafka-clients-2.7.1.jar:?]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:886) ~[kafka-clients-2.7.1.jar:?]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:774) ~[kafka-clients-2.7.1.jar:?]
at org.wso2.carbon.connector.KafkaProduceConnector.send(KafkaProduceConnector.java:327) ~[1642997422761kafkaTransport-connector-3.1.2.zip/:?]
at org.wso2.carbon.connector.KafkaProduceConnector.publishMessage(KafkaProduceConnector.java:241) [1642997422761kafkaTransport-connector-3.1.2.zip/:?]
at org.wso2.carbon.connector.KafkaProduceConnector.connect(KafkaProduceConnector.java:138) [1642997422761kafkaTransport-connector-3.1.2.zip/:?]
at org.wso2.carbon.connector.core.AbstractConnector.mediate(AbstractConnector.java:32) [org.wso2.carbon.connector.core_4.7.99.jar:?]
at org.apache.synapse.mediators.ext.ClassMediator.updateInstancePropertiesAndMediate(ClassMediator.java:178) [synapse-core_2.1.7.wso2v227.jar:2.1.7-wso2v227]
at org.apache.synapse.mediators.ext.ClassMediator.mediate(ClassMediator.java:97) [synapse-core_2.1.7.wso2v227.jar:2.1.7-wso2v227]
at org.apache.synapse.mediators.AbstractListMediator.mediate(AbstractListMediator.java:109) [synapse-core_2.1.7.wso2v227.jar:2.1.7-wso2v227]
at org.apache.synapse.mediators.AbstractListMediator.mediate(AbstractListMediator.java:71) [synapse-core_2.1.7.wso2v227.jar:2.1.7-wso2v227]
at org.apache.synapse.mediators.template.TemplateMediator.mediate(TemplateMediator.java:136) [synapse-core_2.1.7.wso2v227.jar:2.1.7-wso2v227]
at org.apache.synapse.mediators.template.InvokeMediator.mediate(InvokeMediator.java:170) [synapse-core_2.1.7.wso2v227.jar:2.1.7-wso2v227]
at org.apache.synapse.mediators.template.InvokeMediator.mediate(InvokeMediator.java:93) [synapse-core_2.1.7.wso2v227.jar:2.1.7-wso2v227]
at org.apache.synapse.mediators.AbstractListMediator.mediate(AbstractListMediator.java:109) [synapse-core_2.1.7.wso2v227.jar:2.1.7-wso2v227]
at org.apache.synapse.mediators.base.SequenceMediator.mediate(SequenceMediator.java:242) [synapse-core_2.1.7.wso2v227.jar:2.1.7-wso2v227]
at org.apache.synapse.core.axis2.Axis2SynapseEnvironment.mediateFromContinuationStateStack(Axis2SynapseEnvironment.java:820) [synapse-core_2.1.7.wso2v227.jar:2.1.7-wso2v227]
at org.apache.synapse.core.axis2.Axis2SynapseEnvironment.injectMessage(Axis2SynapseEnvironment.java:322) [synapse-core_2.1.7.wso2v227.jar:2.1.7-wso2v227]
at org.apache.synapse.core.axis2.SynapseCallbackReceiver.handleMessage(SynapseCallbackReceiver.java:608) [synapse-core_2.1.7.wso2v227.jar:2.1.7-wso2v227]
at org.apache.synapse.core.axis2.SynapseCallbackReceiver.receive(SynapseCallbackReceiver.java:207) [synapse-core_2.1.7.wso2v227.jar:2.1.7-wso2v227]
at org.apache.axis2.engine.AxisEngine.receive(AxisEngine.java:180) [axis2_1.6.1.wso2v63.jar:?]
at org.apache.synapse.transport.passthru.ClientWorker.run(ClientWorker.java:298) [synapse-nhttp-transport_2.1.7.wso2v227.jar:?]
at org.apache.axis2.transport.base.threads.NativeWorkerPool$1.run(NativeWorkerPool.java:172) [axis2_1.6.1.wso2v63.jar:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_252]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_252]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
Caused by: org.apache.kafka.common.errors.TimeoutException: Topic ei-output not present in metadata after 60000 ms.
I spent the whole day struggling with this problem, and finally I found that it seemed to come from the <partitionNo>1</partitionNo>
configuration. When I removed this configuration, the Kafka connector operated normally, so there may be a bug in how this parameter is handled.
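Per the report above, the same publishMessages call without the partitionNo parameter (everything else unchanged from the configuration shown earlier) publishes successfully:

```xml
<kafkaTransport.publishMessages configKey="KAFKA_CONNECTION_1">
    <topic>ei-output</topic>
</kafkaTransport.publishMessages>
```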
Description:
The following parameters have not been added to the design view and the source view, respectively.
Affected Product Version:
3.1.4
Description:
When something goes wrong, an error log is printed, but the error is not propagated to the client. Therefore we need to catch the exception in the connector and set the following on the message context,