thomasnield / rxkotlin-jdbc Goto Github PK
View Code? Open in Web Editor NEWFluent RxJava JDBC extension functions for Kotlin
License: Apache License 2.0
Fluent RxJava JDBC extension functions for Kotlin
License: Apache License 2.0
Just created a new request for a new deployment:
Example 1:
conn.select("SELECT * FROM USER WHERE ID = :id AND FLAG = :flag")
.setInt("id", 4)
.setBoolean("flag", true)
.toFlowable { it.getInt("ID") to it.getString("USERNAME") }
.subscribe(::println)
Example 2:
conn.select("SELECT * FROM USER WHERE ID = :id AND FLAG = :flag")
.setParam("id", 4)
.setParam("flag", true)
.toFlowable { it.getInt("ID") to it.getString("USERNAME") }
.subscribe(::println)
Example 3:
conn.select("SELECT * FROM USER WHERE ID = :id AND FLAG = :flag")
.setParams("id" to 4, "flag" to true)
.toFlowable { it.getInt("ID") to it.getString("USERNAME") }
.subscribe(::println)
fun ResultSet.asList() = (1..this.metaData.columnCount).asSequence().map {
this.getObject(it)
}.toList()
fun ResultSet.asMap() = (1..this.metaData.columnCount).asSequence().map {
metaData.getColumnName(it) to getObject(it)
}.toMap()
Hi,
There are plans to migrate to rxkotlin 3 and Kotlin 4.1.10 ?
Thank you
This function inside PreparedStatementBuilder
:
fun parameters(vararg parameters: Any?) {
furtherOps += { it.processParameters(parameters) }
}
Needs to be changed to:
fun parameters(vararg parameters: Any?) {
parameters.forEach { parameter(it) }
}
That would then delegate to this function:
fun parameter(value: Any?) {
furtherOps += { it.processParameter(namelessParameterIndex.getAndIncrement(), value) }
}
which handles the auto-incrementing of indices for nameless parameters.
Sometimes it can be helpful for API's to allow the client to choose consuming a query as a Flowable
, an Observable
, or a Sequence
.
It would also be nice if the (ResultSet) -> T
operation was already set regardless of the return pipeline type.
Here is the solution:
class Pipeline<T: Any>(val selectOperation: SelectOperation,
val mapper: (ResultSet) -> T
) {
fun toObservable(): Observable<T> = selectOperation.toObservable(mapper)
fun toFlowable(): Flowable<T> = selectOperation.toFlowable(mapper)
fun toSequence(): ResultSetSequence<T> = selectOperation.toSequence(mapper)
fun blockingFirst() = selectOperation.blockingFirst(mapper)
fun blockingFirstOrNull() = selectOperation.blockingFirstOrNull(mapper)
}
I'm using rxkotlin-jdbc and a Hikari data source in a TornadoFX project and found that a connection is leaking whenever I use fun DataSource.execute
to delete objects from an underlying PostgreSQL database. I came across this by enabling logging with the JVM parameter -Dorg.slf4j.simpleLogger.log.com.zaxxer.hikari=debug
and setting the hikari config's leakDetectionThreshold
property to 60_000 ms, which showed the following output in my console:
[HikariPool-1 housekeeper] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Pool stats (total=10, active=0, idle=10, waiting=0)
[HikariPool-1 housekeeper] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Pool stats (total=10, active=1, idle=9, waiting=0)
[HikariPool-1 housekeeper] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool-1 - Pool stats (total=10, active=1, idle=9, waiting=0)
[HikariPool-1 housekeeper] WARN com.zaxxer.hikari.pool.ProxyLeakTask - Connection leak detection triggered for org.postgresql.jdbc.PgConnection@3fd29f40 on thread tornadofx-thread-2, stack trace follows
java.lang.Exception: Apparent connection leak detected
at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:100)
at org.nield.rxkotlinjdbc.DatasourceKt$execute$1.invoke(datasource.kt:78)
at org.nield.rxkotlinjdbc.DatasourceKt$execute$1.invoke(datasource.kt)
at org.nield.rxkotlinjdbc.PreparedStatementBuilder.toPreparedStatement(PreparedStatementBuilder.kt:80)
at org.nield.rxkotlinjdbc.UpdateOperation$toSingle$1.call(UpdateOperation.kt:38)
at org.nield.rxkotlinjdbc.UpdateOperation$toSingle$1.call(UpdateOperation.kt:6)
at io.reactivex.internal.operators.single.SingleDefer.subscribeActual(SingleDefer.java:36)
at io.reactivex.Single.subscribe(Single.java:3394)
at io.reactivex.Single.subscribe(Single.java:3380)
at <<my tornadofx view class>>$deleteSelected$1.invoke(<<my tornadofx view class>>:157)
at <<my tornadofx view class>>$deleteSelected$1.invoke(<<my tornadofx view class>>:17)
at tornadofx.FXTask.call(Async.kt:459)
at javafx.concurrent.Task$TaskCallable.call(Task.java:1423)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
o
The specific function in violation:
fun deleteRegistrations(registrations: List<Registration>, idexam: Int) = db.execute(
"DELETE FROM registered WHERE idexam = ? AND code in (${registrations.joinToString { "?" }});").parameter(
idexam).parameters(*registrations.map {
it.code
}.toTypedArray()).toSingle()
jdk: 1.8.0_144
kotlin version: 1.3.10
rxkotlinfx maven version: 2.2.2
hikaricp maven version: 3.2.0
Not sure this is a bug of Hikari or rxkotlin-jdbc, but I found that swapping the execute
to insert
(and toSingle()
to toSingle{true}
) fixes the leak. Seems like the connection is not being closed and returned to the pool correctly?
Currently these three "builders" mutate a central instance. To allow reusing them with different parameters, it might be good to do a clone-and-modify operation for each parameter()
and whereOptional()
call.
I'm trying to perform a batch insert into a PostgreSQL database. I'm trying to use a multi-insert statement (search for multirow values syntax at https://www.postgresql.org/docs/11/sql-insert.html) rather than a batch of single-insert statements. To this end, I'm building a template string with a set of parameter placeholders (question marks) and using the parameters method to set their values.
The code (call test
and pass a connection to a PostgreSQL database):
data class Person(val firstName: String, val lastName: String)
fun upsertTemplate(n: Int) = "INSERT INTO person (firstname, lastname) VALUES ${(1..n).joinToString { "(?,?)" }}"
fun upsert(persons: List<Person>, connection: Connection) {
val blockingGet = connection.insert(upsertTemplate(persons.size))
.parameters(persons.flatMap { listOf(it.firstName, it.lastName) })
.toObservable { 1 }
.toList()
.blockingGet()
}
fun test(connection: Connection) {
upsert(listOf(Person("john", "doe"), Person("jane", "doe")), connection)
}
When executing this, I get an exception:
org.postgresql.util.PSQLException: No value specified for parameter 1.
If instead I insert the parameters one at a time (looping over the parameters list), everything is working as expected:
fun upsert(persons: List<Person>, connection: Connection) {
val insertOperation = connection.insert(upsertTemplate(persons.size))
persons.flatMap { listOf(it.firstName, it.lastName) }
.forEach {
insertOperation.parameter(it)
}
val blockingGet = insertOperation
.toObservable { 1 }
.toList()
.blockingGet()
}
Am I doing something wrong here? I don't want to use batchExecute
because I don't want to perform a lot of single insert statements in 1 transaction, I want to use PostgreSQL's multirow insert.
In progress
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.