
emp-connector's Introduction

EMP-Connector Example $\textsf{\textcolor{red}{(Deprecated)}}$

A simplified connector example to the Enterprise Messaging Platform.

This example connector provides support for SSL, HTTP proxies and supports both the long polling and websocket streaming transports. Easy subscription management and full support for event replay is provided. The connector is a thin wrapper around the underlying CometD library.


$\textsf{\textcolor{red}{Disclaimer}}$

The EMP-Connector sample is deprecated and will be archived in the future. The EMP-Connector code is an example only and isn't intended for production environments. It hasn't been rigorously tested nor performance tested for throughput and scale.

Are you looking for a code sample for subscribing to platform events and change events? Check out the Java Quick Start for Pub/Sub API in the Pub/Sub API Guide.


Example Classes

Several example classes are provided to subscribe to a channel. All classes contain a main function that starts the tool. All examples authenticate to Salesforce and subscribe to a channel. Some examples use a different authentication mechanism or provide verbose logging.

All classes process events asynchronously in a separate thread. This ensures that EMP Connector continues to perform /meta/connect requests and keeps the session alive on the server. Currently, one thread is used for event processing and events are processed in the order they're received. You can increase the number of threads for parallel processing. However, doing so may cause events to not be processed in the order received.
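The ordering trade-off can be illustrated with plain java.util.concurrent executors. This is only a sketch under assumptions: the class name OrderedEventProcessing and the integer "replay IDs" are hypothetical stand-ins, and the connector's own threading code is not reproduced here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class OrderedEventProcessing {

    /** Hands replay IDs 1..count to the executor and returns the order in which they were handled. */
    static List<Integer> process(ExecutorService workers, int count) {
        List<Integer> handled = Collections.synchronizedList(new ArrayList<>());
        for (int replayId = 1; replayId <= count; replayId++) {
            final int id = replayId;
            // The receiving thread only enqueues work, so it stays free to keep the session alive.
            workers.execute(() -> { handled.add(id); });
        }
        workers.shutdown();
        try {
            if (!workers.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return handled;
    }

    public static void main(String[] args) {
        // One worker thread: events are handled in arrival order.
        System.out.println(process(Executors.newSingleThreadExecutor(), 5));
        // Several worker threads: every event is handled, but the order is not guaranteed.
        System.out.println(process(Executors.newFixedThreadPool(4), 5));
    }
}
```

With a single worker the list always comes back as 1 through 5. With the four-thread pool all five IDs are present, but their order can differ from run to run.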

LoginExample

The LoginExample.java class is the default class that EMP Connector executes. This class authenticates to your production Salesforce org using your Salesforce username and password.

DevLoginExample

The DevLoginExample class enables you to pass in a custom login URL, such as a sandbox instance (https://test.salesforce.com). Also, DevLoginExample logs to the console the Bayeux connection messages received on the /meta channels, such as /meta/handshake and /meta/connect.

BearerTokenExample

The BearerTokenExample.java class uses OAuth bearer token authentication and accepts an access token.

Build and Execute EMP Connector

After cloning the project, build EMP Connector using Maven: $ mvn clean package

The build generates the jar file in the target subfolder.

To run EMP Connector using the LoginExample class with username and password authentication, use this command.

$ java -jar target/emp-connector-0.0.1-SNAPSHOT-phat.jar <username> <password> <channel> [optional_replay_id]

To run EMP Connector using the DevLoginExample class with username and password authentication, use this command.

$ java -classpath target/emp-connector-0.0.1-SNAPSHOT-phat.jar com.salesforce.emp.connector.example.DevLoginExample <login_URL> <username> <password> <channel> [optional_replay_id]

To run EMP Connector using an OAuth access token, use this command.

$ java -classpath target/emp-connector-0.0.1-SNAPSHOT-phat.jar com.salesforce.emp.connector.example.BearerTokenExample <instance_URL> <token> <channel> [optional_replay_id]

The last parameter is the replay ID, which is the position in the stream from which you want to receive event messages. This parameter is optional. If not specified, EMP Connector fetches events starting from the tip, the newly received event messages (-1 option). To receive stored event messages that are within the retention window, specify -2. Use -2 sparingly. If a large volume of event messages is stored, retrieving all event messages can slow performance. For more information, see Message Durability.
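As a rough illustration of how the optional replay ID argument might be read, the sketch below defaults to -1 when the argument is absent and rejects non-numeric input. The class ReplayArgs, its constants, and the channel name /event/Example__e are hypothetical and are not taken from the example classes.

```java
class ReplayArgs {
    // Values described above: -1 = tip (new events only), -2 = all events in the retention window.
    static final long REPLAY_FROM_TIP = -1L;
    static final long REPLAY_FROM_EARLIEST = -2L;

    /** Returns the replay ID at args[index], or REPLAY_FROM_TIP when the argument is absent. */
    static long replayFrom(String[] args, int index) {
        if (args.length <= index) {
            return REPLAY_FROM_TIP;
        }
        try {
            return Long.parseLong(args[index]);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Replay ID must be an integer, got: " + args[index], e);
        }
    }

    public static void main(String[] args) {
        // Argument layout of the first command above: <username> <password> <channel> [optional_replay_id]
        System.out.println(replayFrom(new String[] {"user", "secret", "/event/Example__e"}, 3));       // -1
        System.out.println(replayFrom(new String[] {"user", "secret", "/event/Example__e", "-2"}, 3)); // -2
    }
}
```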

Subscription Filtering for PushTopic Channels

If you subscribe to a PushTopic channel with a filter, enclose the entire channel and filter information within quotes on the command line. Do not use single quotes around field values. Otherwise, EMP Connector doesn't work properly. For example, this command line uses filters on the TestAccount PushTopic.

$ java -jar target/emp-connector-0.0.1-SNAPSHOT-phat.jar <username> <password> "/topic/TestAccount?Type=Technology Partner&Phone=(415) 555-1212"

Only PushTopic events support filtering. For more information, see Filtered Subscriptions.
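If the channel is assembled in code rather than typed on the command line, the same format can be produced as in the sketch below. The helper PushTopicFilter.filteredChannel is hypothetical; it only concatenates strings in the format shown in the command above and refuses values wrapped in single quotes.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

class PushTopicFilter {

    /** Appends field=value filters to a PushTopic channel, in the iteration order of the map. */
    static String filteredChannel(String channel, Map<String, String> filters) {
        if (filters.isEmpty()) {
            return channel;
        }
        StringJoiner query = new StringJoiner("&", channel + "?", "");
        for (Map.Entry<String, String> filter : filters.entrySet()) {
            String value = filter.getValue();
            // Field values must not be wrapped in single quotes.
            if (value.length() >= 2 && value.startsWith("'") && value.endsWith("'")) {
                throw new IllegalArgumentException("Do not wrap field values in single quotes: " + value);
            }
            query.add(filter.getKey() + "=" + value);
        }
        return query.toString();
    }

    public static void main(String[] args) {
        Map<String, String> filters = new LinkedHashMap<>();
        filters.put("Type", "Technology Partner");
        filters.put("Phone", "(415) 555-1212");
        // Prints: /topic/TestAccount?Type=Technology Partner&Phone=(415) 555-1212
        System.out.println(filteredChannel("/topic/TestAccount", filters));
    }
}
```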

Debug Logging of Bayeux Messages

The LoggingListener class provides debug logging output of Bayeux messages received on the meta channels, such as /meta/handshake and /meta/connect. Each message is logged to the console with a timestamp, a "Success" prefix or a "Failure" prefix depending on whether the operation was successful or not, and then the body of the Bayeux message. For example, this log is for a handshake message.

[2018-01-19 10:54:12.701] Success:[/meta/handshake]
{ext={replay=true, payload.format=true}, minimumVersion=1.0, clientId=cn2vei6rz2pa01gqqvungzlppy,
    supportedConnectionTypes=[Ljava.lang.Object;@6e2ce7d1,
    channel=/meta/handshake, id=1, version=1.0, successful=true}

To add logging support to your connection, first create an instance of the LoggingListener class. The LoggingListener constructor accepts two boolean arguments that specify whether to log success and failure messages. Next, call the EmpConnector.addListener() method for each meta channel to add logging for and pass in the channel and the LoggingListener instance. This example adds logging for multiple channels.

LoggingListener loggingListener = new LoggingListener(true, true);
connector.addListener(META_HANDSHAKE, loggingListener)
         .addListener(META_CONNECT, loggingListener)
         .addListener(META_DISCONNECT, loggingListener)
         .addListener(META_SUBSCRIBE, loggingListener)
         .addListener(META_UNSUBSCRIBE, loggingListener);

The DevLoginExample class uses LoggingListener to log the messages received.

Buffer Size for Received Batch of Events

EMP Connector buffers the batch of events received using the CometD library. The buffer size is set in BayeuxParameters.java, in maxBufferSize(). Ensure that the buffer size is large enough to hold all event messages in the batch. The buffer size needed depends on the publishing rate and the event message size. At a minimum, set the buffer size to 10 MB, and adjust it higher if needed.
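The sketch below shows the arithmetic for a 10 MB buffer and the general shape of overriding a size hook. The Params interface is a simplified stand-in written for this illustration, not the real BayeuxParameters type. The int return type and the 1 MB default are assumptions; 1,048,576 is the capacity named in the "Buffering capacity 1048576 exceeded" error reported for this connector.

```java
class BufferSizeSketch {

    /** Simplified stand-in for the connector's parameters type; only the buffer-size hook is modeled. */
    interface Params {
        // Assumed default of 1 MB (1,048,576 bytes).
        default int maxBufferSize() {
            return 1_048_576;
        }
    }

    // 10 MB expressed in bytes: 10 * 1024 * 1024 = 10,485,760.
    static final int TEN_MB = 10 * 1024 * 1024;

    /** Returns parameters whose buffer can hold a 10 MB batch. */
    static Params withLargerBuffer() {
        return new Params() {
            @Override
            public int maxBufferSize() {
                return TEN_MB;
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(new Params() { }.maxBufferSize()); // 1048576
        System.out.println(withLargerBuffer().maxBufferSize()); // 10485760
    }
}
```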

API Version in the Streaming API Endpoint

The subscription endpoint for platform events includes the Salesforce API version. You can change the default API version in EMP Connector in BayeuxParameters.java, in version().
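To make the role of the version concrete, the sketch below builds an endpoint of the form <instance_URL>/cometd/<version>, the shape that appears in the connector's own log output (for example https://cs43.salesforce.com/cometd/37.0). The helper StreamingEndpoint.endpoint is hypothetical and is not part of the connector.

```java
import java.net.URI;

class StreamingEndpoint {

    /** Joins an instance URL and an API version into a CometD endpoint URI. */
    static URI endpoint(String instanceUrl, String apiVersion) {
        // Drop one trailing slash so the result never contains "//cometd".
        String base = instanceUrl.endsWith("/")
                ? instanceUrl.substring(0, instanceUrl.length() - 1)
                : instanceUrl;
        return URI.create(base + "/cometd/" + apiVersion);
    }

    public static void main(String[] args) {
        // Prints: https://cs43.salesforce.com/cometd/37.0
        System.out.println(endpoint("https://cs43.salesforce.com", "37.0"));
    }
}
```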

Reauthentication

Authentication becomes invalid when a Salesforce session is invalidated or an access token is revoked. EMP Connector listens for the 401::Authentication invalid error messages that Streaming API sends when the authentication is no longer valid. To reauthenticate after a 401 error is received, call the EmpConnector.setBearerTokenProvider() method, which accepts a function that reauthenticates and returns a new session ID or access token.

// Define the bearer token function
Function<Boolean, String> bearerTokenProvider = (Boolean reAuth) -> {
  ...
};
// Set the bearer token function
connector.setBearerTokenProvider(bearerTokenProvider);

For a full example, see LoginExample.java.
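One possible shape for such a function is sketched below: it caches the token and logs in again only when asked to. The class CachingTokenProvider and the login supplier are hypothetical. Reading reAuth == true as "a fresh login is required" is an assumption of this sketch, not something the text above states.

```java
import java.util.function.Function;
import java.util.function.Supplier;

class CachingTokenProvider implements Function<Boolean, String> {

    // Hypothetical login routine: performs the real authentication and returns a token.
    private final Supplier<String> login;
    private String cached;

    CachingTokenProvider(Supplier<String> login) {
        this.login = login;
    }

    /** Returns the cached token, logging in first if none is cached or reAuth is true (assumed meaning). */
    @Override
    public synchronized String apply(Boolean reAuth) {
        if (cached == null || Boolean.TRUE.equals(reAuth)) {
            cached = login.get();
        }
        return cached;
    }

    public static void main(String[] args) {
        int[] logins = {0};
        Function<Boolean, String> provider = new CachingTokenProvider(() -> "token-" + (++logins[0]));
        System.out.println(provider.apply(false)); // token-1 (first call logs in)
        System.out.println(provider.apply(false)); // token-1 (cached)
        System.out.println(provider.apply(true));  // token-2 (forced login)
    }
}
```

An instance of this class has the Function<Boolean, String> type used in the snippet above.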

Documentation

For more information about the components of the EMP Connector and a walkthrough, see the Java Client Example in the Streaming API Developer Guide.

emp-connector's People

Contributors

alintnersfdc, aryavk, gmenard, john-brock, jthurst01, karunahluwalia9, knhage, lmcalpin, mohitj13, mport, pbn-sfdc, sfdc-dxu, sfdc-hhildebrand, sivananda, sthummala2510, svc-scm, vikram-kommaraju


emp-connector's Issues

README contains outdated example

In README.md under the section "Build and Execute EMP Connector" we have the following example:

To run EMP Connector using the DevLoginExample class with username and password authentication, use this command.
$ java -classpath target/emp-connector-0.0.1-SNAPSHOT-phat.jar com.salesforce.emp.connector.example.DevLoginExample <login_URL> [optional_replay_id]

However, this command will no longer work because DevLoginExample is abstract. The examples should be updated to use DevLoginSynchronousEventProcessingExample or DevLoginAsynchronousEventProcessingExample.

Buffering capacity 1048576 exceeded

When we subscribe to platform events from SFDC using EMP Connector, we sometimes receive the error message below. Does anyone know the root cause of the error "Buffering capacity 1048576 exceeded"?
2019-08-16 11:53:07,078 WARN [c.i.s.e.lib.EmpConnector] [ HttpClient@153c6041-935910] EMP connector received error message: {failure={exception=java.lang.IllegalArgumentException: Buffering capacity 1048576 exceeded, message={clientId=jr1mvw84q01iegfw6mtlig4fqfm, channel=/meta/connect, id=16376, connectionType=long-polling}, connectionType=long-polling}, channel=/meta/connect, id=16376, successful=false}, will stop and restore the connection.

Reconnect fails

Running on Heroku seems to throw an error when trying to stop CometD.

2018-10-02T03:15:42.243201+00:00 app[worker.1]: <<<<
2018-10-02T03:15:42.243161+00:00 app[worker.1]: {advice={reconnect=handshake, interval=0}, channel=/meta/connect, id=297, error=403::Unknown client, successful=false}
2018-10-02T03:15:42.244849+00:00 app[worker.1]: >>>>
2018-10-02T03:15:42.245023+00:00 app[worker.1]: [2018-10-02 03:15:42.244] Failure:[/meta/disconnect]
2018-10-02T03:15:42.245060+00:00 app[worker.1]: {failure={exception=java.nio.channels.AsynchronousCloseException, message={clientId=391c3uwfrupcxgf1uaexxljm6fow, channel=/meta/disconnect, id=298}, connectionType=long-polling}, channel=/meta/disconnect, id=298, successful=false}
2018-10-02T03:15:42.245063+00:00 app[worker.1]: <<<<
2018-10-02T03:15:42.245086+00:00 app[worker.1]: >>>>
2018-10-02T03:15:42.245199+00:00 app[worker.1]: [2018-10-02 03:15:42.245] Failure:[/meta/disconnect]
2018-10-02T03:15:42.245201+00:00 app[worker.1]: {failure={exception=java.nio.channels.AsynchronousCloseException, message={clientId=391c3uwfrupcxgf1uaexxljm6fow, channel=/meta/disconnect, id=298}, connectionType=long-polling}, channel=/meta/disconnect, id=298, successful=false}
2018-10-02T03:15:42.245202+00:00 app[worker.1]: <<<<
2018-10-02T03:15:57.247250+00:00 app[worker.1]: [HttpClient@411721666-48] ERROR c.s.e.c.EmpConnector - Unable to stop HTTP transport[https://cs43.salesforce.com/cometd/37.0]
2018-10-02T03:15:57.247263+00:00 app[worker.1]: java.lang.InterruptedException: null
2018-10-02T03:15:57.247265+00:00 app[worker.1]: at java.lang.Object.wait(Native Method)
2018-10-02T03:15:57.247266+00:00 app[worker.1]: at java.lang.Thread.join(Thread.java:1260)
2018-10-02T03:15:57.247269+00:00 app[worker.1]: at org.eclipse.jetty.util.thread.QueuedThreadPool.doStop(QueuedThreadPool.java:154)
2018-10-02T03:15:57.247271+00:00 app[worker.1]: at org.eclipse.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:89)
2018-10-02T03:15:57.247272+00:00 app[worker.1]: at org.eclipse.jetty.util.component.ContainerLifeCycle.stop(ContainerLifeCycle.java:143)
2018-10-02T03:15:57.247274+00:00 app[worker.1]: at org.eclipse.jetty.util.component.ContainerLifeCycle.doStop(ContainerLifeCycle.java:162)
2018-10-02T03:15:57.247276+00:00 app[worker.1]: at org.eclipse.jetty.client.HttpClient.doStop(HttpClient.java:252)
2018-10-02T03:15:57.247278+00:00 app[worker.1]: at org.eclipse.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:89)
2018-10-02T03:15:57.247280+00:00 app[worker.1]: at com.salesforce.emp.connector.EmpConnector.stop(EmpConnector.java:154)
2018-10-02T03:15:57.247282+00:00 app[worker.1]: at com.salesforce.emp.connector.EmpConnector$AuthFailureListener.onMessage(EmpConnector.java:334)
2018-10-02T03:15:57.339835+00:00 app[worker.1]: >>>>
2018-10-02T03:15:57.339956+00:00 app[worker.1]: [2018-10-02 03:15:57.339] Success:[/meta/handshake]
2018-10-02T03:15:57.339991+00:00 app[worker.1]: {ext={replay=true, payload.format=true}, minimumVersion=1.0, clientId=3bd0a4rek46fkg14j04vgo0dbb0, supportedConnectionTypes=[Ljava.lang.Object;@5ebb9ec0, channel=/meta/handshake, id=299, version=1.0, successful=true}

Unable to login

I'm trying to follow along with the Change Data Capture trailhead module, and unable to continue with the example using EMP-Connector/CometD based on a login error.

I can't ascertain what the problem is: a parser.faultstring of . is rather cryptic. Is there some additional info I can provide to determine the cause of the error?

After building the package, here's the invocation that I'm using that is resulting in an error:

$ java -jar target/emp-connector-0.0.1-SNAPSHOT-phat.jar [email protected] my-password /data/Employee__ChangeEvent
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
java.net.ConnectException: Unable to login: .
        at com.salesforce.emp.connector.LoginHelper.login(LoginHelper.java:138)
        at com.salesforce.emp.connector.LoginHelper.login(LoginHelper.java:100)
        at com.salesforce.emp.connector.LoginHelper.login(LoginHelper.java:92)
        at com.salesforce.emp.connector.example.LoginExample.lambda$main$0(LoginExample.java:41)
        at com.salesforce.emp.connector.example.BearerTokenProvider.login(BearerTokenProvider.java:24)
        at com.salesforce.emp.connector.example.LoginExample.main(LoginExample.java:49)

Disconnecting after few mins

The connector works fine initially but disconnects after a few minutes. We are sending Salesforce account events to a Kafka bus.

LOG:
2017-08-08T21:02:34.047Z DEBUG <> [HttpClient@1874542689-91] c.o.s.empconnector.EmpConnector$2 - Processing /meta/connect {clientId=ja3en1s5e7cy7n1w765gzi8nrgz, channel=/meta/connect, id=32, successful=true}
2017-08-08T21:02:34.047Z DEBUG <> [HttpClient@1874542689-91] c.o.s.empconnector.EmpConnector$2 - State update: CONNECTED -> CONNECTED
2017-08-08T21:02:34.050Z DEBUG <> [pool-9-thread-1] c.o.s.empconnector.EmpConnector$2 - Connecting, transport com.opentable.salesforceinternalservice.empconnector.EmpConnector$1@795f8317
2017-08-08T21:02:34.050Z DEBUG <> [pool-9-thread-1] c.o.s.empconnector.EmpConnector$2 - Sending messages [{clientId=ja3en1s5e7cy7n1w765gzi8nrgz, channel=/meta/connect, id=33, connectionType=long-polling}]
2017-08-08T21:02:34.149Z DEBUG <> [HttpClient@1874542689-95] c.o.s.empconnector.EmpConnector$1 - Received messages [{channel=/meta/disconnect}, {clientId=ja3en1s5e7cy7n1w765gzi8nrgz, advice={reconnect=none, interval=0}, channel=/meta/connect, id=33, error=403::Unknown client, successful=false}]
2017-08-08T21:02:34.149Z DEBUG <> [HttpClient@1874542689-95] c.o.s.empconnector.EmpConnector$2 - Processing /meta/disconnect {channel=/meta/disconnect}
2017-08-08T21:02:34.150Z DEBUG <> [HttpClient@1874542689-95] c.o.s.empconnector.EmpConnector$2 - State update: CONNECTED -> TERMINATING
2017-08-08T21:02:34.151Z DEBUG <> [HttpClient@1874542689-95] c.o.s.empconnector.EmpConnector$2 - State update: TERMINATING -> DISCONNECTED
2017-08-08T21:02:34.151Z DEBUG <> [HttpClient@1874542689-95] c.o.s.empconnector.EmpConnector$2 - Processing /meta/connect {clientId=ja3en1s5e7cy7n1w765gzi8nrgz, advice={reconnect=none, interval=0}, channel=/meta/connect, id=33, error=403::Unknown client, successful=false}
2017-08-08T21:02:34.151Z DEBUG <> [HttpClient@1874542689-95] c.o.s.empconnector.EmpConnector$2 - State not updateable: DISCONNECTED -> TERMINATING

Please help me on this.

Event Schema - How to get schema along with the data

After subscribing, I am getting the event data like
{"event":{"createdDate":"2018-11-20T18:20:23.383Z","replayId":2,"type":"updated"},"sobject":{"Email":"[email protected]","Id":"someid","Name":"Test 2"}}
I would like to receive the schema too. I cannot just post this data to downstream apps; I need the associated schema with it. This helps downstream apps adapt to changes such as fields being added or deleted.

Ad hoc changes aside, I still want to receive the schema along with the data even when the schema is fixed.

Kindly help us.

EmpConnector Keep alive scheduler is not stopped upon Connector stop.

Problem:

When the EmpConnector is initialized using the below constructor

public EmpConnector(BayeuxParameters parameters) {
this(parameters, Executors.newSingleThreadScheduledExecutor());
}

A new single-thread scheduled executor is created automatically. This scheduler thread is never shut down when the connector is stopped. Also, the application has no access to the private scheduler member, so it cannot shut the scheduler down itself.

Any application that creates EmpConnector instances on the fly leaks threads.

Workaround:
The only way to avoid this issue is by calling the other constructor with application taking care of the scheduler lifecycle.
public EmpConnector(BayeuxParameters parameters, ScheduledExecutorService scheduler)
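A minimal sketch of this workaround, using only java.util.concurrent: the application creates the scheduler, hands it to whatever needs it, and always shuts it down afterwards. The class OwnedScheduler and its method are hypothetical. The connector itself is not constructed here; the comment marks where the two-argument constructor above would be called.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class OwnedScheduler {

    /** Runs work with a scheduler the caller owns and reports whether its thread terminated. */
    static boolean runWithOwnedScheduler(Consumer<ScheduledExecutorService> work) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            // In an application, the scheduler would be passed to
            // new EmpConnector(parameters, scheduler) here, and the connector started and stopped.
            work.accept(scheduler);
        } finally {
            // The owner shuts the scheduler down, so its thread cannot outlive the connector.
            scheduler.shutdownNow();
        }
        try {
            return scheduler.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // A periodic no-op task stands in for the connector's keep-alive work.
        boolean stopped = runWithOwnedScheduler(
                s -> s.scheduleAtFixedRate(() -> { }, 0, 10, TimeUnit.MILLISECONDS));
        System.out.println("scheduler terminated: " + stopped);
    }
}
```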

Solution:
Instead of creating a scheduler every time the EmpConnector is created, use a singleton scheduler to reuse the scheduler.

Thanks
Suriya.

Please push to maven

Hi,

Can someone please push the latest artifact to the Maven repository? Currently the latest version is 0.0.2, published in June 2019.
I really need the fix made in #62, and it would be very useful if you could publish the latest artifact to Maven.

Thanks,

Websocket Example

I don't see any example for a websocket connection. Can you please guide me to the required files?

Outdated jetty.HttpClient

In attempting to consume this project for a project to connect to the salesforce streaming api, it was found that the current org.eclipse.jetty.HttpClient that is used was older (~2 years) than an internal library that was using a newer version.

This older version of HttpClient conflicted with our internal library and failed to acquire any connections to the threadpool used by HttpClient.

While most users of this library wouldn't necessarily see this issue, the problem was resolved by matching versions of our internal library and as a result it may be a good idea to update the version of HttpClient used.

I noticed that this is also tied to the cometd reference, so keep in mind that updating HttpClient would require a newer version of cometd, which would likely have some conflicts if they're semantically versioned. This library uses 31.0 and cometd is currently on 34.0

Just a heads up/recommendation if anyone else comes across a similar issue.

Error subscribing when session expires.

I get the following error when I delete the AuthSession row associated with the app to mimic an access token expiring. I tested this both with an access token and by logging in with a username and password. The error occurs about 5 minutes after deleting the AuthSession.
error=400::The replayId for channel {/topic/User_Provision_Updates} wasn't found using the provided replay ID map {{}}. Ensure that the channel name you provided in the replay map is valid and matches the channel name used for subscribing., successful=false}]

Maven plugins not found

Hi, I ran mvn clean package and it is not able to find any of my plugins.

When I run the command to subscribe to the channel,
$ java -classpath target/emp-connector-0.0.1-SNAPSHOT-phat.jar
com.salesforce.emp.connector.example.DevLoginExample <login_URL>

this is the error message that I get:

Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.net.SocketTimeoutException: Connect Timeout
at com.salesforce.emp.connector.example.DevLoginExample.lambda$processEvents$0(DevLoginExample.java:50)
at com.salesforce.emp.connector.example.BearerTokenProvider.login(BearerTokenProvider.java:24)
at com.salesforce.emp.connector.example.DevLoginExample.processEvents(DevLoginExample.java:54)
at com.salesforce.emp.connector.example.DevLoginExample.main(DevLoginExample.java:36)
Caused by: java.util.concurrent.ExecutionException: java.net.SocketTimeoutException: Connect Timeout
at org.eclipse.jetty.client.util.FutureResponseListener.getResult(FutureResponseListener.java:118)
at org.eclipse.jetty.client.util.FutureResponseListener.get(FutureResponseListener.java:101)
at org.eclipse.jetty.client.HttpRequest.send(HttpRequest.java:683)
at com.salesforce.emp.connector.LoginHelper.login(LoginHelper.java:124)
at com.salesforce.emp.connector.LoginHelper.login(LoginHelper.java:100)
at com.salesforce.emp.connector.example.DevLoginExample.lambda$processEvents$0(DevLoginExample.java:48)
... 3 more
Caused by: java.net.SocketTimeoutException: Connect Timeout
at org.eclipse.jetty.io.ManagedSelector$Connect.run(ManagedSelector.java:801)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)

Unable to access jarfile ./target/emp-connector-0.0.1-SNAPSHOT-phat.jar

During the build process I get this error.

mvn clean package
[INFO] Scanning for projects...
[INFO] 
[INFO] ----------------< com.salesforce.conduit:emp-connector >----------------
[INFO] Building EMP Connector 0.0.1-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ emp-connector ---
[INFO] Deleting /Users/JCatter2/Desktop/EMP-Connector/target
[INFO] 
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ emp-connector ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory /Users/JCatter2/Desktop/EMP-Connector/src/main/resources
[INFO] 
[INFO] --- maven-compiler-plugin:2.3.2:compile (default-compile) @ emp-connector ---
[WARNING] File encoding has not been set, using platform encoding UTF-8, i.e. build is platform dependent!
[INFO] Compiling 14 source files to /Users/JCatter2/Desktop/EMP-Connector/target/classes
[INFO] 
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ emp-connector ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] Copying 1 resource
[INFO] 
[INFO] --- maven-compiler-plugin:2.3.2:testCompile (default-testCompile) @ emp-connector ---
[INFO] No sources to compile
[INFO] 
[INFO] --- maven-surefire-plugin:2.12.4:test (default-test) @ emp-connector ---
[INFO] 
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ emp-connector ---
[INFO] Building jar: /Users/JCatter2/Desktop/EMP-Connector/target/emp-connector-0.0.1-SNAPSHOT.jar
[INFO] 
[INFO] >>> maven-source-plugin:3.0.1:jar (attach-sources) > generate-sources @ emp-connector >>>
[INFO] 
[INFO] <<< maven-source-plugin:3.0.1:jar (attach-sources) < generate-sources @ emp-connector <<<
[INFO] 
[INFO] 
[INFO] --- maven-source-plugin:3.0.1:jar (attach-sources) @ emp-connector ---
[INFO] Building jar: /Users/JCatter2/Desktop/EMP-Connector/target/emp-connector-0.0.1-SNAPSHOT-sources.jar
[INFO] 
[INFO] --- maven-shade-plugin:2.1:shade (default) @ emp-connector ---
[INFO] Including org.cometd.java:cometd-java-client:jar:4.0.4 in the shaded jar.
[INFO] Including org.cometd.java:bayeux-api:jar:4.0.4 in the shaded jar.
[INFO] Including org.cometd.java:cometd-java-common:jar:4.0.4 in the shaded jar.
[INFO] Including org.eclipse.jetty:jetty-util-ajax:jar:9.4.18.v20190429 in the shaded jar.
[INFO] Including org.eclipse.jetty:jetty-util:jar:9.4.18.v20190429 in the shaded jar.
[INFO] Including org.eclipse.jetty:jetty-io:jar:9.4.18.v20190429 in the shaded jar.
[INFO] Including org.eclipse.jetty:jetty-http:jar:9.4.18.v20190429 in the shaded jar.
[INFO] Including org.eclipse.jetty:jetty-client:jar:9.4.18.v20190429 in the shaded jar.
[INFO] Including org.slf4j:slf4j-api:jar:1.7.26 in the shaded jar.
[INFO] Attaching shaded artifact.
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  1.699 s
[INFO] Finished at: 2021-02-18T01:20:12-08:00
[INFO] ------------------------------------------------------------------------

JCatter2@TMC02CJ7FVMD6R EMP-Connector % java -classpath target/emp-connector-0.f0.1-SNAPSHOT-phat.jar 
com.salesforce.emp.connector.example.BearerTokenExample instance_url token channel          
Error: Could not find or load main class com.salesforce.emp.connector.example.BearerTokenExample
Caused by: java.lang.ClassNotFoundException: com.salesforce.emp.connector.example.BearerTokenExample

Retry subscribe to Salesforce event when server returns 503 Server too busy

EMP Connector doesn't retry the subscription when Salesforce responds with a Server too busy error. Instead of failing, it should capture the response and retry the subscription.

Logs that the server returns additional information:

{ext={sfdc={failureReason=503::Server is too busy. Please try your request again later.}}, ...}

As described in the Salesforce documentation here: https://developer.salesforce.com/docs/atlas.en-us.api_streaming.meta/api_streaming/streaming_handling_errors.htm

the 503 Server too busy is a documented error that should/can be retried.

It would greatly add to the resilience of our system if the Salesforce component would automatically retry a subscribe if failed due to a temporary error.
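A rough sketch of the requested behavior, written independently of the connector: retry only when the failure reason starts with the 503 code, and double the wait between attempts. The class SubscribeRetry, the attempt supplier (which returns null on success or a failure reason string), and the delay values are all hypothetical.

```java
import java.util.function.Supplier;

class SubscribeRetry {

    /** True when the failure reason starts with the 503 code shown in the log above. */
    static boolean isRetryable(String failureReason) {
        return failureReason != null && failureReason.startsWith("503::");
    }

    /**
     * Calls attempt until it returns null (success) or a non-retryable failure reason,
     * doubling the delay after each 503. Returns null on success, else the last failure reason.
     */
    static String subscribeWithRetry(Supplier<String> attempt, int maxAttempts, long initialDelayMillis) {
        long delay = initialDelayMillis;
        String failure = null;
        for (int i = 1; i <= maxAttempts; i++) {
            failure = attempt.get();
            if (failure == null || !isRetryable(failure)) {
                return failure;
            }
            if (i < maxAttempts) {
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return failure;
                }
                delay *= 2;
            }
        }
        return failure;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated subscribe: two 503 responses, then success.
        String result = subscribeWithRetry(
                () -> ++calls[0] < 3 ? "503::Server is too busy. Please try your request again later." : null,
                5, 10);
        System.out.println("attempts=" + calls[0] + ", failure=" + result); // attempts=3, failure=null
    }
}
```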

BearerTokenExample - format?

Hello,

For the BearerTokenExample, given that I have an OAuth token, can you tell me what the overridden bearerToken() should return? Is it just the OAuth token, "Bearer " + token, or "OAuth " + token? Does the code even work with OAuth, or does it work only with a session ID?

Thanks,

Phat

Receiving Maintenance Page Response from LoginHelper

I am receiving the maintenance page content below when trying to login to my Salesforce environment using a valid username and password with the examples. I am able to manually log in using the credentials.

  <td>
   <table bgcolor="white" cellpadding="0" cellspacing="0" style="border:1px solid #ccc;" width="758">
    <tbody>
     <tr><td><br></td></tr>
     <tr>
      <td>
       <div style="background-color: white; border: 1px solid #ccc; padding: 0px; margin-top: 10px; margin-bottom: 0px; margin-left: 10px; margin-right: 10px;">
        <table bgcolor="white" cellpadding="0" cellspacing="0" width="758">
         <tbody>

          <tr>
           <td><span style="font-family: Verdana; font-size: medium; font-weight: bold;">We are down for maintenance.</span><br><br>Sorry for the inconvenience. We'll be back shortly.</td>
          </tr>
         </tbody>
        </table>
       </div>

      </td>
     </tr>
     <tr>
      <td>
       <span>
        <table border="0" cellpadding="0" cellspacing="0" style="text-align: right;" width="100%">
         <tbody>
          <tr>
           <td>

            <span>
             <span style="font-family: Verdana; font-size: smaller">Powered by <a href="http://force.com">force.com</a></span>
            </span>
           </td>
          </tr>
         </tbody>
        </table>

       </span>
      </td>
     </tr>
    </tbody>
   </table>      
  </td>
 </tr>
 <tr><td><br></td></tr>
 <tr><td></td></tr>

 <tr><td><br></td></tr>
</tbody>


Missing Replay ID in subscription

Hi,

When using replay IDs, after a 401 error is received during connect, EMP Connector tries to reconnect: the connect() method is called and a subscription is attempted.

It fails with the following message from the server:
DEBUG | o.cometd.client.BayeuxClient.51386e5 | Processing {clientId=5nbgiqfrsirccvmf20obbgdr9sm, channel=/meta/subscribe, id=27, subscription=/topic/topicName, error=400::The replayId for channel {/topic/topicName} wasn't found using the provided replay ID map {{}}. Ensure that the channel name you provided in the replay map is valid and matches the channel name used for subscribing., successful=false}

Indeed, the subscribe message was incomplete:

DEBUG | o.c.client.BayeuxClient.51386e5 | Sending messages [{ext={replay={}}, clientId=5nbgiqfrsirccvmf20obbgdr9sm, channel=/meta/subscribe, id=10, subscription=/topic/topicName}]

I think it's because the connect() method calls replay.clear() which is used by ReplayExtension to set the replay ID in the subscription message.

It seems that a failed subscription should throw a CannotSubscribe exception, but I can't see any trace of that in my logs (not sure why).

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder"

So i'm following this trailhead https://trailhead.salesforce.com/content/learn/modules/change-data-capture/subscribe-to-events?trail_id=architect-solutions-with-the-right-api and when i need to use this command :
java -jar target/emp-connector-0.0.1-SNAPSHOT-phat.jar orgName orgPassword /data/Employee__ChangeEvent
I keep getting this error

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" java.lang.NumberFormatException: For input string: "/data/Employee__ChangeEvent"
at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.base/java.lang.Long.parseLong(Long.java:678)
at java.base/java.lang.Long.parseLong(Long.java:817)
at com.salesforce.emp.connector.example.LoginExample.main(LoginExample.java:43)

Missing full data back?

On workbench it provides the following json:

{
  "channel": "/topic/Contact", 
  "clientId": "something", 
  "data": {
    "event": {
      "type": "updated", 
      "createdDate": "2018-01-31T15:08:09.000+0000"
    }, 
    "sobject": {
      "Fraud__c": null, 
      "Print_Subs_Amended__c": null, 
      "LastModifiedDate": "2018-01-31T15:07:57.000+0000" 
    }
  }
} 

However on this it provides:

{
    event = {
        createdDate = 2018 - 02 - 06 T12: 35: 13.560 Z,
        replayId = 171,
        type = updated
    }, sobject = {
        Webhelp_Account__c = false,
        LastModifiedDate = 2018 - 02 - 06 T12: 35: 13.000 Z
    }
}

How can it produce Json with all the existing data properly?

EMP-Connector does not receive notifications

Hi,

I followed the steps (described here: https://trailhead.salesforce.com/en/content/learn/modules/change-data-capture/subscribe-to-events) for installing the EMP-Connector. When I try to start the connector, nothing is shown on the console when I change data in my org:

This is the command I use:
java -jar target/emp-connector-0.0.1-SNAPSHOT-phat.jar ORGUser ORGPw /data/Employee__ChangeEvent

This is the log output I get:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Subscribed: Subscription [/data/Employee__ChangeEvent:-2]

Dependency to SLF4J

Please add a dependency block for SLF4J to the Maven pom.xml. Without it, I get a runtime error.

Dependency block for pom.xml:

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.6.2</version>
</dependency>

Salesforce emp Connector not waiting for event after subscribe in AWS Lambda

I have a simple client that uses the Salesforce EMP Connector to subscribe to a platform event. When I run it from the command line on a Windows machine, the subscription and the subsequent events work fine. When I deploy the same code as a Java Lambda function in AWS, it subscribes successfully but does not wait or listen for events. Is there some limitation on using the Java Bayeux client in AWS Lambda?

Below is the code in the Lambda handler:

public String handleRequest(Object input, Context context) {
        try {
        context.getLogger().log("Input: " + input);

        Consumer<Map<String, Object>> consumer = event -> System.out.println(String.format("Received:\n%s", JSON.toString(event)));

        BearerTokenProvider tokenProvider = new BearerTokenProvider(() -> {
            try {
                return LoginHelper.login(new URL(SFDC_TEST_URL), SFDC_TEST_USERNAME, SFDC_PASS_SEC_KEY);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });

        BayeuxParameters params = tokenProvider.login();

        EmpConnector connector = new EmpConnector(params);
        LoggingListener loggingListener = new LoggingListener(true, true);


        connector.addListener(META_HANDSHAKE, loggingListener)
                .addListener(META_CONNECT, loggingListener)
                .addListener(META_DISCONNECT, loggingListener)
                .addListener(META_SUBSCRIBE, loggingListener)
                .addListener(META_UNSUBSCRIBE, loggingListener);


        connector.setBearerTokenProvider(tokenProvider);

        connector.start().get(5, TimeUnit.SECONDS);

        TopicSubscription subscription = connector.subscribe(SFDC_TOPIC_EVENT_NAME, EmpConnector.REPLAY_FROM_TIP, consumer).get(2, TimeUnit.SECONDS);

        System.out.println(String.format("Subscribed: %s", subscription));

        // TODO: implement your handler
        return "Hello from Lambda!";
        }catch(Exception e) {
            e.printStackTrace();
            return "exited with error";
        }
    }
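
One possible explanation, not confirmed anywhere on this page, is that the handler returns right after subscribing. Once a Lambda handler returns, the invocation is over, so nothing is left waiting for events. The sketch below shows one way to keep the handler thread blocked for a bounded time. `BoundedListener` is a hypothetical helper, and the event count and timeout are assumptions to be tuned.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper: collects events and lets the caller wait for them.
final class BoundedListener<T> {
    private final CountDownLatch latch;
    private final List<T> received = new CopyOnWriteArrayList<>();

    BoundedListener(int expectedEvents) {
        this.latch = new CountDownLatch(expectedEvents);
    }

    // Pass this as the event consumer when subscribing.
    Consumer<T> consumer() {
        return event -> {
            received.add(event);
            latch.countDown();
        };
    }

    // Blocks the calling thread until the expected number of events has
    // arrived or the timeout elapses; returns false on timeout.
    boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);
    }

    List<T> received() {
        return received;
    }
}
```

In the handler above, the idea would be to pass `listener.consumer()` to `connector.subscribe(...)` and call `listener.await(...)` before the `return`, with a timeout shorter than the function's configured timeout. A long-lived subscription may still fit better in a continuously running process than in a short-lived function.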

BearerTokenExample.java argv.length == 4

What is the following code in BearerTokenExample.java for? And when does the length usually equal 4?

if (argv.length == 4) { replayFrom = Long.parseLong(argv[3]); }
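
The snippet reads as handling an optional fourth command-line argument: a replay ID to start from. Here is a minimal sketch of that pattern, assuming three required arguments followed by the optional replay ID. `ReplayArgs` and its constant are illustrative names, and the default of -1 (new events only) is an assumption, not taken from the example's source.

```java
// Hypothetical helper illustrating an optional fourth argument.
final class ReplayArgs {
    // -1 asks for new events only (value taken from the server error text quoted on this page).
    static final long REPLAY_FROM_TIP = -1L;

    private ReplayArgs() {}

    // argv holds three required arguments plus an optional replay ID at index 3.
    static long replayFrom(String[] argv) {
        if (argv.length == 4) {
            // Throws NumberFormatException if the fourth argument is not a number.
            return Long.parseLong(argv[3]);
        }
        return REPLAY_FROM_TIP;
    }
}
```

With three arguments the default applies. With four, the last one must parse as a number, otherwise `Long.parseLong` throws a `NumberFormatException`.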

EmpConnector Error: org.xml.sax.SAXParseException

Looks like there is some issue with LoginHelper.

Exception log:

2018-12-14 03:01:55.732  INFO 1 --- [@4e517165-11170] c.salesforce.emp.connector.EmpConnector  : EmpConnector connecting
org.xml.sax.SAXParseException; lineNumber: 2; columnNumber: 10; DOCTYPE is disallowed when the feature "http://apache.org/xml/features/disallow-doctype-decl" set to true.
        at com.sun.org.apache.xerces.internal.util.ErrorHandlerWrapper.createSAXParseException(ErrorHandlerWrapper.java:203)
        at com.sun.org.apache.xerces.internal.util.ErrorHandlerWrapper.fatalError(ErrorHandlerWrapper.java:177)
        at com.sun.org.apache.xerces.internal.impl.XMLErrorReporter.reportError(XMLErrorReporter.java:400)
        at com.sun.org.apache.xerces.internal.impl.XMLErrorReporter.reportError(XMLErrorReporter.java:327)
        at com.sun.org.apache.xerces.internal.impl.XMLScanner.reportFatalError(XMLScanner.java:1472)
        at com.sun.org.apache.xerces.internal.impl.XMLDocumentScannerImpl$PrologDriver.next(XMLDocumentScannerImpl.java:914)
        at com.sun.org.apache.xerces.internal.impl.XMLDocumentScannerImpl.next(XMLDocumentScannerImpl.java:602)
        at com.sun.org.apache.xerces.internal.impl.XMLNSDocumentScannerImpl.next(XMLNSDocumentScannerImpl.java:112)
        at com.sun.org.apache.xerces.internal.impl.XMLDocumentFragmentScannerImpl.scanDocument(XMLDocumentFragmentScannerImpl.java:505)
        at com.sun.org.apache.xerces.internal.parsers.XML11Configuration.parse(XML11Configuration.java:842)
        at com.sun.org.apache.xerces.internal.parsers.XML11Configuration.parse(XML11Configuration.java:771)
        at com.sun.org.apache.xerces.internal.parsers.XMLParser.parse(XMLParser.java:141)
        at com.sun.org.apache.xerces.internal.parsers.AbstractSAXParser.parse(AbstractSAXParser.java:1213)
        at com.sun.org.apache.xerces.internal.jaxp.SAXParserImpl$JAXPSAXParser.parse(SAXParserImpl.java:643)
        at com.sun.org.apache.xerces.internal.jaxp.SAXParserImpl.parse(SAXParserImpl.java:327)
        at javax.xml.parsers.SAXParser.parse(SAXParser.java:195)
        at com.salesforce.emp.connector.LoginHelper.login(LoginHelper.java:134)
        at com.salesforce.emp.connector.LoginHelper.login(LoginHelper.java:100)
        at com.abc.platformevent.subscriber.SalesforceBusSubscriber.lambda$generateTokenProvider$0(SalesforceBusSubscriber.java:73)
        at com.salesforce.emp.connector.example.BearerTokenProvider.apply(BearerTokenProvider.java:33)
        at com.salesforce.emp.connector.example.BearerTokenProvider.apply(BearerTokenProvider.java:14)
        at com.salesforce.emp.connector.EmpConnector.bearerToken(EmpConnector.java:305)
        at com.salesforce.emp.connector.EmpConnector.connect(EmpConnector.java:263)
        at com.salesforce.emp.connector.EmpConnector.reconnect(EmpConnector.java:316)
        at com.salesforce.emp.connector.EmpConnector.access$900(EmpConnector.java:31)
        at com.salesforce.emp.connector.EmpConnector$AuthFailureListener.onMessage(EmpConnector.java:336)
        at org.cometd.common.AbstractClientSession$AbstractSessionChannel.notifyOnMessage(AbstractClientSession.java:597)
        at org.cometd.common.AbstractClientSession$AbstractSessionChannel.notifyMessageListeners(AbstractClientSession.java:582)
        at org.cometd.common.AbstractClientSession.notifyListeners(AbstractClientSession.java:294)
        at org.cometd.common.AbstractClientSession.receive(AbstractClientSession.java:260)
        at org.cometd.client.BayeuxClient.failConnect(BayeuxClient.java:776)
        at org.cometd.client.BayeuxClient.processConnect(BayeuxClient.java:762)
        at org.cometd.client.BayeuxClient.processMessages(BayeuxClient.java:610)
        at org.cometd.client.BayeuxClient.access$3100(BayeuxClient.java:100)
        at org.cometd.client.BayeuxClient$MessageTransportListener.onMessages(BayeuxClient.java:1190)
        at org.cometd.client.transport.LongPollingTransport$2.onComplete(LongPollingTransport.java:236)
        at org.eclipse.jetty.client.ResponseNotifier.notifyComplete(ResponseNotifier.java:193)
        at org.eclipse.jetty.client.ResponseNotifier.notifyComplete(ResponseNotifier.java:185)
        at org.eclipse.jetty.client.HttpReceiver.terminateResponse(HttpReceiver.java:464)
        at org.eclipse.jetty.client.HttpReceiver.responseSuccess(HttpReceiver.java:410)
        at org.eclipse.jetty.client.http.HttpReceiverOverHTTP.messageComplete(HttpReceiverOverHTTP.java:301)
        at org.eclipse.jetty.http.HttpParser.handleContentMessage(HttpParser.java:628)
        at org.eclipse.jetty.http.HttpParser.parseContent(HttpParser.java:1594)
        at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:1442)
        at org.eclipse.jetty.client.http.HttpReceiverOverHTTP.parse(HttpReceiverOverHTTP.java:173)
        at org.eclipse.jetty.client.http.HttpReceiverOverHTTP.process(HttpReceiverOverHTTP.java:134)
        at org.eclipse.jetty.client.http.HttpReceiverOverHTTP.receive(HttpReceiverOverHTTP.java:72)
        at org.eclipse.jetty.client.http.HttpChannelOverHTTP.receive(HttpChannelOverHTTP.java:133)
        at org.eclipse.jetty.client.http.HttpConnectionOverHTTP.onFillable(HttpConnectionOverHTTP.java:155)
        at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:281)
        at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:102)
        at org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:291)
        at org.eclipse.jetty.io.ssl.SslConnection$3.succeeded(SslConnection.java:151)
        at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:102)
2018-12-14 03:02:10.979  INFO 1 --- [@4e517165-11170] org.cometd.bayeux.client.ClientSession   : Exception while invoking listener com.salesforce.emp.connector.EmpConnector$AuthFailureListener@56b0be33

java.lang.RuntimeException: java.lang.NullPointerException
        at com.salesforce.emp.connector.example.BearerTokenProvider.apply(BearerTokenProvider.java:35) ~[EMP-Connector-98bce92f001ede1a3df61047e1011dc4ae728a46.jar!/:na]
        at com.salesforce.emp.connector.example.BearerTokenProvider.apply(BearerTokenProvider.java:14) ~[EMP-Connector-98bce92f001ede1a3df61047e1011dc4ae728a46.jar!/:na]
        at com.salesforce.emp.connector.EmpConnector.bearerToken(EmpConnector.java:305) ~[EMP-Connector-98bce92f001ede1a3df61047e1011dc4ae728a46.jar!/:na]
        at com.salesforce.emp.connector.EmpConnector.connect(EmpConnector.java:263) ~[EMP-Connector-98bce92f001ede1a3df61047e1011dc4ae728a46.jar!/:na]
        at com.salesforce.emp.connector.EmpConnector.reconnect(EmpConnector.java:316) ~[EMP-Connector-98bce92f001ede1a3df61047e1011dc4ae728a46.jar!/:na]
        at com.salesforce.emp.connector.EmpConnector.access$900(EmpConnector.java:31) ~[EMP-Connector-98bce92f001ede1a3df61047e1011dc4ae728a46.jar!/:na]
        at com.salesforce.emp.connector.EmpConnector$AuthFailureListener.onMessage(EmpConnector.java:336) ~[EMP-Connector-98bce92f001ede1a3df61047e1011dc4ae728a46.jar!/:na]
        at org.cometd.common.AbstractClientSession$AbstractSessionChannel.notifyOnMessage(AbstractClientSession.java:597) [cometd-java-common-3.1.4.jar!/:na]
        at org.cometd.common.AbstractClientSession$AbstractSessionChannel.notifyMessageListeners(AbstractClientSession.java:582) [cometd-java-common-3.1.4.jar!/:na]
        at org.cometd.common.AbstractClientSession.notifyListeners(AbstractClientSession.java:294) [cometd-java-common-3.1.4.jar!/:na]
        at org.cometd.common.AbstractClientSession.receive(AbstractClientSession.java:260) [cometd-java-common-3.1.4.jar!/:na]
        at org.cometd.client.BayeuxClient.failConnect(BayeuxClient.java:776) [cometd-java-client-3.1.4.jar!/:na]
        at org.cometd.client.BayeuxClient.processConnect(BayeuxClient.java:762) [cometd-java-client-3.1.4.jar!/:na]
        at org.cometd.client.BayeuxClient.processMessages(BayeuxClient.java:610) [cometd-java-client-3.1.4.jar!/:na]
        at org.cometd.client.BayeuxClient.access$3100(BayeuxClient.java:100) [cometd-java-client-3.1.4.jar!/:na]
        at org.cometd.client.BayeuxClient$MessageTransportListener.onMessages(BayeuxClient.java:1190) [cometd-java-client-3.1.4.jar!/:na]
        at org.cometd.client.transport.LongPollingTransport$2.onComplete(LongPollingTransport.java:236) [cometd-java-client-3.1.4.jar!/:na]
        at org.eclipse.jetty.client.ResponseNotifier.notifyComplete(ResponseNotifier.java:193) [jetty-client-9.4.9.v20180320.jar!/:9.4.9.v20180320]
        at org.eclipse.jetty.client.ResponseNotifier.notifyComplete(ResponseNotifier.java:185) [jetty-client-9.4.9.v20180320.jar!/:9.4.9.v20180320]
        at org.eclipse.jetty.client.HttpReceiver.terminateResponse(HttpReceiver.java:464) [jetty-client-9.4.9.v20180320.jar!/:9.4.9.v20180320]
        at org.eclipse.jetty.client.HttpReceiver.responseSuccess(HttpReceiver.java:410) [jetty-client-9.4.9.v20180320.jar!/:9.4.9.v20180320]
        at org.eclipse.jetty.client.http.HttpReceiverOverHTTP.messageComplete(HttpReceiverOverHTTP.java:301) [jetty-client-9.4.9.v20180320.jar!/:9.4.9.v20180320]
        at org.eclipse.jetty.http.HttpParser.handleContentMessage(HttpParser.java:628) [jetty-http-9.4.9.v20180320.jar!/:9.4.9.v20180320]
        at org.eclipse.jetty.http.HttpParser.parseContent(HttpParser.java:1594) [jetty-http-9.4.9.v20180320.jar!/:9.4.9.v20180320]
        at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:1442) [jetty-http-9.4.9.v20180320.jar!/:9.4.9.v20180320]
        at org.eclipse.jetty.client.http.HttpReceiverOverHTTP.parse(HttpReceiverOverHTTP.java:173) [jetty-client-9.4.9.v20180320.jar!/:9.4.9.v20180320]
        at org.eclipse.jetty.client.http.HttpReceiverOverHTTP.process(HttpReceiverOverHTTP.java:134) [jetty-client-9.4.9.v20180320.jar!/:9.4.9.v20180320]
        at org.eclipse.jetty.client.http.HttpReceiverOverHTTP.receive(HttpReceiverOverHTTP.java:72) [jetty-client-9.4.9.v20180320.jar!/:9.4.9.v20180320]
        at org.eclipse.jetty.client.http.HttpChannelOverHTTP.receive(HttpChannelOverHTTP.java:133) [jetty-client-9.4.9.v20180320.jar!/:9.4.9.v20180320]
        at org.eclipse.jetty.client.http.HttpConnectionOverHTTP.onFillable(HttpConnectionOverHTTP.java:155) [jetty-client-9.4.9.v20180320.jar!/:9.4.9.v20180320]
        at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:281) [jetty-io-9.4.9.v20180320.jar!/:9.4.9.v20180320]
        at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:102) [jetty-io-9.4.9.v20180320.jar!/:9.4.9.v20180320]
        at org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:291) [jetty-io-9.4.9.v20180320.jar!/:9.4.9.v20180320]
        at org.eclipse.jetty.io.ssl.SslConnection$3.succeeded(SslConnection.java:151) [jetty-io-9.4.9.v20180320.jar!/:9.4.9.v20180320]
        at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:102) [jetty-io-9.4.9.v20180320.jar!/:9.4.9.v20180320]
        at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:118) [jetty-io-9.4.9.v20180320.jar!/:9.4.9.v20180320]
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:319) [jetty-util-9.4.9.v20180320.jar!/:9.4.9.v20180320]
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:175) [jetty-util-9.4.9.v20180320.jar!/:9.4.9.v20180320]
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:133) [jetty-util-9.4.9.v20180320.jar!/:9.4.9.v20180320]
        at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366) [jetty-util-9.4.9.v20180320.jar!/:9.4.9.v20180320]
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:754) [jetty-util-9.4.9.v20180320.jar!/:9.4.9.v20180320]
        at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:672) [jetty-util-9.4.9.v20180320.jar!/:9.4.9.v20180320]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_181]
Caused by: java.lang.NullPointerException: null
        at com.salesforce.emp.connector.example.BearerTokenProvider.apply(BearerTokenProvider.java:33) ~[EMP-Connector-98bce92f001ede1a3df61047e1011dc4ae728a46.jar!/:na]
        ... 42 common frames omitted

        at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:118)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:319)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:175)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:133)
        at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:754)
        at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:672)
        at java.lang.Thread.run(Thread.java:748)

Similar issue: https://stackoverflow.com/questions/10837706/solve-security-issue-parsing-xml-using-sax-parser
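
The exception message says the parser was configured to reject DOCTYPE declarations and the document it parsed contained one. The sketch below reproduces that behavior with the JDK's SAX parser. `DoctypeCheck` is a hypothetical helper, and the guess that the login response was an HTML page rather than the expected XML is an assumption, not something the log confirms.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.SAXParser;
import javax.xml.parsers.SAXParserFactory;
import org.xml.sax.SAXParseException;
import org.xml.sax.helpers.DefaultHandler;

// Hypothetical helper: parses text with DOCTYPE declarations disallowed.
final class DoctypeCheck {
    private DoctypeCheck() {}

    // Returns null if the text parses, or the parser's error message otherwise.
    static String parseError(String xml) throws Exception {
        SAXParserFactory factory = SAXParserFactory.newInstance();
        // Same feature named in the exception message above.
        factory.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
        factory.setNamespaceAware(true);
        SAXParser parser = factory.newSAXParser();
        try {
            parser.parse(
                new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)),
                new DefaultHandler());
            return null;
        } catch (SAXParseException e) {
            return e.getMessage();
        }
    }
}
```

Logging the raw response body before parsing would show whether the server returned something other than the expected XML.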

How to subscribe from replayId ?

I am getting "400::The replayId {1553783} you provided was invalid. Please provide a valid ID, -2 to replay all events, or -1 to replay only new events." I am trying to resubscribe to a topic from a stored replay ID when my server restarts. I store the replay ID in a separate database to maintain state.
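
A stored replay ID can be rejected by the server, for example if the event it refers to is no longer retained (one possible cause, assumed here). The sketch below retries with -2 when that happens. `ReplayFallback` and the `subscribe` callback contract (null on success, the server's error text on failure) are hypothetical and stand in for the real subscribe call.

```java
import java.util.function.LongFunction;

// Hypothetical helper: falls back to replaying all retained events.
final class ReplayFallback {
    // -2 replays all retained events (value taken from the error text above).
    static final long REPLAY_FROM_EARLIEST = -2L;

    private ReplayFallback() {}

    // Tries the stored replay ID first; on an invalid-replayId error, retries
    // with -2. Returns the replay ID that was actually accepted.
    static long subscribeWithFallback(long storedReplayId, LongFunction<String> subscribe) {
        String error = subscribe.apply(storedReplayId);
        if (error == null) {
            return storedReplayId;
        }
        if (error.startsWith("400::") && error.contains("replayId")) {
            String retryError = subscribe.apply(REPLAY_FROM_EARLIEST);
            if (retryError == null) {
                return REPLAY_FROM_EARLIEST;
            }
            throw new IllegalStateException(retryError);
        }
        throw new IllegalStateException(error);
    }
}
```

Replaying from -2 can redeliver events that were already processed, so the consumer would need to skip or tolerate duplicates.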

Prefer event driven architecture instead of continued polling

This API connects to Salesforce and then polls continually to listen for events, which is not the latest architecture pattern.
With such an implementation, your consumer uses a lot of memory and compute capacity. For these reasons, this library is very error prone.

Maven Dependency Not Found

Hi,

I'm unable to find the following dependency in Maven. I'm getting a "dependency not found" error. Could someone point me to a valid repository?

<dependency>
    <groupId>com.salesforce.conduit</groupId>
    <artifactId>emp-connector</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

EMP connector stops receiving notifications after an hour or so.

I was running some tests on my POC code and found that after some time (1-2 hours) notifications stop arriving. I waited, assuming they might be delayed in some cases, and left it for another hour, but no notifications came. I then restarted my program and the pending notifications were received. This happened every time we ran the test, so I am logging this issue for investigation.

Error: INVALID_LOGIN: Invalid username, password, security token, user not active, or user locked out

Hi Team,

We are able to log in, but about once a week we get a user-locked-out error.
When we get a 401 or 403 error, the user can normally log in again and a new session is created. About once a week, however, the user cannot get a new session and is locked out. We get the error below:
Error: INVALID_LOGIN: Invalid username, password, security token, user not active, or user locked out
Could you please let us know what changes we need to make?

BayeuxParameters Streaming API version?

In the BayeuxParameters interface, you set the Streaming API version to 39.0.

Is this correct?

When creating the PushTopic as in the example (https://developer.salesforce.com/docs/atlas.en-us.api_streaming.meta/api_streaming/create_a_pushtopic.htm#create_a_pushtopic), I cannot create it with pushTopic.ApiVersion = 39.0; Salesforce returns an error:
Line: 10, Column: 1
System.DmlException: Insert failed. First exception on row 0; first error: INVALID_FIELD, Invalid ApiVersion: [ApiVersion]

If I change the API version to 38.0, Salesforce creates the PushTopic without any problem.

Do the API versions in your code need to match the version in the PushTopic?

When I try to run LoginExample, I get the following stack trace:

Exception in thread "main" java.util.concurrent.ExecutionException: java.net.ConnectException: Cannot connect [https://na39.salesforce.com/cometd/replay/38.0] : {exception=org.cometd.common.TransportException: {httpCode=400}, message={ext={replay=true}, supportedConnectionTypes=[long-polling], channel=/meta/handshake, id=1, version=1.0}, httpCode=400, connectionType=long-polling}
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at com.salesforce.emp.connector.example.LoginExample.main(LoginExample.java:48)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.net.ConnectException: Cannot connect [https://na39.salesforce.com/cometd/replay/38.0] : {exception=org.cometd.common.TransportException: {httpCode=400}, message={ext={replay=true}, supportedConnectionTypes=[long-polling], channel=/meta/handshake, id=1, version=1.0}, httpCode=400, connectionType=long-polling}
at com.salesforce.emp.connector.EmpConnector.lambda$connect$3(EmpConnector.java:224)
at org.cometd.common.AbstractClientSession$AbstractSessionChannel.notifyOnMessage(AbstractClientSession.java:500)
at org.cometd.common.AbstractClientSession.notifyListener(AbstractClientSession.java:267)
at org.cometd.client.BayeuxClient.notifyListeners(BayeuxClient.java:999)
at org.cometd.common.AbstractClientSession.receive(AbstractClientSession.java:241)
at org.cometd.client.BayeuxClient.failMessage(BayeuxClient.java:987)
at org.cometd.client.BayeuxClient.failMessages(BayeuxClient.java:962)
at org.cometd.client.BayeuxClient$PublishTransportListener.onFailure(BayeuxClient.java:1199)
at org.cometd.client.BayeuxClient$HandshakeTransportListener.onFailure(BayeuxClient.java:1248)
at org.cometd.client.transport.LongPollingTransport$2.onComplete(LongPollingTransport.java:279)
at org.eclipse.jetty.client.ResponseNotifier.notifyComplete(ResponseNotifier.java:193)
at org.eclipse.jetty.client.ResponseNotifier.notifyComplete(ResponseNotifier.java:185)
at org.eclipse.jetty.client.HttpReceiver.terminateResponse(HttpReceiver.java:446)
at org.eclipse.jetty.client.HttpReceiver.responseSuccess(HttpReceiver.java:393)
at org.eclipse.jetty.client.http.HttpReceiverOverHTTP.messageComplete(HttpReceiverOverHTTP.java:265)
at org.eclipse.jetty.http.HttpParser.parseContent(HttpParser.java:1514)
at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:1272)
at org.eclipse.jetty.client.http.HttpReceiverOverHTTP.parse(HttpReceiverOverHTTP.java:156)
at org.eclipse.jetty.client.http.HttpReceiverOverHTTP.process(HttpReceiverOverHTTP.java:117)
at org.eclipse.jetty.client.http.HttpReceiverOverHTTP.receive(HttpReceiverOverHTTP.java:69)
at org.eclipse.jetty.client.http.HttpChannelOverHTTP.receive(HttpChannelOverHTTP.java:89)
at org.eclipse.jetty.client.http.HttpConnectionOverHTTP.onFillable(HttpConnectionOverHTTP.java:122)
at org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:544)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)
at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)
at java.lang.Thread.run(Thread.java:745)

I'm not behind a proxy server, and when I check the login history in Salesforce, I can see a successful login.

My parameters for LoginExample are "user password /topic/InvoiceStatementUpdates".

What am I doing wrong?

Execution Exception - com.salesforce.emp.connector.CannotSubscribe: Unable to subscribe to [/event/test_event__e:-2] [https://my.salesforce.com/cometd/39.0] : 403::Organization total events daily limit exceeded

Hi experts,

We invoke EMP Connector from a scheduler that runs every 30 seconds, but only after the previous run has completed.
When subscribing to the Salesforce event bus from EMP Connector, we always request new replay IDs based on the last successfully processed replay ID. Still, in the Salesforce logs we noticed that the same replay ID is requested multiple times within a few milliseconds by the same user (note: this user is used by only one application). Can someone please explain in what scenario this can happen?

Thanks in advance!

Security Token issue

Over the last two weeks, whenever I tried to use the EMP Connector, it failed to log in to my developer org. It seems that when a security token is appended to the password in the DevLoginExample Java call, the process instantly voids the security token.

My only solution is to add IP ranges to my profile so that the EMP Connector does not require a security token to log in. This is fine for demos, but obviously not acceptable for production orgs.

Is it possible to swap the parser/deserializer?

I've run into some issues where the JSON parser has problems with nested objects. Is it possible to swap out the parser that CometD uses in this abstraction? If not, it might make sense to allow the Consumer to take a String and use message.getJSON() instead, to provide better handling there. Let me know if you want to go the String route; I'm happy to write a PR for that.

Invalid channel id

Hello,

I am following the Salesforce Trailhead module on subscribing to a Change Data Capture event channel using EMP-Connector. I run the following command in Git Bash but always get the error below. I have tried researching a solution but had no luck. Could anyone provide some direction?

Command run against a hands-on org:
java -jar target/emp-connector-0.0.1-SNAPSHOT-phat.jar /data/AccountChangeEvent

Error
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" java.lang.IllegalArgumentException: Invalid channel id: C:/tools/Git/data/AccountChangeEvent
at org.cometd.bayeux.ChannelId.<init>(ChannelId.java:57)
at org.cometd.client.BayeuxClient.newChannelId(BayeuxClient.java:460)
at org.cometd.common.AbstractClientSession.getChannel(AbstractClientSession.java:140)
at org.cometd.common.AbstractClientSession.getChannel(AbstractClientSession.java:126)
at com.salesforce.emp.connector.EmpConnector$SubscriptionImpl.subscribe(EmpConnector.java:87)
at com.salesforce.emp.connector.EmpConnector.subscribe(EmpConnector.java:212)
at com.salesforce.emp.connector.example.LoginExample.main(LoginExample.java:59)

Thanks for any help,
Neil
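
The error shows the channel as `C:/tools/Git/data/AccountChangeEvent`, which suggests the shell rewrote the leading `/` into a Windows path before Java saw the argument. Git Bash is known to convert such arguments, and setting `MSYS_NO_PATHCONV=1` is one commonly used workaround; whether it applies to this setup is an assumption. The sketch below is a hypothetical argument check (`ChannelArg` is not part of EMP Connector) that would turn this case into a clearer message.

```java
import java.util.regex.Pattern;

// Hypothetical helper: validates a channel argument before subscribing.
final class ChannelArg {
    // Matches values that start like a Windows path, e.g. "C:/..." or "C:\...".
    private static final Pattern WINDOWS_PATH = Pattern.compile("^[A-Za-z]:[/\\\\].*");

    private ChannelArg() {}

    // Returns the channel unchanged if it looks valid, otherwise throws.
    static String validate(String channel) {
        if (WINDOWS_PATH.matcher(channel).matches()) {
            throw new IllegalArgumentException(
                "Channel looks like a Windows path (" + channel
                + "); the shell may have rewritten the leading '/'.");
        }
        if (!channel.startsWith("/")) {
            throw new IllegalArgumentException("Channel must start with '/': " + channel);
        }
        return channel;
    }
}
```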

Getting 403::create_denied

Subscribing to a single topic works, but when I try to subscribe to another topic, I get the exception below:

com.salesforce.emp.connector.CannotSubscribe: Unable to subscribe to [/topic/13q8_3zpqi_j7akbccq:-1] [https://ap4.salesforce.com/cometd/39.0] : 403:denied_by_security_policy:create_denied
	at com.salesforce.emp.connector.EmpConnector.lambda$subscribe$6(EmpConnector.java:378)

I am confused about why it does not allow me to subscribe, since I only have 11 topics in my Salesforce developer account, am subscribed to one of them, and am trying to subscribe to another. According to the documentation, a maximum of 20 subscriptions is allowed per topic.

When a topic's query is updated or a field is removed, does it simply stop sending?

Is there any way for the program to notice when a topic is updated or a field is removed, and refresh itself? I presume the field removal is handled inside Salesforce, but my plan is to keep this process running all the time and send the JSON data through JDBC to our database, so I'm thinking about the ways in which this process could break.

Or are people continuously passing parameters to the java command based on which replay ID has been parsed?

Thank you.

Memory leak

There appears to be a memory leak. This is with three topics and on the resubscribe-on-disconnect branch.

[Image: emp-connector-resubscribe-leaks]

fix reconnect issues

When reconnecting, the AtomicBoolean `running` is set from true to false but never back to true again (only the start method does this).
This means that on 401 errors, if the disconnect messages fail, the client never exits and stays around, so after a while you exhaust your 64 connections.
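
The flag handling described above can be pictured with the following sketch. `LifecycleSketch` and its method names are hypothetical and do not reproduce the connector's code; the `connect` callback stands in for whatever re-establishes the session. It only illustrates restoring the flag after a reconnect attempt.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical illustration of a running flag that survives reconnects.
final class LifecycleSketch {
    private final AtomicBoolean running = new AtomicBoolean(false);

    // Marks the client as running; returns false if it was already running.
    boolean start() {
        return running.compareAndSet(false, true);
    }

    // Flips the flag off for the duration of the reconnect, then sets it
    // according to the outcome, so a successful reconnect leaves it true.
    boolean reconnect(BooleanSupplier connect) {
        if (!running.compareAndSet(true, false)) {
            return false; // not running, nothing to reconnect
        }
        boolean connected = connect.getAsBoolean();
        running.set(connected);
        return connected;
    }

    boolean isRunning() {
        return running.get();
    }
}
```

If the reconnect attempt fails, the flag stays false, which gives the caller a state it can check in order to shut the client down.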

Report disconnection

As mentioned in issue #4, the subscription goes dead after a period of time without the client being notified. Is it possible to report the disconnection so that the client can reconnect?
