
Comments (5)

dai-chen commented on August 23, 2024

We need this backoff retry capability on the client instance returned from FlintClient.createClient(). It is used by the Flint REPL job, the Flint data source, and Flint index data/metadata. We could wrap the created client and intercept the methods that can be retried.

from opensearch-spark.
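One way to "wrap the created client and intercept retryable methods" is a dynamic proxy. The sketch below is hypothetical (the `PingClient` interface and parameter choices are illustrative, not Flint's actual API): it retries each proxied call with exponential backoff when a caller-supplied predicate says the failure is transient.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.function.Predicate;

/** Demo interface standing in for the real client surface. */
interface PingClient {
    int ping();
}

/**
 * Hypothetical sketch: a dynamic proxy that retries each call with
 * exponential backoff when the retry predicate matches the failure.
 */
public final class RetryProxy {

    @SuppressWarnings("unchecked")
    public static <T> T withRetry(Class<T> iface, T target,
                                  int maxAttempts,
                                  long initialBackoffMillis,
                                  Predicate<Throwable> retryable) {
        InvocationHandler handler = (proxy, method, args) -> {
            long backoff = initialBackoffMillis;
            Throwable last = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    return method.invoke(target, args);
                } catch (InvocationTargetException e) {
                    last = e.getCause();
                    // Give up on the last attempt or a non-retryable failure.
                    if (attempt == maxAttempts || !retryable.test(last)) {
                        throw last;
                    }
                    Thread.sleep(backoff);
                    backoff *= 2;  // exponential backoff between attempts
                }
            }
            throw last;
        };
        return (T) Proxy.newProxyInstance(
            iface.getClassLoader(), new Class<?>[] {iface}, handler);
    }
}
```

Usage would look like `RetryProxy.withRetry(PingClient.class, realClient, 3, 100, t -> t instanceof java.net.ConnectException)`. The attraction of a proxy is that the retry policy lives in one place instead of being copied into every call site.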

kaituo avatar kaituo commented on August 23, 2024

Related stack trace when OSClient.doesIndexExist hits a connection exception:

	at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:306) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:143) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:52) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:89) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.opensearch.flint.spark.ppl.FlintSparkPPLParser.parsePlan(FlintSparkPPLParser.scala:62) ~[ppl-spark-integration-assembly-0.1.0-SNAPSHOT.jar:0.1.0-SNAPSHOT]
	at org.opensearch.flint.spark.sql.FlintSparkSqlParser.$anonfun$parsePlan$1(FlintSparkSqlParser.scala:64) ~[flint-spark-integration-assembly-0.1.0-SNAPSHOT.jar:0.1.0-SNAPSHOT]
	at org.opensearch.flint.spark.sql.FlintSparkSqlParser.parse(FlintSparkSqlParser.scala:104) ~[flint-spark-integration-assembly-0.1.0-SNAPSHOT.jar:0.1.0-SNAPSHOT]
	at org.opensearch.flint.spark.sql.FlintSparkSqlParser.parsePlan(FlintSparkSqlParser.scala:56) ~[flint-spark-integration-assembly-0.1.0-SNAPSHOT.jar:0.1.0-SNAPSHOT]
	at org.apache.spark.sql.SparkSession.$anonfun$sql$2(SparkSession.scala:620) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192) ~[spark-catalyst_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:620) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.sql.FlintJobExecutor.executeQuery(FlintJobExecutor.scala:344) ~[sql-job-assembly-0.1.0-SNAPSHOT.jar:0.1.0-SNAPSHOT]
	at org.apache.spark.sql.FlintJobExecutor.executeQuery$(FlintJobExecutor.scala:334) ~[sql-job-assembly-0.1.0-SNAPSHOT.jar:0.1.0-SNAPSHOT]
	at org.apache.spark.sql.FlintJob$.executeQuery(FlintJob.scala:39) ~[sql-job-assembly-0.1.0-SNAPSHOT.jar:0.1.0-SNAPSHOT]
	at org.apache.spark.sql.FlintJob$.main(FlintJob.scala:66) ~[sql-job-assembly-0.1.0-SNAPSHOT.jar:0.1.0-SNAPSHOT]
	at org.apache.spark.sql.FlintJob.main(FlintJob.scala) ~[sql-job-assembly-0.1.0-SNAPSHOT.jar:0.1.0-SNAPSHOT]
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
	at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1006) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1095) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1104) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
Exception in thread "main" java.lang.IllegalStateException: Failed to check if index query_results2 exists
	at org.apache.spark.sql.OSClient.$anonfun$doesIndexExist$1(OSClient.scala:151)
	at org.apache.spark.sql.OSClient.$anonfun$doesIndexExist$1$adapted(OSClient.scala:145)
	at org.apache.spark.sql.OSClient.using(OSClient.scala:99)
	at org.apache.spark.sql.OSClient.doesIndexExist(OSClient.scala:145)
	at org.apache.spark.sql.FlintJobExecutor.writeDataFrameToOpensearch(FlintJobExecutor.scala:109)
	at org.apache.spark.sql.FlintJobExecutor.writeDataFrameToOpensearch$(FlintJobExecutor.scala:105)
	at org.apache.spark.sql.FlintJob$.writeDataFrameToOpensearch(FlintJob.scala:39)
	at org.apache.spark.sql.FlintJob$.$anonfun$main$3(FlintJob.scala:86)
	at org.apache.spark.sql.FlintJob$.$anonfun$main$3$adapted(FlintJob.scala:86)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.sql.FlintJob$.main(FlintJob.scala:86)
	at org.apache.spark.sql.FlintJob.main(FlintJob.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1006)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1095)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1104)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.net.ConnectException: Timeout connecting to [search-managed-flint-os-1-yptv4jzmlqwmltxje42bplwj2a.eu-west-1.es.amazonaws.com/3.248.31.31:443]
	at org.opensearch.client.RestClient.extractAndWrapCause(RestClient.java:953)
	at org.opensearch.client.RestClient.performRequest(RestClient.java:332)
	at org.opensearch.client.RestClient.performRequest(RestClient.java:320)
	at org.opensearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1911)
	at org.opensearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1894)
	at org.opensearch.client.IndicesClient.exists(IndicesClient.java:920)
	at org.apache.spark.sql.OSClient.$anonfun$doesIndexExist$1(OSClient.scala:148)
	... 23 more
Caused by: java.net.ConnectException: Timeout connecting to [search-....eu-west-1.es.amazonaws.com/3.248.31.31:443]
	at org.apache.http.nio.pool.RouteSpecificPool.timeout(RouteSpecificPool.java:169)
	at org.apache.http.nio.pool.AbstractNIOConnPool.requestTimeout(AbstractNIOConnPool.java:632)
	at org.apache.http.nio.pool.AbstractNIOConnPool$InternalSessionRequestCallback.timeout(AbstractNIOConnPool.java:898)
	at org.apache.http.impl.nio.reactor.SessionRequestImpl.timeout(SessionRequestImpl.java:198)
	at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processTimeouts(DefaultConnectingIOReactor.java:213)
	at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvents(DefaultConnectingIOReactor.java:158)
	at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:351)
	at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:221)
	at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64)
	at java.base/java.lang.Thread.run(Thread.java:833)

penghuo commented on August 23, 2024

Retry if it is a ThrottlingException.

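A retry predicate for this could treat throttling (HTTP 429, which AWS surfaces as ThrottlingException) and common transient failures as retryable. The sketch below is illustrative, not an exhaustive policy; the chosen status codes and exception types are assumptions:

```java
import java.net.ConnectException;
import java.util.Set;

/**
 * Hypothetical sketch of a retry predicate: retry throttling (429) and
 * other transient HTTP statuses, plus connection timeouts like the one
 * in the trace above.
 */
public final class RetryPolicy {

    /** HTTP status codes that usually indicate a transient condition. */
    private static final Set<Integer> RETRYABLE_STATUS = Set.of(429, 502, 503, 504);

    public static boolean isRetryable(Throwable cause, int statusCode) {
        // Connection timeouts are worth retrying regardless of status.
        if (cause instanceof ConnectException) {
            return true;
        }
        return RETRYABLE_STATUS.contains(statusCode);
    }
}
```

A predicate like this plugs directly into whatever retry wrapper the client ends up using, keeping the "what is retryable" decision separate from the backoff mechanics.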
kaituo commented on August 23, 2024

@dai-chen We also need to handle the circuit breaker exception from OpenSearch. Currently, the streaming job exits when that happens.

23/11/17 17:55:32 INFO BlockManagerInfo: Removed broadcast_4_piece0 on [2a05:d018:1af:5c00:d15a:44ca:deac:2c79]:44219 in memory (size: 19.7 KiB, free: 7.9 GiB)
23/11/17 17:55:32 ERROR MicroBatchExecution: Query flint_my_glue1_default_http_logs_plain_skipping_index [id = a5c5aecb-82cc-41cf-b76f-f7860d552148, runId = 414d1f01-7c67-4ed1-a36f-0d336a30d884] terminated with error
org.apache.spark.SparkException: Job aborted due to stage failure: Authorized committer (attemptNumber=0, stage=4, partition=41) failed; but task commit success, data duplication may happen. reason=ExceptionFailure(org.opensearch.OpenSearchStatusException,OpenSearch exception [type=circuit_breaking_exception, reason=[parent] Data too large, data for [<http_request>] would be [8290212852/7.7gb], which is larger than the limit of [8160437862/7.5gb], real usage: [8265203952/7.6gb], new bytes reserved: [25008900/23.8mb], usages [request=0/0b, fielddata=0/0b, in_flight_requests=1777890674/1.6gb]],[Ljava.lang.StackTraceElement;@4f42c51e,OpenSearchStatusException[OpenSearch exception [type=circuit_breaking_exception, reason=[parent] Data too large, data for [<http_request>] would be [8290212852/7.7gb], which is larger than the limit of [8160437862/7.5gb], real usage: [8265203952/7.6gb], new bytes reserved: [25008900/23.8mb], usages [request=0/0b, fielddata=0/0b, in_flight_requests=1777890674/1.6gb]]]
	at org.opensearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:207)
	at org.opensearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:2228)
	at org.opensearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:2205)
	at org.opensearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1924)
	at org.opensearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1877)
	at org.opensearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1845)
	at org.opensearch.client.RestHighLevelClient.bulk(RestHighLevelClient.java:364)
	at org.opensearch.flint.core.storage.OpenSearchWriter.flush(OpenSearchWriter.java:61)
	at com.fasterxml.jackson.core.json.WriterBasedJsonGenerator.flush(WriterBasedJsonGenerator.java:967)
	at org.apache.spark.sql.flint.json.FlintJacksonGenerator.flush(FlintJacksonGenerator.scala:258)
	at org.apache.spark.sql.flint.FlintPartitionWriter.commit(FlintPartitionWriter.scala:70)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:482)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1575)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:509)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:448)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:514)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:411)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:563)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:566)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
	Suppressed: org.opensearch.client.ResponseException: method [POST], host [https://search-....eu-west-1.es.amazonaws.com], URI [/_bulk?refresh=wait_for&timeout=1m], status line [HTTP/1.1 429 429]
{"error":{"root_cause":[{"type":"circuit_breaking_exception","reason":"[parent] Data too large, data for [<http_request>] would be [8290212852/7.7gb], which is larger than the limit of [8160437862/7.5gb], real usage: [8265203952/7.6gb], new bytes reserved: [25008900/23.8mb], usages [request=0/0b, fielddata=0/0b, in_flight_requests=1777890674/1.6gb]","bytes_wanted":8290212852,"bytes_limit":8160437862,"durability":"TRANSIENT"}],"type":"circuit_breaking_exception","reason":"[parent] Data too large, data for [<http_request>] would be [8290212852/7.7gb], which is larger than the limit of [8160437862/7.5gb], real usage: [8265203952/7.6gb], new bytes reserved: [25008900/23.8mb], usages [request=0/0b, fielddata=0/0b, in_flight_requests=1777890674/1.6gb]","bytes_wanted":8290212852,"bytes_limit":8160437862,"durability":"TRANSIENT"},"status":429}
		at org.opensearch.client.RestClient.convertResponse(RestClient.java:375)
		at org.opensearch.client.RestClient.performRequest(RestClient.java:345)
		at org.opensearch.client.RestClient.performRequest(RestClient.java:320)
		at org.opensearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1911)
		... 22 more

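Note that the 429 circuit_breaking_exception body above reports `"durability":"TRANSIENT"`, i.e. heap pressure is expected to subside, so backing off and retrying the bulk request is reasonable, while a PERMANENT trip should fail fast. A minimal (hypothetical) check might look like this; a real implementation would parse the JSON error body rather than string-match it:

```java
/**
 * Hypothetical sketch: decide whether a 429 response is a transient
 * circuit-breaker trip that deserves a backoff-and-retry.
 */
public final class CircuitBreakerCheck {

    public static boolean isTransientBreakerTrip(int status, String errorBody) {
        // Crude string check on the error body; parse JSON in real code.
        return status == 429
            && errorBody.contains("circuit_breaking_exception")
            && errorBody.contains("\"durability\":\"TRANSIENT\"");
    }
}
```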
dai-chen commented on August 23, 2024

Sure, feel free to keep all the exceptions we've seen so far; we can decide on the best approach for our case together. I will figure out how to try-catch the HTTP call logic in the OS client first.
