clickhouse-scala-client's People

Contributors

aaabramov, acidflow, ashevc, ecyshor, elbvs, erdebee, f1yegor, lwolters, sjoerdmulder, toztemel, valericus

clickhouse-scala-client's Issues

Column condition / comparison doesn't allow for more than just references

Currently we cannot write
select(a) from b where (c = 0 and b = "true") >= d
in the DSL

We would have to be smart about the column combinator tokenizing where we make the tokenizer detect where the order of execution matters, and then let it insert brackets.

I suggest adding a mandatory parameter to the tokenizeCondition method in the ClickhouseTokenizer (e.g. wrapInBrackets: Boolean). The caller can decide whether brackets are needed by looking at the operator which is being applied around the condition. Basically, any non-monoidic operator would require brackets.
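
The suggestion above can be sketched roughly as follows. This is only a minimal illustration on a made-up mini-AST: `Expr`, `Ref`, `Const`, `BinOp` and the `chainable` set are hypothetical and are not the library's real column or tokenizer types. The sketch errs on the side of too many brackets and only omits them when the same chainable operator is nested.

```scala
object BracketSketch {
  // Hypothetical mini-AST; not the library's real column types.
  sealed trait Expr
  final case class Ref(name: String) extends Expr
  final case class Const(value: String) extends Expr
  final case class BinOp(left: Expr, op: String, right: Expr) extends Expr

  // Assumption: these operators can be chained with themselves without brackets.
  private val chainable = Set("AND", "OR")

  def tokenize(expr: Expr, wrapInBrackets: Boolean): String = expr match {
    case Ref(name)    => name
    case Const(value) => value
    case BinOp(l, op, r) =>
      // The parent decides per child: leaves never get brackets, nested
      // operations do unless they repeat the same chainable operator.
      def child(e: Expr): String = e match {
        case BinOp(_, childOp, _) =>
          tokenize(e, wrapInBrackets = !(chainable(op) && childOp == op))
        case leaf =>
          tokenize(leaf, wrapInBrackets = false)
      }
      val sql = s"${child(l)} $op ${child(r)}"
      if (wrapInBrackets) s"($sql)" else sql
  }
}
```

With this sketch, the condition from the example keeps its grouping when the comparison is applied around it, at the cost of some redundant brackets around the inner equalities.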

Multiquery setting

I am using your client and am having a hard time figuring out how to specify the multiquery option when trying to run multiple write SQL queries with one call of client.executeRequest().

I'm doing it this way:

client.executeRequest(scripts.mkString("\n"), new QuerySettings(settings = Map("multiquery" -> "true")))

I also tried to specify this option with .withValue("crobox.clickhouse.client.settings.custom.multiquery", ConfigValueFactory.fromAnyRef("true")).
But nothing seems to work, and both attempts return this error from ClickHouse:
com.crobox.clickhouse.ClickhouseException: Server [my host] returned code 404 Not Found; Code: 115. DB::Exception: Setting multiquery is neither a builtin setting nor started with the prefix 'SQL_' registered for user-defined settings. (UNKNOWN_SETTING) (version 23.6.2.18 (official build)
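
The error indicates that the server does not know a setting named multiquery, so passing it through the query settings is unlikely to help. One possible workaround, sketched here under the assumption that each statement can be sent as its own request, is to run the statements one after another. `execute` is a placeholder for whatever sends a single statement; it is not a method of the client.

```scala
import scala.concurrent.{ExecutionContext, Future}

object SequentialStatements {
  // Runs the statements strictly in order and stops at the first failure.
  // `execute` is a hypothetical stand-in for sending one statement.
  def runAll(statements: Seq[String])(execute: String => Future[String])(
      implicit ec: ExecutionContext): Future[Seq[String]] =
    statements.foldLeft(Future.successful(Vector.empty[String])) { (acc, stmt) =>
      // The next statement is only started once the previous one has succeeded.
      acc.flatMap(results => execute(stmt).map(results :+ _))
    }
}
```

Folding over the statements keeps the order deterministic, which matters when later statements depend on earlier ones (for example a CREATE TABLE followed by an INSERT).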

Offering clickhouse-scala-client under another LICENSE

Hi team,

I am working with integrating our scala runtime with Clickhouse data sources and came across your work. I would love to prototype with it for production use cases but am prohibited from doing so because of the GPL3 licensing.

Would your team be open to publishing this repository under a different license? I know this is not a light request, and I understand if this is not something you all might be comfortable with. I just figured it would be worth asking.

I hope it goes without saying that I like your work and wish you all well. Thank you!

Can you show me more usage examples?

I imported your jar package into my project, but how do I use it? It confuses me.
At first I thought it was something like a JDBC driver, but it turns out I was wrong.

How to increase performance?

Hello, I am using the clickhouse-client to consume from Apache Pulsar.
I think I'm doing something wrong. I configured application.conf and overrode the indexer options, but the batching doesn't seem to work, since the values are not inserted into ClickHouse in batches of 50 rows.

Right now I'm using the following code:

sink.runWith(Source.single(Insert("prueba.con", new String(msg.getData))))

What I expect is that messages accumulate until the batch size or flush interval is reached.

Client doesn't work with latest release of Clickhouse: 20.9.2.20

Code
val config: Config
val client: ClickhouseClient = new ClickhouseClient(Option(config))

client.query("SELECT 1").map(result => print(result))

The above code is working with version: 20.3.12.112

Got the following Exception:

09:30:43,800 INFO [ClickhouseClient] Starting Clickhouse Client connecting to SingleHostBalancer(http://localhost:8123)
09:30:44,979 WARN [ClickhouseClient] Query execution exception while executing idempotent query, retries left: 3
com.crobox.clickhouse.ClickhouseException: Server [http://localhost:8123] returned code 404 Not Found; Code: 115, e.displayText() = DB::Exception: Unknown setting profile (version 20.9.2.20 (official build))
, query SELECT 1
at com.crobox.clickhouse.internal.ClickhouseResponseParser.$anonfun$processClickhouseResponse$4(ClickhouseResponseParser.scala:51)
at scala.concurrent.Future.$anonfun$flatMap$1(Future.scala:307)
at scala.concurrent.impl.Promise.$anonfun$transformWith$1(Promise.scala:41)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:92)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:92)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:49)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
09:30:45,000 WARN [ClickhouseClient] Query execution exception while executing idempotent query, retries left: 2

HTTPS support

Hi,

I was wondering how I can configure the client to use HTTPS instead of HTTP.
It's needed to communicate with the new ClickHouse cloud offering, which means my requests go over the open internet.

I don't see any options for it in the config.

DSL v2 doesn't fully implement equality

Given the following test case, you would expect the test to pass:

val first = (2 === 2).and(3 === 3)
val second = (2 === 2).and(3 === 3)
first should be(second)

but it seems object equality is not fully implemented in the new DSL v2, and the test fails with:

LogicalFunction(com.crobox.clickhouse.dsl.column.Magnets$$anon$36@62a60260,And,com.crobox.clickhouse.dsl.column.Magnets$$anon$36@664f03cb) was not equal to LogicalFunction(com.crobox.clickhouse.dsl.column.Magnets$$anon$36@30c9e27,And,com.crobox.clickhouse.dsl.column.Magnets$$anon$36@72663249)
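
The `Magnets$$anon$36@...` entries in the message suggest that the operands are anonymous class instances, which compare by reference unless equals is overridden. The following self-contained sketch shows the difference; `Condition`, `Eq` and `And` are hypothetical names used for illustration and are not the DSL's actual types.

```scala
object EqualitySketch {
  trait Condition { def sql: String }

  // Anonymous instances inherit reference equality from AnyRef,
  // so two separately built but identical conditions are not equal.
  def anonymousEq(l: Int, r: Int): Condition =
    new Condition { val sql = s"$l = $r" }

  // Case classes get structural equals/hashCode generated by the compiler.
  final case class Eq(l: Int, r: Int) extends Condition {
    val sql = s"$l = $r"
  }
  final case class And(left: Condition, right: Condition) extends Condition {
    val sql = s"${left.sql} AND ${right.sql}"
  }
}
```

Here `And(Eq(2, 2), Eq(3, 3)) == And(Eq(2, 2), Eq(3, 3))` holds, while the same comparison built from `anonymousEq` does not, even though `And` itself is a case class.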

Re-aliasing an aliased column doesn't work

When I want to do the following:

SELECT from_pv as from_start FROM (
  SELECT from as from_pv FROM report.pages
)

I would probably do it like this:

val from_pv = Pages.from as "from_pv"

select(from_pv as "from_start").from(
  select(from_pv).from(report.pages)
)

but that currently produces the following query:

SELECT from as  from_pv as from_start FROM (
  SELECT from as from_pv FROM report.pages
)
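
One way to get the intended output, sketched on a simplified and hypothetical column model (`Native` and `Aliased` here are not the DSL's real classes), is to refer to an already-aliased column by its alias when it is aliased again. This assumes the inner alias is actually in scope, as it is when selecting from the subquery above.

```scala
object ReAlias {
  // Hypothetical, simplified column model for illustration only.
  sealed trait Column { def name: String }
  final case class Native(name: String) extends Column
  final case class Aliased(original: Column, name: String) extends Column

  def tokenize(col: Column): String = col match {
    case Native(name) => name
    // Re-aliasing: use the inner alias as the reference instead of expanding it again.
    case Aliased(inner: Aliased, alias) => s"${inner.name} AS $alias"
    case Aliased(inner, alias)          => s"${tokenize(inner)} AS $alias"
  }
}
```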

Insert sink stops working after one bad insert

Hi, guys, thanks for this lib, it's very useful.

I ran into some issues:

  1. Use case: we send one invalid message to CH and the insertion fails. Then we try to insert a valid message into CH.
    What I expect: insertion of the first message fails and the valid message is inserted successfully.
    What I get: after the first failed insert the insertion pipeline stops working and we cannot load data (even valid data) from that moment on.

The reason for it is in com.crobox.clickhouse.stream.ClickhouseSink in method insertSink:

If we return Seq.empty in the recover block, everything works fine and one bad insert does not break the whole pipeline.

  2. It would be nice to be able to pass insertSink a function for processing bad inserts, which would be called in the recover section (in the code section above). It would be useful, for example, for custom logging.

The second issue is an optional feature, but the first one is a big problem, imho.

Thanks :)
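
Both requests can be illustrated with plain Scala futures. This is only a sketch of the idea, not the actual insertSink code: `insert` stands in for the real batch insert and `onFailure` is the hypothetical hook for bad inserts.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

object RecoveringInsert {
  // `insert` is a placeholder for the real batch insert;
  // `onFailure` is the hypothetical callback for failed batches.
  def insertBatch(batch: Seq[String])(insert: Seq[String] => Future[Seq[String]])(
      onFailure: (Seq[String], Throwable) => Unit)(
      implicit ec: ExecutionContext): Future[Seq[String]] =
    insert(batch).recover { case NonFatal(ex) =>
      // Report the failed batch, then recover with an empty result
      // so that a stream built on top of this future keeps running.
      onFailure(batch, ex)
      Seq.empty[String]
    }
}
```

Because the callback receives the batch together with the exception, it could also be used to log or re-queue the messages that failed.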

Not sure how to use the client

Hello,

I tried to copy and paste the example from the readme, but it turns out that it's not enough.
I strongly suggest updating the docs and adding at least one fully working example.

Right now I am able to run the program, but nothing special happens. It seems that some threads are hanging, as the program does not exit at the end of main:

val config: Config = ConfigFactory.parseString(
    """
      |crobox.clickhouse.client {
      |    connection: {
      |        type = "single-host",
      |        host = "localhost",
      |        port = 8123
      |    }
      |    maximum-frame-length = 100000
      |    retries = 3
      |}
      |""".stripMargin)
      
  implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global

  val client = new ClickhouseClient(Some(config))
  client.query("SEECT 1 + 1").map(result => {
    println(s"Got query result $result")
  })

This code doesn't print anything.
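
One thing worth checking: the query text in the snippet reads SEECT, which a server would presumably reject, and `map` on a Future only runs when the Future succeeds, so a failure stays silent. The sketch below uses only standard-library futures to make both outcomes visible; the failing future is a stand-in for the client call, not the client itself.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

object FutureOutcome {
  // Turns either outcome into a printable line instead of dropping failures.
  def describe(result: Future[String]): Future[String] =
    result.transform {
      case Success(value) => Success(s"Got query result $value")
      case Failure(ex)    => Success(s"Query failed: ${ex.getMessage}")
    }

  def main(args: Array[String]): Unit = {
    // Stand-in for a query that the server rejects.
    val failing = Future.failed[String](new RuntimeException("Syntax error"))
    // Wait for completion so the outcome is printed before main returns.
    println(Await.result(describe(failing), 5.seconds))
  }
}
```

Using `onComplete` or `recover` on the original future would serve the same purpose; the point is only that the failure branch has to be handled somewhere.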

Release Scala 3 artefact

Hi team,

I've seen that Scala 3 support was merged 2 months ago, but publishing was skipped.

Is there any reason for that?
When do you plan to release the client with Scala 3 support?

Thanks

Proper support for insert statement

ClickhouseQueryExecutor offers "def insert",
but we currently do not support insert statements in the DSL.
It should not be too much work to build, and it would probably serve a solid use case for many users of the DSL.

can you help me run my app

Even now I still cannot get my app running.
Can you show me a short snippet of code or a tiny app?
I am new to Scala. I was a Python developer before.

Please help me. Thanks.

How to Use the sink function? Thank You.

My table:
CREATE TABLE default.dmp_rtup3 (message String) ENGINE = Memory()

I used the example from the wiki to test:
val client: ClickhouseClient
client.sink("INSERT INTO my_table", Source.single(ByteString("el1"))).map(result => println(result))

but it failed:
Server [ht] returned code 400 Bad Request; Code: 62, e.displayText() = DB::Exception: Syntax error: failed at position 1 (line 1, col 1): INSERTR INTO ads_dmp3
el1. Expected one of: ATTACH, DETACH, DROP, SHOW, SELECT, WITH, KILL, TRUNCATE, DESC, DESCRIBE, SYSTEM query, SELECT subquery, list of elements, ALTER query, ALTER TABLE, EXISTS, CREATE TABLE or ATTACH TABLE query, Query with output, SHOW PROCESSLIST query, SHOW PROCESSLIST, RENAME query, RENAME TABLE, SELECT query, possibly with UNION, SET query, SHOW [TEMPORARY] TABLES|DATABASES [[NOT] LIKE 'str'], EXISTS or SHOW CREATE query, SELECT query, subquery, possibly with UNION, USE query, CHECK TABLE, DESCRIBE query, DROP query, INSERT query, INSERT INTO, KILL QUERY query, OPTIMIZE query, OPTIMIZE TABLE, SELECT query, CREATE, SET, USE, Query (version 19.13.2.19)
, query INSERTR INTO ads_dmp3

at com.crobox.clickhouse.internal.ClickhouseResponseParser$$anonfun$processClickhouseResponse$1$$anonfun$apply$4.apply(ClickhouseResponseParser.scala:51)
at com.crobox.clickhouse.internal.ClickhouseResponseParser$$anonfun$processClickhouseResponse$1$$anonfun$apply$4.apply(ClickhouseResponseParser.scala:49)
at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:253)
at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:251)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:92)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:92)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:92)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:49)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Add algebraic operations between dates

We have a number of operations which can be executed between dates and numbers, but no support to subtract two dates. This should produce a column of int as result.

Typechecking of the "as" keyword doesn't regard return type of operations on columns

When I have a table

class Table{
  val id: Int
  val enabled: Boolean
}

I set up a temporary column
val enabledCount = NativeColumn[Int]("enabled_count")

And a query
select(sum(enabled) as enabledCount).from(Table)

The compiler complains that the "right" column of type "Int" doesn't match the expected type "Boolean".

In the meantime I get around this problem with
select(sum(enabled) as enabledCount.name).from(Table)
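
The desired behaviour can be sketched on a small, hypothetical column model in which the alias is checked against the type of the whole expression rather than the type of the column inside it. None of these classes are the DSL's real ones, and the `Long` result type of `Sum` is an assumption made for the example.

```scala
object AliasTyping {
  // Hypothetical, simplified column model for illustration only.
  sealed trait Column[T] { def sql: String }
  final case class Native[T](name: String) extends Column[T] {
    def sql: String = name
  }
  // Assumption: sum yields a numeric column regardless of the input column type.
  final case class Sum[A](col: Column[A]) extends Column[Long] {
    def sql: String = s"sum(${col.sql})"
  }
  final case class Aliased[T](col: Column[T], alias: Native[T]) extends Column[T] {
    def sql: String = s"${col.sql} AS ${alias.name}"
  }

  implicit class ColumnOps[T](val col: Column[T]) {
    // T is the type of the expression being aliased, so Sum(enabled)
    // accepts a Native[Long] alias even though enabled is Boolean.
    def as(alias: Native[T]): Aliased[T] = Aliased(col, alias)
  }
}
```

With `import AliasTyping._`, an expression such as `Sum(Native[Boolean]("enabled")) as Native[Long]("enabled_count")` type-checks, while aliasing it to a `Native[Boolean]` does not.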

streaming raw and decompression

Thanks for your work!

I found a possible improvement for streaming raw query results: currently the lib does not decompress the compressed data, and it hides the HTTP headers from the user, who therefore can never read the 'Content-Encoding' header to decompress the data manually.

Get the message failing

Hello, is there a way to retrieve the message that failed?

Right now I am using recoverWith, which gives me the error message together with the failed messages as a string, but that is not useful for retrieving the message itself.

I am thinking of sending the id of the message in the Insert block, setting it as the X-ClickHouse-Query-Id, and then, on recovery, sending it back as an error in case the message was not sent.
This is the code:

Await.result(source.runWith(sink).recoverWith{
case ex :RuntimeException => Future.successful(println(ex.getCause))
}, 10.seconds)

Is there a better way to get the message that failed?

Clickhouse sink stream request to clickhouse instead of buffering in memory

Currently the ClickhouseSink uses groupWithin to buffer the items which need to be inserted into clickhouse. This causes a larger memory footprint and also bursts of data being sent to clickhouse.

We should basically rotate the stream to a different http sink whenever the batch size or batch timeout has been reached.

Queue is full deadlock because of Healthchecker

If the queue is full and all hosts are unhealthy then the client can never recover. e.g.

com.crobox.clickhouse.balancing.discovery.health.HostHealthChecker throws
Host http://192.168.x.x:8123 status is DEAD because of exception
with stacktrace

java.lang.RuntimeException: Queue is full
	at com.crobox.clickhouse.internal.ClickHouseExecutor.$anonfun$singleRequest$1(ClickHouseExecutor.scala:50)
	at scala.concurrent.Future.$anonfun$flatMap$1(Future.scala:303)
	at scala.concurrent.impl.Promise.$anonfun$transformWith$1(Promise.scala:37)
	at scala.concurrent.impl.CallbackRunnable.run_aroundBody0(Promise.scala:60)
	at scala.concurrent.impl.CallbackRunnable$AjcClosure1.run(Promise.scala:1)
	at org.aspectj.runtime.reflect.JoinPointImpl.proceed(JoinPointImpl.java:149)

Currently the only way to resolve this seems to be restarting the service.

Clickhouse Internal Error when data contains "DB::Exception"

Our data can contain "DB::Exception", for example in Google search history.

When querying this data we get:
com.crobox.clickhouse.ClickhouseException: Clickhouse Internal Error

Is there maybe a better way to handle errors in ClickHouse?
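
The difference between the two detection strategies can be sketched as follows. `Response` is a hypothetical type, and the status-based check assumes the server signals errors through a non-2xx status code. It does not cover errors that arrive after a response has already started streaming, which may be why the body is inspected at all.

```scala
object ErrorDetection {
  // Hypothetical response model for illustration only.
  final case class Response(status: Int, body: String)

  // Fragile: any row that happens to contain the marker text looks like an error.
  def failedByBody(r: Response): Boolean =
    r.body.contains("DB::Exception")

  // Sturdier, assuming errors are reported through a non-2xx status code.
  def failedByStatus(r: Response): Boolean =
    r.status < 200 || r.status >= 300
}
```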

Possible memory leak in queryWithProgress

Running the following example, with "-Xmx512m" will cause a java.lang.OutOfMemoryError: Java heap space within a minute:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import com.crobox.clickhouse.ClickhouseClient
import com.typesafe.config.ConfigFactory

import scala.concurrent.Future
import scala.concurrent.duration._

object MemoryLeakApp extends App {

  implicit val system = ActorSystem("memory")
  implicit val materializer = ActorMaterializer()
  import system.dispatcher
  val size = 1024 * 1024
  val clickhouseClient = new ClickhouseClient(ConfigFactory.parseString(
    """
      |crobox.clickhouse.client {
      |    connection {
      |        type = "single-host",
      |        host = "localhost",
      |        port = 8123
      |    }
      |}
    """.stripMargin).withFallback(ConfigFactory.defaultReference()))

  def execute(): Unit = {
    val source = Source.fromFutureSource(Future {
      s"SELECT * FROM system.numbers LIMIT $size"
    }.map(query =>
        clickhouseClient.queryWithProgress(query)
    )).mapMaterializedValue(_.flatten)
    Sink.ignore.runWith(source).foreach(response =>
      println("Received response with", response.length)
    )
  }
  system.scheduler.schedule(0.seconds, 1.seconds)({execute()})

}

Somehow using the Source.fromFutureSource with .mapMaterializedValue(_.flatten) causes the issue to appear...

Query streaming progress should publish the result as streaming parts

Currently we offer progress sources which contain only the query status and the query result is returned as the materialized value of the source.
In certain cases we want the returned body to be streamed in the same source, maybe as a different QueryProgress case class. This is especially important for queries which return a lot of data, as we currently buffer the entire body result in memory by draining the body source when running with progress.

Create `Type mapped column`

The DSL uses strict typing now, so implementing custom types doesn't work anymore. We need to implement a column type that is internally represented as the clickhouse column type and outwards exposed as the custom type

Proposed signature:

class TypeMappedColumn[E,V](val name: String)(implicit val ev: QueryValue[E]) extends NativeColumn[V]

Not sure about the usefulness of the implicit QueryValue parameter here at this time. We just know it MUST be defined; this forces it and serves as a way to access it for query result parsers.

Add support for with clause

Could not find support for WITH clause
such as:

/* this example would return TOP 10 of most huge tables */
WITH
    (
        SELECT sum(bytes)
        FROM system.parts
        WHERE active
    ) AS total_disk_usage
SELECT
    (sum(bytes) / total_disk_usage) * 100 AS table_disk_usage,
    table
FROM system.parts
GROUP BY table
ORDER BY table_disk_usage DESC
LIMIT 10;

Unable to use 'cluster-aware' connection type with cluster

I am trying to use the Crobox Clickhouse Client with a Clickhouse cluster configuration (2 shards with 2 replicas).
When the Clickhouse Client starts, I get an error saying that it is not able to connect to the host 127.0.0.1. After looking into the source code of ClusterConnectionFlow, it turned out that the client sends the following query to some Clickhouse replica: SELECT host_address FROM system.clusters WHERE cluster='$cluster', which returns the following result:
127.0.0.1
10.244.106.15
10.244.5.26
10.244.3.2

The fact that 127.0.0.1 is returned is expected behaviour of a CH cluster according to the answers from the CH team at Yandex: the replica has the loopback address as host_address for itself.

How should I use the Crobox client for a cluster setup then?
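
One conceivable workaround, shown here only as a sketch and not as what ClusterConnectionFlow does, is to replace loopback entries in the result with the address of the replica that answered the query. `resolve` and its parameters are hypothetical names.

```scala
import java.net.InetAddress

object ClusterHosts {
  // Swaps loopback entries for the host that answered the query and drops
  // duplicates. Assumes `hostAddresses` are IP literals, as in the result above.
  def resolve(queriedHost: String, hostAddresses: Seq[String]): Seq[String] =
    hostAddresses
      .map { addr =>
        if (InetAddress.getByName(addr).isLoopbackAddress) queriedHost else addr
      }
      .distinct
}
```

For the result above, and assuming the query was answered by a replica reachable at 10.244.1.9 (a made-up address), the list would become 10.244.1.9, 10.244.106.15, 10.244.5.26, 10.244.3.2.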
