crobox / clickhouse-scala-client
Clickhouse Scala Client with Reactive Streams support
License: GNU Lesser General Public License v3.0
Users currently have to import, from separate packages, a number of types that are always needed when defining a schema.
We should bundle them in one package import to make the DSL more user-friendly.
Add support for DDL on the cluster.
Currently we cannot write the following in the DSL:
select(a) from b where (c = 0 and b = "true") >= d
We would have to be smarter about tokenizing column combinators: the tokenizer should detect where the order of evaluation matters and insert brackets there.
I suggest adding a mandatory parameter to the tokenizeCondition method in the ClickhouseTokenizer (e.g. wrapInBrackets: Boolean). The caller can then decide whether brackets are needed by looking at the operator being applied around the condition. Basically, any non-associative (non-monoidal) operator would require brackets.
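A minimal sketch of this idea, using a made-up expression tree rather than the DSL's real classes (Expr, Col, Lit, BinOp and the precedence table are all hypothetical): the parent node decides, from its own operator, whether each child must be wrapped in brackets, and passes that decision down as wrapInBrackets. The precedence levels are a simplified assumption, not ClickHouse's complete operator table.

```scala
object ConditionTokenizer {
  sealed trait Expr
  final case class Col(name: String) extends Expr
  final case class Lit(sql: String) extends Expr
  final case class BinOp(left: Expr, op: String, right: Expr) extends Expr

  // Assumed precedence levels (higher binds tighter); a simplification for illustration.
  private val precedence: Map[String, Int] = Map(
    "OR" -> 1, "AND" -> 2,
    "=" -> 3, "!=" -> 3, "<" -> 3, ">" -> 3, "<=" -> 3, ">=" -> 3,
    "+" -> 4, "-" -> 4, "*" -> 5, "/" -> 5
  )

  /** Renders `e`; the caller passes `wrapInBrackets` based on the operator around it. */
  def tokenizeCondition(e: Expr, wrapInBrackets: Boolean): String = {
    val sql = e match {
      case Col(name) => name
      case Lit(s)    => s
      case BinOp(l, op, r) =>
        val p = precedence(op)
        s"${tokenizeCondition(l, needsBrackets(l, p, isRight = false))} $op " +
          s"${tokenizeCondition(r, needsBrackets(r, p, isRight = true))}"
    }
    if (wrapInBrackets) s"($sql)" else sql
  }

  // A child needs brackets if it binds looser than its parent, or binds equally and
  // sits on the right (so a - (b - c) keeps its meaning). Conservative for AND/OR.
  private def needsBrackets(child: Expr, parentPrecedence: Int, isRight: Boolean): Boolean =
    child match {
      case BinOp(_, op, _) =>
        val p = precedence(op)
        p < parentPrecedence || (isRight && p == parentPrecedence)
      case _ => false
    }
}
```

With this rule the condition from the example renders as (c = 0 AND b = 'true') >= d, a - b - c stays unbracketed, and a - (b - c) keeps its brackets. The rule is conservative for associative operators such as AND, which may get brackets they do not strictly need.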
I am using your client and having a hard time figuring out how to specify the multiquery option when sending multiple write SQL queries in one call to client.executeRequest().
I'm doing it this way:
client.executeRequest(scripts.mkString("\n"), new QuerySettings(settings = Map("multiquery" -> "true")))
I also tried setting the option through the config: .withValue("crobox.clickhouse.client.settings.custom.multiquery", ConfigValueFactory.fromAnyRef("true"))
Neither attempt works; both return an error from ClickHouse:
com.crobox.clickhouse.ClickhouseException: Server [my host] returned code 404 Not Found; Code: 115. DB::Exception: Setting multiquery is neither a builtin setting nor started with the prefix 'SQL_' registered for user-defined settings. (UNKNOWN_SETTING) (version 23.6.2.18 (official build)
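As far as we know, multiquery is an option of the clickhouse-client command-line tool rather than a server setting, which matches the UNKNOWN_SETTING error above, and the HTTP interface accepts a single statement per request. A minimal workaround sketch is to send the statements one by one, each in its own request, starting the next only after the previous one has completed. Here `execute` is a hypothetical stand-in for whichever client call returns a Future for one statement.

```scala
import scala.concurrent.{ExecutionContext, Future}

object SequentialScripts {
  /** Sends each non-blank statement as its own request, strictly one after another.
    * `execute` is a hypothetical stand-in for the real client call. */
  def runInOrder(statements: Seq[String])(execute: String => Future[String])(
      implicit ec: ExecutionContext): Future[Seq[String]] =
    statements
      .map(_.trim)
      .filter(_.nonEmpty)
      .foldLeft(Future.successful(Vector.empty[String])) { (acc, statement) =>
        // The next request starts only after the previous one has succeeded;
        // the first failure short-circuits the rest.
        acc.flatMap(results => execute(statement).map(results :+ _))
      }
}
```

This would be called with the original scripts sequence instead of scripts.mkString("\n"). Running sequentially is deliberate, since later scripts often depend on earlier ones, for example an INSERT into a table created by a previous statement.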
Currently, when the batch actor is used to stream inserts, a single entry that causes the insert to fail makes the entire batch get discarded, which leads to data loss.
We should try to split the batch into multiple sub-batches, try to insert those, and split any failing sub-batch again, up to some configured threshold.
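A minimal sketch of the proposed splitting, assuming a synchronous insert function that returns a Try (the real batch actor is asynchronous, so this only illustrates the recursion): a failing batch is halved and each half retried, until a failing sub-batch is no larger than the configured threshold, at which point its rows are reported as failed instead of the whole batch being dropped. insertWithSplitting and minBatchSize are hypothetical names.

```scala
import scala.util.{Failure, Success, Try}

object BatchSplitter {
  /** Tries to insert `batch`; on failure splits it in half and retries each half,
    * recursing until a failing sub-batch has at most `minBatchSize` rows.
    * Returns the rows that could not be inserted.
    * `insert` is a hypothetical, synchronous stand-in for the real insert call. */
  def insertWithSplitting[A](batch: Vector[A], minBatchSize: Int)(insert: Vector[A] => Try[Unit]): Vector[A] = {
    require(minBatchSize >= 1, "minBatchSize must be at least 1 so the recursion terminates")
    if (batch.isEmpty) Vector.empty
    else
      insert(batch) match {
        case Success(_) => Vector.empty
        case Failure(_) if batch.size <= minBatchSize =>
          batch // threshold reached: report these rows instead of splitting further
        case Failure(_) =>
          val (left, right) = batch.splitAt(batch.size / 2)
          insertWithSplitting(left, minBatchSize)(insert) ++ insertWithSplitting(right, minBatchSize)(insert)
      }
  }
}
```

Each split level costs two extra insert attempts per failing sub-batch, so a single bad row is isolated after a logarithmic number of retries while the rest of the batch is still inserted.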
Hi team,
I am working on integrating our Scala runtime with ClickHouse data sources and came across your work. I would love to prototype with it for production use cases, but I am prohibited from doing so because of the GPL3 licensing.
Would your team be open to publishing this repository under a different license? I know this is not a light request, and I understand if this is not something you all might be comfortable with. I just figured it would be worth asking.
I hope it goes without saying that I like your work and wish you all well. Thank you!
I imported your jar into my project,
but how can I use it? It confuses me.
At first I thought it was something like a JDBC driver, but it turns out I was wrong.
Hello, I am using the ClickHouse client in a consumer for Apache Pulsar.
I think I'm doing something wrong. I configured application.conf and overrode the indexer options, but batching doesn't seem to work: the values are not inserted into ClickHouse in batches of 50 rows.
I am currently using the following code:
sink.runWith(Source.single(Insert("prueba.con", new String(msg.getData))))
What I expect is that messages accumulate until the batch size or flush interval is reached.
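A likely cause is that sink.runWith(Source.single(...)) materializes a new stream for every Pulsar message; each of those streams completes right after its single element, so its batch is flushed with one row and the configured batch size never takes effect. This sketch models that with a simplified, hypothetical Batcher class (size-based flushing only, no timer, not the library's sink) to contrast one materialization per message with one shared materialization.

```scala
import scala.collection.mutable.ArrayBuffer

/** Simplified model of a size-based batching sink (the time-based flush is left out
  * to keep it deterministic). Rows are flushed when `batchSize` is reached or when
  * the upstream completes. */
final class Batcher[A](batchSize: Int, flush: Vector[A] => Unit) {
  private val buffer = ArrayBuffer.empty[A]
  def push(row: A): Unit = {
    buffer += row
    if (buffer.size >= batchSize) complete()
  }
  def complete(): Unit =
    if (buffer.nonEmpty) { flush(buffer.toVector); buffer.clear() }
}

object BatchingDemo {
  // One batcher per message, like materializing the sink with Source.single each time:
  // every stream completes right after its only element, so every flush has one row.
  def perMessage(rows: Seq[String], batchSize: Int): Vector[Vector[String]] = {
    val flushed = ArrayBuffer.empty[Vector[String]]
    rows.foreach { row =>
      val batcher = new Batcher[String](batchSize, b => { flushed += b; () })
      batcher.push(row)
      batcher.complete()
    }
    flushed.toVector
  }

  // One long-lived batcher shared by all messages.
  def shared(rows: Seq[String], batchSize: Int): Vector[Vector[String]] = {
    val flushed = ArrayBuffer.empty[Vector[String]]
    val batcher = new Batcher[String](batchSize, b => { flushed += b; () })
    rows.foreach(batcher.push)
    batcher.complete()
    flushed.toVector
  }
}
```

In Akka Streams the equivalent of the shared variant would be to materialize the sink once, for example behind Source.queue, and offer every incoming message to that queue.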
Code
val config: Config
val client: ClickhouseClient = new ClickhouseClient(Option(config))
client.query("SELECT 1").map(result => print(result))
The above code works with ClickHouse version 20.3.12.112.
With version 20.9.2.20 I get the following exception:
09:30:43,800 INFO [ClickhouseClient] Starting Clickhouse Client connecting to SingleHostBalancer(http://localhost:8123)
09:30:44,979 WARN [ClickhouseClient] Query execution exception while executing idempotent query, retries left: 3
com.crobox.clickhouse.ClickhouseException: Server [http://localhost:8123] returned code 404 Not Found; Code: 115, e.displayText() = DB::Exception: Unknown setting profile (version 20.9.2.20 (official build))
, query SELECT 1
at com.crobox.clickhouse.internal.ClickhouseResponseParser.$anonfun$processClickhouseResponse$4(ClickhouseResponseParser.scala:51)
at scala.concurrent.Future.$anonfun$flatMap$1(Future.scala:307)
at scala.concurrent.impl.Promise.$anonfun$transformWith$1(Promise.scala:41)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:92)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:92)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:49)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
09:30:45,000 WARN [ClickhouseClient] Query execution exception while executing idempotent query, retries left: 2
Hi,
How do I configure the client to use HTTPS instead of HTTP?
This is needed to communicate with the new ClickHouse Cloud offering, where requests go over the open internet.
I don't see any options for it in the config.
Given the following test case, you would expect it to pass:
val first = (2 === 2).and(3 === 3)
val second = (2 === 2).and(3 === 3)
first should be(second)
but object equality does not seem to be fully implemented in the new DSL v2, and the test fails with:
LogicalFunction(com.crobox.clickhouse.dsl.column.Magnets$$anon$36@62a60260,And,com.crobox.clickhouse.dsl.column.Magnets$$anon$36@664f03cb) was not equal to LogicalFunction(com.crobox.clickhouse.dsl.column.Magnets$$anon$36@30c9e27,And,com.crobox.clickhouse.dsl.column.Magnets$$anon$36@72663249)
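The Magnets$$anon$36@... names in the failure message suggest that the operands are instances of anonymous classes, which inherit equals and hashCode from AnyRef and therefore compare by reference. A minimal sketch with hypothetical types (not the DSL's own classes) shows the difference: anonymous instances are never equal, while case classes get compiler-generated structural equality, so two independently built expressions compare equal.

```scala
object EqualityDemo {
  sealed trait Column

  // An anonymous subclass inherits equals/hashCode from AnyRef, i.e. reference identity.
  def anonymousConst(value: Int): Column = new Column {
    override def toString: String = s"Const($value)"
  }

  // Case classes get compiler-generated structural equals/hashCode.
  final case class Const(value: Int) extends Column
  final case class Comparison(left: Column, op: String, right: Column) extends Column
  final case class LogicalFunction(left: Column, op: String, right: Column) extends Column
}
```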
Currently we do some basic calculations to split timestamps into time-series buckets, but ClickHouse recently released a number of relative time functions that would help. We should update the code to use these functions directly.
When I want to do the following:
SELECT from_pv as from_start FROM (
SELECT from as from_pv FROM report.pages
)
I would probably do it like this:
val from_pv = Pages.from as "from_pv"
select(from_pv as "from_start").from(
select(from_pv).from(report.pages)
)
but this currently generates the following query:
SELECT from as from_pv as from_start FROM (
SELECT from as from_pv FROM report.pages
)
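One possible fix, sketched with simplified, hypothetical column classes rather than the DSL's real ones: when as is called on a column that already has an alias, the new alias points at the existing alias name instead of wrapping the whole aliased expression.

```scala
object AliasDemo {
  sealed trait Column {
    def sql: String
    // Re-aliasing an aliased column refers to the existing alias by name
    // instead of nesting "x AS a AS b".
    def as(alias: String): AliasedColumn = this match {
      case AliasedColumn(_, existing) => AliasedColumn(NativeColumn(existing), alias)
      case other                      => AliasedColumn(other, alias)
    }
  }
  final case class NativeColumn(name: String) extends Column {
    def sql: String = name
  }
  final case class AliasedColumn(original: Column, alias: String) extends Column {
    def sql: String = s"${original.sql} AS $alias"
  }

  def select(columns: Column*)(from: String): String =
    s"SELECT ${columns.map(_.sql).mkString(", ")} FROM $from"
}
```

With this rule the inner query still renders from AS from_pv, while the outer query renders from_pv AS from_start.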
Hi guys, thanks for this lib, it's very useful.
I ran into some issues:
The cause is in the insertSink method of com.crobox.clickhouse.stream.ClickhouseSink:
If the recover block returned Seq.empty, everything would work fine, and one bad insert would not break the whole pipeline.
The second issue is an optional feature, but the first one is a big problem, IMHO.
Thanks :)
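A minimal sketch of the suggested behavior, with a hypothetical insert function standing in for the call made inside insertSink: a failed batch insert is logged and turned into an empty Seq, so the stream keeps running. Starting from Future.unit also turns an insert that throws synchronously into a failed Future, so recover sees it too.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

object RecoverDemo {
  /** Runs one batch insert and, if it fails, logs the error and emits an empty result
    * instead of failing the stream. `insert` is a hypothetical stand-in for the real call. */
  def insertOrSkip[A](batch: Seq[A])(insert: Seq[A] => Future[Unit])(
      implicit ec: ExecutionContext): Future[Seq[A]] =
    Future.unit
      .flatMap(_ => insert(batch)) // also turns a synchronous throw into a failed Future
      .map(_ => batch)
      .recover { case NonFatal(e) =>
        System.err.println(s"Dropping batch of ${batch.size} rows: ${e.getMessage}")
        Seq.empty[A]
      }
}
```

The trade-off is that the failed rows are dropped, with only the log line as a trace.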
Hello,
I tried to copy and paste the example from the README, but it turns out that it's not enough.
I strongly suggest updating the docs with at least one fully working example.
Right now I am able to run the program, but nothing happens; some threads seem to be hanging, as the program does not exit at the end of main:
import com.crobox.clickhouse.ClickhouseClient
import com.typesafe.config.{Config, ConfigFactory}
val config: Config = ConfigFactory.parseString(
"""
|crobox.clickhouse.client {
| connection: {
| type = "single-host",
| host = "localhost",
| port = 8123
| }
| maximum-frame-length = 100000
| retries = 3
|}
|""".stripMargin)
implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global
val client = new ClickhouseClient(Some(config))
client.query("SEECT 1 + 1").map(result => {
println(s"Got query result $result")
})
This code doesn't print anything.
Are there any plans for this?
When tokenizing a HAVING clause, I noticed a bug that causes the aggregation function to be stripped from the resulting query.
This is probably due to ClickhouseTokenizerModule::tokenizeCondition().
Currently it's not possible to use PREWHERE, a useful part of SELECT statements.
Hi team,
I've seen that Scala 3 support was merged 2 months ago, but publishing was skipped.
Is there any reason for that?
When do you plan to release the client with Scala 3 support?
Thanks
ClickhouseQueryExecutor offers "def insert",
but we currently do not support INSERT statements in the DSL.
This should not be too much work to build, and it would probably be a solid use case for many users of the DSL.
My table:
CREATE TABLE default.dmp_rtup3 (message String) ENGINE = Memory()
I used the example from the wiki to test:
val client: ClickhouseClient
client.sink("INSERT INTO my_table", Source.single(ByteString("el1"))).map(result => println(result))
but it failed:
Server [ht] returned code 400 Bad Request; Code: 62, e.displayText() = DB::Exception: Syntax error: failed at position 1 (line 1, col 1): INSERTR INTO ads_dmp3
el1. Expected one of: ATTACH, DETACH, DROP, SHOW, SELECT, WITH, KILL, TRUNCATE, DESC, DESCRIBE, SYSTEM query, SELECT subquery, list of elements, ALTER query, ALTER TABLE, EXISTS, CREATE TABLE or ATTACH TABLE query, Query with output, SHOW PROCESSLIST query, SHOW PROCESSLIST, RENAME query, RENAME TABLE, SELECT query, possibly with UNION, SET query, SHOW [TEMPORARY] TABLES|DATABASES [[NOT] LIKE 'str'], EXISTS or SHOW CREATE query, SELECT query, subquery, possibly with UNION, USE query, CHECK TABLE, DESCRIBE query, DROP query, INSERT query, INSERT INTO, KILL QUERY query, OPTIMIZE query, OPTIMIZE TABLE, SELECT query, CREATE, SET, USE, Query (version 19.13.2.19)
, query INSERTR INTO ads_dmp3
at com.crobox.clickhouse.internal.ClickhouseResponseParser$$anonfun$processClickhouseResponse$1$$anonfun$apply$4.apply(ClickhouseResponseParser.scala:51)
at com.crobox.clickhouse.internal.ClickhouseResponseParser$$anonfun$processClickhouseResponse$1$$anonfun$apply$4.apply(ClickhouseResponseParser.scala:49)
at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:253)
at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:251)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:92)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:92)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:92)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:49)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
We have a number of operations that can be applied between dates and numbers, but no support for subtracting two dates. This should produce a column of Int as the result.
When I have a table
class Table{
val id: Int
val enabled: Boolean
}
I set up a temporary column
val enabledCount = NativeColumn[Int]("enabled_count")
And a query
select(sum(enabled) as enabledCount).from(Table)
The compiler complains that the "right" column of type "Int" doesn't match the expected type "Boolean".
In the meantime I work around this problem with
select(sum(enabled) as enabledCount.name).from(Table)
Hello, is there a way to retrieve the message that failed?
Right now I am using recoverWith, and I get the error message with the failed messages embedded in a string, but that does not let me retrieve the messages themselves.
I am thinking of sending the message id in the Insert block, setting it as the X-ClickHouse-Query-Id, and then recovering and re-sending it as an error in case the message is not inserted.
This is the code:
Await.result(source.runWith(sink).recoverWith{
case ex :RuntimeException => Future.successful(println(ex.getCause))
}, 10.seconds)
Is there a better way to get the message that failed?
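One way to find out which messages failed, sketched under assumptions: keep each message's id next to the row it produces (TaggedInsert is a hypothetical envelope, and insert a stand-in for the real client call), and map every failed insert back to that id instead of parsing it out of the error string. Inserting one message at a time gives up batching, so this trades throughput for precise failure reporting.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

object FailedMessages {
  /** Hypothetical envelope that keeps the source message id next to the row. */
  final case class TaggedInsert(messageId: String, table: String, row: String)

  /** Inserts each message and returns the ids of those whose insert failed,
    * e.g. to re-publish them. `insert` is a hypothetical stand-in for the real call. */
  def failedIds(messages: Seq[TaggedInsert])(insert: TaggedInsert => Future[Unit])(
      implicit ec: ExecutionContext): Future[Seq[String]] =
    Future
      .traverse(messages) { m =>
        Future.unit
          .flatMap(_ => insert(m))
          .map(_ => Option.empty[String])
          .recover { case NonFatal(_) => Some(m.messageId) }
      }
      .map(_.flatten)
}
```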
Currently the ClickhouseSink uses groupWithin to buffer the items which need to be inserted into ClickHouse. This causes a larger memory footprint and also bursts of data being sent to ClickHouse.
We should basically rotate the stream to a different http sink whenever the batch size or batch timeout has been reached.
If the queue is full and all hosts are unhealthy, the client can never recover. For example, com.crobox.clickhouse.balancing.discovery.health.HostHealthChecker throws
Host http://192.168.x.x:8123 status is DEAD because of exception
with stacktrace:
java.lang.RuntimeException: Queue is full
at com.crobox.clickhouse.internal.ClickHouseExecutor.$anonfun$singleRequest$1(ClickHouseExecutor.scala:50)
at scala.concurrent.Future.$anonfun$flatMap$1(Future.scala:303)
at scala.concurrent.impl.Promise.$anonfun$transformWith$1(Promise.scala:37)
at scala.concurrent.impl.CallbackRunnable.run_aroundBody0(Promise.scala:60)
at scala.concurrent.impl.CallbackRunnable$AjcClosure1.run(Promise.scala:1)
at org.aspectj.runtime.reflect.JoinPointImpl.proceed(JoinPointImpl.java:149)
Currently, the only way to resolve this seems to be restarting the service.
Our data can contain "DB::Exception", for example in Google search history.
When we query this data, we get:
com.crobox.clickhouse.ClickhouseException: Clickhouse Internal Error
Is there a better way to handle errors from ClickHouse?
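A sketch of a stricter check, assuming the client has access to the HTTP status and response headers: decide on failure from the status code and the X-ClickHouse-Exception-Code header, which, as far as we know, ClickHouse sets on errors in recent versions, instead of searching the body for "DB::Exception". One caveat: if a query fails after the server has already started streaming a 200 response, the exception text may only appear at the end of the body, so this check alone may not catch every failure.

```scala
object ResponseClassifier {
  /** Treats a response as failed based on HTTP metadata rather than on whether
    * the body contains the text "DB::Exception", which user data may legitimately contain. */
  def isError(status: Int, headers: Map[String, String]): Boolean =
    status != 200 || headers.keys.exists(_.equalsIgnoreCase("X-ClickHouse-Exception-Code"))
}
```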
Running the following example with "-Xmx512m" causes a java.lang.OutOfMemoryError: Java heap space within a minute:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import com.crobox.clickhouse.ClickhouseClient
import com.typesafe.config.ConfigFactory
import scala.concurrent.Future
import scala.concurrent.duration._
object MemoryLeakApp extends App {
implicit val system = ActorSystem("memory")
implicit val materializer = ActorMaterializer()
import system.dispatcher
val size = 1024 * 1024
val clickhouseClient = new ClickhouseClient(ConfigFactory.parseString(
"""
|crobox.clickhouse.client {
| connection {
| type = "single-host",
| host = "localhost",
| port = 8123
| }
|}
""".stripMargin).withFallback(ConfigFactory.defaultReference()))
def execute(): Unit = {
val source = Source.fromFutureSource(Future {
s"SELECT * FROM system.numbers LIMIT $size"
}.map(query =>
clickhouseClient.queryWithProgress(query)
)).mapMaterializedValue(_.flatten)
Sink.ignore.runWith(source).foreach(response =>
println("Received response with", response.length)
)
}
system.scheduler.schedule(0.seconds, 1.seconds)({execute()})
}
Somehow, using Source.fromFutureSource with .mapMaterializedValue(_.flatten) causes the issue to appear.
Currently we offer progress sources that contain only the query status, while the query result is returned as the materialized value of the source.
In certain cases we want the returned body to be streamed in the same source, perhaps as a different QueryProgress case class. This is especially important for queries that return a lot of data, as we currently buffer the entire result body in memory by draining the body source when running with progress.
The DSL now uses strict typing, so implementing custom types doesn't work anymore. We need to implement a column type that is internally represented as the ClickHouse column type and exposed outwards as the custom type.
Proposed signature:
class TypeMappedColumn[E,V](val name: String)(implicit val ev: QueryValue[E]) extends NativeColumn[V]
I'm not sure about the usefulness of the implicit QueryValue parameter at this time; we just know it MUST be defined, and this forces it and serves as a way for query result parsers to access it.
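A minimal sketch of the proposed signature, with simplified, hypothetical stand-ins for QueryValue and NativeColumn and one assumed extra parameter, toStored, that converts the custom type V into the stored type E. The implicit ev is the only thing that knows how to render E, and it stays accessible on the column for a result parser.

```scala
object TypeMappedDemo {
  /** Hypothetical type class describing how a value is written into SQL. */
  trait QueryValue[V] {
    def render(value: V): String
  }

  implicit val stringQueryValue: QueryValue[String] = new QueryValue[String] {
    // Escape backslashes and single quotes for a ClickHouse string literal.
    def render(value: String): String = "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"
  }

  /** Simplified stand-in for the DSL's NativeColumn. */
  abstract class NativeColumn[V](val name: String)

  /** Stored in ClickHouse as E, exposed to DSL users as V.
    * `toStored` is an assumed extra parameter mapping the custom type to the stored one. */
  class TypeMappedColumn[E, V](name: String, toStored: V => E)(implicit val ev: QueryValue[E])
      extends NativeColumn[V](name) {
    def render(value: V): String = ev.render(toStored(value))
  }

  final case class Email(address: String) // example custom type
}
```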
Could not find support for the WITH clause, such as:
/* this example returns the top 10 largest tables */
WITH
(
SELECT sum(bytes)
FROM system.parts
WHERE active
) AS total_disk_usage
SELECT
(sum(bytes) / total_disk_usage) * 100 AS table_disk_usage,
table
FROM system.parts
GROUP BY table
ORDER BY table_disk_usage DESC
LIMIT 10;
I am trying to use the Crobox ClickHouse client with a ClickHouse cluster configuration (2 shards with 2 replicas).
When the client starts, I get an error saying that it is not able to connect to host 127.0.0.1. After looking into the source code of ClusterConnectionFlow, it turned out that the client sends the following query to one of the ClickHouse replicas: SELECT host_address FROM system.clusters WHERE cluster='$cluster'
which returns the following result:
127.0.0.1
10.244.106.15
10.244.5.26
10.244.3.2
According to answers from the ClickHouse team at Yandex, the fact that 127.0.0.1 is returned is expected behavior of a ClickHouse cluster: a replica reports the loopback address as the host_address for itself.
How should I use the crobox client for a cluster setup, then?
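A possible workaround, sketched under assumptions: after reading host_address from system.clusters, replace any loopback entry with the address the client actually sent the query to (queriedHost, a hypothetical parameter), then drop duplicates. Alternatively, system.clusters also has an is_local column that could be used to recognise the entry for the replica that answered.

```scala
import java.net.InetAddress

object ClusterHosts {
  /** Replaces loopback entries returned by system.clusters (a replica reports itself as
    * 127.0.0.1) with the address the client actually queried, then removes duplicates.
    * InetAddress.getByName parses IP literals directly but does a DNS lookup for host names. */
  def resolveHosts(hostAddresses: Seq[String], queriedHost: String): Seq[String] =
    hostAddresses
      .map(h => if (InetAddress.getByName(h).isLoopbackAddress) queriedHost else h)
      .distinct
}
```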